ForgeVision Engineer Blog

フォージビジョン エンジニア ブログ

Apache Iceberg Materialized Viewを試してみた② - MVを外部参照する -

こんにちは、開発グループの寺田です。
この記事では前回に引き続き、AWS Glue の Apache Iceberg Materialized View(以下 MV)の検証をまとめています。


はじめに

前回の記事では、AWS Glue Job を使って Apache Iceberg の Materialized View(MV)を作成するところまでを紹介しました。

シリーズ構成
- ① Glue Job で MV を作る(掲載済み)
- ② 外部からデータを参照する ← 本記事
- ③ Lake Formation で参照範囲を制御する(掲載予定)

この記事のゴール

この MV は Glue Data Catalog に登録されており、S3 上に Iceberg 形式でデータが保存されています。 今回はこの MV を Lambda から Athena 経由でクエリして結果を取得するところまでを検証します。

検証環境

コンポーネント 内容
外部参照手段 AWS Lambda(Python)+ Amazon Athena
クエリエンジン Amazon Athena v3
参照対象 Glue Data Catalog 上の sales_summary_mv
メタデータ管理 AWS Glue Data Catalog
ストレージ Amazon S3 汎用バケット

前提条件

IAM ロールに必要な権限

Lambda から Athena 経由で MV を参照する処理は、大きく複数のフェーズに分かれます。それぞれのフェーズで異なる AWS サービスにアクセスするため、実行ロールには複数のサービスに対する権限が必要です。

# 処理 必要な権限
Lambda が Athena にクエリを発行する Athena
Athena がメタデータを解決する Glue Data Catalog
Athena が MV の実データを読む S3(MV データ格納先)
Athena がクエリ結果を S3 に書く S3(クエリ結果出力先)
Lambda がクエリ結果を読み取る S3(クエリ結果出力先)

S3 バケットが 2 種類登場する点に注意が必要です。「クエリ結果出力先」と「MV データ格納先」はそれぞれ別のバケットです。
※今回の検証では以下のバケット名で作成しています。実際の環境に合わせて読み替えてください。

対象 バケット 役割
S3(クエリ結果出力先) poc-athena-results-\<ACCOUNT_ID> Athena がクエリ結果 CSV を書き出す場所。Lambda はここから結果を読む
S3(MV データ格納先) poc-iceberg-warehouse-\<ACCOUNT_ID> MV の実データ(Parquet)が置かれている場所。Athena がクエリ時に直接読む

この 2 つを混同すると「クエリは通るが結果が返ってこない」「AccessDenied で詰まる」といったハマりポイントになります。それぞれ別バケットとして権限を設定してください。

各権限の詳細は以下のとおりです。

① Amazon Athena への権限

アクション 用途
athena:StartQueryExecution クエリを非同期で発行する
athena:GetQueryExecution クエリのステータス(実行中 / 完了 / 失敗)を取得する
athena:GetQueryResults 完了したクエリの結果を取得する

② S3(クエリ結果出力先)への権限

Athena はクエリ結果を CSV 形式で S3 に書き出します。Lambda 側もこのバケットから結果を読む必要があるため、読み書き両方の権限が必要です。

アクション 用途
s3:PutObject Athena がクエリ結果 CSV を書き出す
s3:GetObject Lambda(get_query_results)が結果ファイルを読む
s3:ListBucket バケット内のオブジェクト一覧を取得する
s3:GetBucketLocation Athena がバケットのリージョンを確認する(不足すると InvalidBucketAclError が発生)

③ Glue Data Catalog への権限

Athena はクエリ実行時に Glue Data Catalog を参照して MV のスキーマ・S3 パスを解決します。MV が登録されているデータベースとテーブルへのアクセスが必要です。

アクション 用途
glue:GetDatabase poc_iceberg データベースのメタデータを取得する
glue:GetTable sales_summary_mv のスキーマ・S3 パスを取得する

④ S3(MV データ格納先)への権限

Athena がメタデータを解決した後、実際の Parquet ファイルを読みに行くバケットです。

アクション 用途
s3:GetObject MV の Parquet ファイルを読む
s3:ListBucket バケット内のオブジェクト一覧を取得する

⑤ Lake Formation への権限

Glue Data Catalog が Lake Formation と統合されている環境では、Athena がデータにアクセスする際に Lake Formation 経由で一時的な認証情報を取得します。この権限がないと Athena のクエリが AccessDeniedException で失敗します。

アクション 用途
lakeformation:GetDataAccess Lake Formation が管理するデータへのアクセス認証情報を取得する

今回の検証で使用した IAM ポリシーの全体は以下のとおりです。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AthenaQueryExecution",
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults"
      ],
      "Resource": "*"
    },
    {
      "Sid": "S3ResultsBucket",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::poc-athena-results-<ACCOUNT_ID>",
        "arn:aws:s3:::poc-athena-results-<ACCOUNT_ID>/*"
      ]
    },
    {
      "Sid": "GlueCatalogAccess",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetTable"
      ],
      "Resource": [
        "arn:aws:glue:<REGION>:<ACCOUNT_ID>:catalog",
        "arn:aws:glue:<REGION>:<ACCOUNT_ID>:database/poc_iceberg",
        "arn:aws:glue:<REGION>:<ACCOUNT_ID>:table/poc_iceberg/sales_summary_mv"
      ]
    },
    {
      "Sid": "S3IcebergDataAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::poc-iceberg-warehouse-<ACCOUNT_ID>",
        "arn:aws:s3:::poc-iceberg-warehouse-<ACCOUNT_ID>/*"
      ]
    },
    {
      "Sid": "LakeFormationDataAccess",
      "Effect": "Allow",
      "Action": [
        "lakeformation:GetDataAccess"
      ],
      "Resource": "*"
    }
  ]
}

Lake Formation での権限付与

IAM ポリシーに加えて、Lake Formation 側でも権限を付与する必要があります。今回の検証では以下の 2 種類の権限を付与しました。

📝 IAM と Lake Formation の関係
Lake Formation の権限付与は IAM ポリシーとは独立して管理されます。IAM 側で許可していても Lake Formation 側で権限がなければアクセスは拒否されます。両方の設定が揃って初めてクエリが通ります。

①データベースへの Describe 権限 Athena がクエリ実行時にデータベースのメタデータを参照するために必要です。Lake Formation コンソールの「Grant permissions」から以下の設定で付与します。

MV テーブルへの Select + Describe 権限 Athena が MV のデータを読み取るために必要です。同じ Grant permissions 画面で、Database に加えてテーブルも指定して以下を付与します。

Athena クエリ結果の出力先 S3 バケット

Athena はクエリ結果を S3 に書き出します。あらかじめ出力先バケットを用意し、実行ロールに書き込み権限を付与しておく必要があります。

s3://poc-athena-results-<ACCOUNT_ID>/athena-results/

Lambda スクリプト

基本構成

今回は Lambda 関数として実装しました。Athena へのクエリ実行は非同期のため、以下の 3 ステップが基本になります。

  1. start_query_execution でクエリを発行して QueryExecutionId を受け取る
  2. get_query_execution でステータスをポーリングし、完了を待機する
  3. get_query_results で結果を取得して返す

接続先の設定値(データベース名・S3 パス・カタログ名)は環境変数で管理します。

環境変数 説明
ATHENA_DATABASE クエリ対象のデータベース名 poc_iceberg
ATHENA_RESULTS_PATH クエリ結果の出力先 S3 パス s3://poc-athena-results-\<ACCOUNT_ID>/athena-results/
ATHENA_CATALOG 使用する Data Catalog(省略時は AwsDataCatalog) AwsDataCatalog

スクリプト(全体)

"""
Lambda 関数: Athena 経由で汎用バケット上の Iceberg MV を参照する
"""
import json
import os
import time
import boto3

# 環境変数
ATHENA_DATABASE     = os.environ['ATHENA_DATABASE']      # 例: poc_iceberg
ATHENA_RESULTS_PATH = os.environ['ATHENA_RESULTS_PATH']  # 例: s3://your-bucket/results/
ATHENA_CATALOG      = os.environ.get('ATHENA_CATALOG', 'AwsDataCatalog')

athena = boto3.client('athena', region_name='ap-northeast-1')


def lambda_handler(event, context):
    mv_name = event.get('mv_name', 'sales_summary_mv')
    limit   = int(event.get('limit', 10))

    query = f"SELECT * FROM {mv_name} LIMIT {limit}"

    # クエリ実行
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': ATHENA_DATABASE,
            'Catalog':  ATHENA_CATALOG  # Glue Data Catalog を使用する場合は AwsDataCatalog
        },
        ResultConfiguration={'OutputLocation': ATHENA_RESULTS_PATH}
    )
    execution_id = response['QueryExecutionId']
    print(f'Query Execution ID: {execution_id}')

    # クエリ完了まで待機(最大 300 秒)
    for _ in range(300):
        status = athena.get_query_execution(QueryExecutionId=execution_id)
        state  = status['QueryExecution']['Status']['State']

        if state == 'SUCCEEDED':
            break
        elif state in ('FAILED', 'CANCELLED'):
            reason = status['QueryExecution']['Status'].get('StateChangeReason', '')
            raise RuntimeError(f'Query {state}: {reason}')

        time.sleep(1)
    else:
        raise TimeoutError('Query did not complete within 300 seconds')

    # 結果取得
    result  = athena.get_query_results(QueryExecutionId=execution_id)
    columns = [col['Label'] for col in result['ResultSet']['ResultSetMetadata']['ColumnInfo']]
    rows    = []
    for row in result['ResultSet']['Rows'][1:]:  # 1 行目はヘッダーのためスキップ
        values = [field.get('VarCharValue', '') for field in row['Data']]
        rows.append(dict(zip(columns, values)))

    print(f'取得件数: {len(rows)}')
    for row in rows:
        print(row)

    return {'count': len(rows), 'data': rows}

📝 実装上のポイント

  • Catalog パラメータ: QueryExecutionContextCatalog を明示することで、Glue Data Catalog(AwsDataCatalog)を使うことを明示できます。省略してもデフォルトで Glue が使われますが、明示することで意図が伝わりやすくなります。
  • ポーリング間隔: time.sleep(1) で 1 秒間隔、最大 300 回(5 分)待機します。クエリが長くなる場合は上限を調整してください。
  • エラーハンドリング: FAILED / CANCELLED の際は StateChangeReason をそのまま RuntimeError に乗せています。Athena コンソールでも同じメッセージが確認できます。
  • 結果件数の上限: get_query_results は 1 リクエストあたり最大 1,000 件です。それ以上を取得する場合はページネーションが必要です。

実行結果の確認

上記スクリプトを実行すると、以下のような出力が得られます。前回の Athena コンソールでの確認結果と同じ内容が、Python スクリプトから取得できていることを確認できます。

Athena のクエリ統計(実行時間・スキャンデータ量)を確認したい場合は get_query_executionStatistics フィールドを参照してください。

 res['QueryExecution']['Statistics']['EngineExecutionTimeInMillis']

📝 Athena 側の出力 S3 バケットについて
OutputLocation に指定した S3 バケットには、クエリごとに CSV ファイルとメタデータファイルが生成されます。 不要なファイルが蓄積しないよう、S3 ライフサイクルルールで自動削除する設定を推奨します。

実装のポイントまとめ

# ポイント
1 start_query_execution は非同期 — ポーリングで完了を待つ
2 クエリ結果の出力先 S3 バケットを必ず指定する
3 get_query_results の上限は 1,000 件 — 大量取得はページネーションを活用する
4 IAM ポリシーは 5 ブロック構成(Athena / S3×2 / Glue / Lake Formation)
5 S3 バケットは 2 種類ある — クエリ結果出力先と MV データ格納先を混同しない
6 s3:GetBucketLocation を忘れると InvalidBucketAclError が発生する
7 Lake Formation 統合環境では IAM だけでなく LF 側の権限付与も必須(DB: Describe / Table: Select + Describe)

まとめ

Lambda(Boto3)+ Athena の組み合わせで、Glue Job が作成した Iceberg MV を外部から参照することができました。

MV を参照レイヤーとして使うことで、外部システムは raw テーブルを意識することなく集計済みデータにアクセスできます。クエリのたびに JOIN・集計処理が走らないため、参照コストも抑えられます。

次回は Lake Formation を使って、MV への参照範囲をロール単位で制御する構成を検証します。


参考リンク


# Apache Iceberg # Materialized Views # AWS # Glue # Athena # Boto3