ForgeVision Engineer Blog

フォージビジョン エンジニア ブログ

Amazon Athenaデータソースコネクターを使ってみた ~ AthenaElasticsearchConnector編 ~

こんにちは、クラウドインテグレーション事業部の平原です。

皆様は、目玉焼きには醤油とソースのどちらをかける派でしょうか?
私は醤油派です。たまにソースにしたりもしますが。醤油とソース以外の方もいると思いますし、お好みの物をかけて食べるのが一番ですね!

といったところで今回は、Amazon Athenaとデータソース(RDSやOpenSearchなど)を連携するデータソースコネクターについて記事にします。
お好みのAWSサービスを利用してデータ分析を行っている方がいると思いますが、Amazon Athenaにも興味をもって貰えればと思います。

はじめに

今回の記事は、データソースコネクターのAthenaElasticsearchConnector編です。
別の記事で、MySQLのデータソースと連携するAthenaMySQLConnector編も記事にしているので、そちらも見てもらえると嬉しいです。

使用可能なデータソースコネクターは下記ページが参考になります。 docs.aws.amazon.com

なおOpenSearchは、MySQLを利用するAmazon Athenaとは違い、JSON形式でデータを保存します。
その為、データ型のマッピングなどに留意が必要となります。その点については下記ページが参考になります。 docs.aws.amazon.com

また、参考程度となりますが、CloudFormationテンプレートの記載についても記述します。

データソースコネクターの作成

AWSマネージメントコンソールからデータソースコネクターを作成する方法を記載します。

  1. AWSマネージメントコンソールのAthenaページに遷移する

  2. 左ペインのデータソースを選択し、「データソースの作成」をクリックする

  3. "データソースを選択"で、"Amazon OpenSearch"をチェックし、「次へ」をクリックする

  4. "データソースの詳細を入力"で、以下を入力し、「Lambda関数の作成」をクリックする
    (AWS Lambdaアプリケーション画面が開く)
     ・データソース名: 任意の名前
     ・説明: 任意の説明(入力しなくても良い)

  5. AWS Lambdaアプリケーション AthenaElasticsearchConnectorの"設定とデプロイ"で、"アプリケーションの設定"に以下の様に入力し、「デプロイ」をクリックする
    ※ データソースコネクターのアプリケーションは、AWS CloudFormationを利用してデプロイされる

     ・アプリケーション名: AWS CloudFormationスタック名
     ・AthenaCatalogName: AWS Lambdaの関数名
     ・IsVPCAccess: Amazon OpenSearchをVPC内に配置したか否か   VPC内に配置している場合、"true"
     ・SecretNamePrefix: RDSの認証に使用しているAmazon Secrets Managerのシークレット名のプレフィックス
      この値が、AWS Lambda関数のIAMロールのポリシーで使用される
       例) シークレット名が"test-rds-secret"の場合、ここの値は"test"にする
     ・SpillBucket: Amazon S3バケット名
      Amazon Athenaのクエリ実行が、AWS Lambda関数のTime Out時間を超える場合、一時的に使用されるバケット
     ・AutoDiscoverEndpoint: Amazon OpenSearch Serviceの全ドメインに自動的に接続するか否か
      自動的に接続する場合、"true"。"false"場合、後述の"DomainMapping"に接続するドメインを明示的に設定する必要あり  ・DisableSpillEncryption: SpillBucketの暗号化を無効とするか否か
      "false"の場合、暗号化が有効
     ・DomainMapping: データソースコネクターから接続するOpenSearchエンドポイント
      [書式] <表示されるデータベース名>=https://${<AWS Secrets Managerシークレット名>}:<OpenSearchドメインのエンドポイント>
       ","カンマ区切りで複数設定可能。
       例) test-opensearch1=https://${test-opensearch-secret}:search-test-opensearch-xxxxxxxxxx1234567890yyyyyy.ap-northeast-1.es.amazonaws.com,test-opensearch2=・・・
     ・LambdaMemory: AWS Lambdaのメモリサイズ
     ・LambdaTimeout: AWS Lambda関数のTime Out時間
     ・PermissionsBoundaryARN: AWS Lambda関数に割り当てるIAMロールに設定するPermissionsBoundaryポリシーのARN
     ・QueryScrollTimeout: OpenSearchのScroll機能を利用してデータを取得する際のタイムアウト期間
     ・QueryTimeoutCluster: 並列スキャンを利用するクエリを生成するタイムアウト期間
      Athena OpenSearch コネクタは、シャードベースの並列スキャンをサポートしている
     ・QueryTimeoutSearch: OpenSearchのデータを取得する際のタイムアウト期間
     ・SecurityGroupIds: AWS Lambda関数にアタッチするセキュリティグループのID
     ・SpillPrefix: SpillBucketが利用される際の保存先プレフィックス
     ・SubnetIds: AWS Lambda関数が作成されるVPCサブネットのID
     ・このアプリがカスタム IAM ロールとリソースポリシーを作成することを承認します: チェックする

  6. CloudFormationスタック作成後、Amazon Athenaの"データソースの詳細を入力"画面に戻る

  7. "接続の詳細"で、作成したAWS Lambda関数を選択し、「次へ」をクリックする
     必要に応じて、更新ボタンをクリック

  8. "確認と作成"で、設定内容を確認し「データソースの作成」をクリックする

※ "SecurityGroupIds"や"SubnetIds"から分かるように、AWS Lambda関数はVPC Lambdaとなる
※ AWS Lambda関数は、権限はもちろんだが、ネットワーク的にAWS APIにアクセスできる設定にする必要あり
 AWS Lambda関数をPublicサブネットに配置したが、Public IPアドレスが振られておらず"Network Failure"エラーとなり、ハマりました...

データソースコネクター設定後の確認

  1. Amazon Athenaのデータソースコネクター画面にて、一覧から作成したデータソースコネクターをクリックする

  2. "関連付けられたデータベース"で、意図したデータベースが表示されることを確認する

  3. 任意のデータベースをクリックし、"関連付けられたテーブル"に意図したテーブルが表示されることを確認する

  4. Amazon Athenaの左ペインよりクエリエディタを選択し、遷移する

  5. 上部の"設定"タブにて、クエリ結果の出力先とするS3バケットを設定する

  6. 上部の"エディタ"タブにて、"データソース": 作成したデータソース / "データベース": 任意のデータベースを選択する

  7. クエリのテキストに、クエリ文を記載し、「実行」をクリックする

AthenaElasticsearchConnectorの留意点

AthenaMySQLConnectorとの違いやデータがJSON形式であることからの留意点を記載します。

  • AthenaElasticsearchConnectorの設定には、AWS Lambda関数に割り当てるロールを指定できない
     設定値としてLambdaRoleARNがない為、自動作成されるIAMロールを使用することとなる
     独自の権限追加が必要な場合、自動作成後に権限を追加するオペレーションとなる

  • AthenaElasticsearchConnectorで参照するOpenSearch向けのAWS Secrets Managerのシークレットは、プレーンテキストで登録する必要あり
     DomainMappingで参照先として指定するシークレットは、プレーンテキストで"username@password"の形式で登録する  

  • MySQLデータとJSON形式データの型マッピングに注意する
     私が利用時に経験したエラーについて、記載します。
     a. Object(STRUCT)型は、"null" が許可されない
      OpenSearchのデータとしては"null"でも問題ないが、Athenaからクエリする場合エラーとなる
      OpenSearchにデータを挿入する際"null"とならない様にアプリケーション側を改修して対応
     b. LIST型となるデータは、OpenSearchのMappingに"_meta"情報を設定する必要あり
      AthenaElasticsearchConnectorでデータを変換する際、LIST型データは"_meta"情報を利用すると思われる
      OpenSearchのMapping(Indexテンプレート)設定に"_meta"情報を追加することで対応

CloudFormation記載例

参考程度ですが、CloudFormationでの記載例を記述します。
(このまま利用してもCloudFormationは成功しません)

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31

Resources:
  # --------------------------------------------------------------
  # Lambda Application: Connector
  # --------------------------------------------------------------
  AthenaOpensearchConnector:
    Type: AWS::Serverless::Application
    Properties:
      Location:
        ApplicationId: arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaElasticsearchConnector # 利用リージョンに関わらず、ここは"us-east-1"
        SemanticVersion: 2023.30.1 ###### バージョンを指定
      Parameters: 
        AthenaCatalogName: 'test-athena-opensearch-connector'
        SecretNamePrefix: 'test'
        SpillBucket: 'test-athena-opensearch-connector-bucket'
        LambdaTimeout: '900'
        LambdaMemory: '3008'
        DisableSpillEncryption: "false"
        AutoDiscoverEndpoint: "false"
        DomainMapping: '<表示されるデータベース名>=https://${!<AWS Secrets Managerシークレット名>}:<OpenSearchドメインのエンドポイント>'
        QueryTimeoutCluster: '10' # default
        QueryTimeoutSearch: '720' # default
        QueryScrollTimeout: '60' # default
        PermissionsBoundaryARN: ''
        IsVPCAccess: 'true'
        SecurityGroupIds: 'sg-xxxxxx'
        SubnetIds: 'subnet-aaaaaa, subnet-bbbbbb'

  # --------------------------------------------------------------
  # Athena
  # --------------------------------------------------------------
  AthenaMysqlDataCatalog:
    Type: AWS::Athena::DataCatalog
    Properties: 
      Name: "test-athena-opensearch-datasource"
      Description: Athena - Aurora Opensearch DataSource
      Parameters: 
        function: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:test-athena-opensearch-connector"
      Type: "LAMBDA"

※ 下記の部分は、"${}"がテンプレート内のパラメータと判定されない為に、"!"でエスケープしています

DomainMapping: '<表示されるデータベース名>=https://${!<AWS Secrets Managerシークレット名>}:<OpenSearchドメインのエンドポイント>'

さいごに

AthenaElasticsearchConnectorは、AthenaMySQLConnectorと比べると留意すべき点が多いかなと感じました。
事前に上述した参考ページを一読しておくのが良いかなと思います。
本ブログを見て、Amazon Athenaのデータソースコネクターに興味を持った方がいたら幸いです!