📦

Pub/Sub の Cloud Storage import Topic について紹介

2024/12/25に公開

こんにちは、クラウドエース株式会社 第一開発部の阿部です。
本日は 2024 年 11 月 6 日にリリースされた Pub/Sub Cloud Storage import Topic について紹介します。
こちらは Jagu'e'r Advent Calendar 2024 の 10 日目の記事としても書きました。(開いていたためそうしました。)

概要

今回リリースされた機能の前に、Pub/Sub の概要について説明します。

Pub/Sub とは

Pub/Sub は、Google Cloud のメッセージングサービスであり、様々なシステム間でメッセージを送受信するためのサービスです。
Pub/Sub はトピックとサブスクリプションという 2 つのリソースで構成されており、アプリケーション間のメッセージ通信だけでなく、Google Cloud のイベント駆動型サービスやデータパイプラインの構築にも利用されます。

Pub/Sub の概要は下記のドキュメントを参照ください。

https://cloud.google.com/pubsub/docs/overview?hl=ja

これまでの Cloud Storage から Pub/Sub への通知する方法について

これまで、Cloud Storage (以降、GCS)にファイルをアップロードした際に、GCS のイベントトリガーを利用して、Pub/Sub に通知する方法は以下の方法がありました。

  • ① GCS の Pub/Sub Notification と Cloud Functions (現在は Cloud Run functions) を利用する方法
  • ② Eventarc のイベントトリガーで Cloud Run を利用する方法

Cloud Storage から Pub/Sub への通知方法

ただし、これらの方法はあくまでアプリケーションプラットフォームへ通知するのみであり、実際に Pub/Sub にパブリッシュする処理はアプリケーションで実装する必要がありました。

Pub/Sub の Cloud Storage import Topic とは

今回リリースされた Pub/Sub Cloud Storage import Topic は、GCS にファイルをアップロードした際に、Pub/Sub がファイルの内容をメッセージとしてインポートしパブリッシュする機能です。
この機能は、アプリケーション実装不要で使用できるため、非常に簡単に GCS から Pub/Sub へのメッセージング処理を実装できます。

Cloud Storage import Topic
Cloud Storage import Topic のイメージ図

Cloud Storage import Topic の詳細は以下のドキュメントを参照ください。

https://cloud.google.com/pubsub/docs/create-cloud-storage-import-topic?hl=ja

Cloud Storage import Topic でインポートできるファイル形式

Cloud Storage import Topic でインポートできるファイル形式は以下の通りです。

  • テキストファイル: 特定の区切り文字(デフォルトは改行文字)で区切られたテキストファイルをインポートします。区切り文字毎にメッセージが生成されます。
  • Avro ファイル: Apache Avro バイナリ形式のファイルをインポートします。
  • Pub/Sub Avro ファイル: Pub/Sub Cloud Storage サブスクリプションで出力した Avro ファイル形式をインポートします。

上記インポート可能なファイル形式の詳細はドキュメントを参照ください。

Pub/Sub Cloud Storage import Topic の作成手順

GCS バケット作成とサービスアカウントへのロール付与

はじめに、Pub/Sub Cloud Storage import Topic がインポート対象とする GCS バケットを作成し、Pub/Sub サービスアカウントに適切なロールを付与します。
インポート用の GCS バケットを作成し、そのバケットの IAM で Pub/Sub サービスアカウント (service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com) に Storage Legacy Object Reader (roles/storage.legacyObjectReader) ロール と Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) ロール を付与すればよいです。

下記はコマンドラインにおける手順の例です。

GCS バケット作成

gcloud storage buckets create gs://BUCKET_NAME --location LOCATION

※BUCKET_NAME はバケット名、 LOCATION はバケットを配置するロケーションに置き換えてください。

Pub/Sub サービスアカウントへのロール付与

Storage Legacy Object Reader ロール付与
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/storage.legacyObjectReader"
Storage Legacy Bucket Reader ロール付与
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/storage.legacyBucketReader"

※PROJECT_NUMBER は Google Cloud プロジェクトのプロジェクト番号に置き換えてください。
※プロジェクト番号は gcloud projects describe PROJECT_ID --format="value(projectNumber)" コマンドで、プロジェクト ID から取得できます。

ロールの補足

何故 Storage Legacy Object Reader ロール と Storage Legacy Bucket Reader ロールの 2 つが必要かというと、Cloud Storage import Topic の動作には以下の権限が必要なためです。

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

しかし、これらを 1 つのロールで包括する適当なロールが存在しないため、前述の 2 つのロールを付与する必要があります。
もしロールを 2 つ付与するのが煩雑な場合は、上記の権限を含むカスタムロールを作成し、それを付与することでも対応可能です。

Pub/Sub Cloud Storage import Topic の作成

Pub/Sub の Cloud Storage import Topic を作成するには、gcloud pubsub topics create コマンドに --cloud-storage-ingestion-bucket オプションを指定し、インポート対象の GCS バケットを指定します。
また、--cloud-storage-ingestion-input-format オプションでインポートするファイル形式を指定します。

テキストファイルインポートのトピック作成

gcloud pubsub topics create TOPIC_ID \
  --cloud-storage-ingestion-bucket=BUCKET_NAME \
  --cloud-storage-ingestion-input-format=text \
  --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
  --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
  --cloud-storage-ingestion-match-glob=MATCH_GLOB \
  --ingestion-log-severity=SEVERITY

--cloud-storage-ingestion-input-format オプションに text を指定することで、テキストファイルをインポートするトピックを作成できます。

--cloud-storage-ingestion-text-delimiter オプションの TEXT_DELIMITER は、テキストファイルの区切り文字を指定します。オプション未指定時は改行文字がデフォルトとして使用されます。
例えば、 #, 等、任意の区切り文字を指定することができますが、複数の区切り文字を指定することはできません。

--cloud-storage-ingestion-minimum-object-create-time オプションの MINIMUM_OBJECT_CREATE_TIME は取り込み対象とする最小のオブジェクト作成時間(つまり、取り込み開始時間)を指定します。
取り込み開始時間を指定することで例えば本番環境のデータ取り込みのタイミングを制御することができます。RFC3339 UTC "Zulu" 形式で指定します。

--cloud-storage-ingestion-match-glob オプションの MATCH_GLOB は、インポート対象のファイルを指定するための GCS のファイルパスを指定します。
例えば、 gs://BUCKET_NAME/import/*.txt として、 import フォルダにあるテキストファイルをインポートするように指定したい場合は、 MATCH_GLOBimport/*.txt を指定します。

--ingestion-log-severity オプションで、取り込み失敗時のログを記録することができます。取り込み失敗時のログの優先度を SEVERITY で指定します。
デフォルトだと取り込み失敗してもログ出力されませんので、必要に応じて指定してください。(指定しておいた方がよいと思います。)

Avro ファイルインポートのトピック作成

gcloud pubsub topics create TOPIC_ID \
  --cloud-storage-ingestion-bucket=BUCKET_NAME \
  --cloud-storage-ingestion-input-format=avro \
  --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
  --cloud-storage-ingestion-match-glob=MATCH_GLOB \
  --ingestion-log-severity=SEVERITY

--cloud-storage-ingestion-input-format オプションに avro を指定することで、Avro ファイルをインポートするトピックを作成できます。
--cloud-storage-ingestion-text-delimiter オプションを指定出来ないこと以外は、テキストファイルのインポートと同様のオプションを指定できます。

Pub/Sub Avro ファイルインポートのトピック作成

gcloud pubsub topics create TOPIC_ID \
  --cloud-storage-ingestion-bucket=BUCKET_NAME \
  --cloud-storage-ingestion-input-format=pubsub_avro \
  --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
  --cloud-storage-ingestion-match-glob=MATCH_GLOB \
  --ingestion-log-severity=SEVERITY

--cloud-storage-ingestion-input-format オプションに pubsub_avro を指定することで、Pub/Sub Avro ファイルをインポートするトピックを作成できます。
--cloud-storage-ingestion-text-delimiter オプションを指定出来ないこと以外は、テキストファイルのインポートと同様のオプションを指定できます。

Pub/Sub サービスアカウントにトピックパブリッシュのロールを付与

前述の手順で作成したトピックに、Pub/Sub サービスアカウント (service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com) が Pub/Sub Publisher (roles/pubsub.publisher) ロールを付与します。
以下のコマンドラインを実行します。

gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
   --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
   --role="roles/pubsub.publisher"

おまけ: Terraform で Pub/Sub Cloud Storage import Topic を作成する手順

Terraform で Pub/Sub Cloud Storage import Topic を作成することも可能です。
以下は、Terraform で Pub/Sub Cloud Storage import Topic におけるテキストインポートトピックと関連リソースを作成するコードの例です。

Terraform コード例
locals {
  gcs_location    = "asia-northeast1"
  service_account = "service-${data.google_project.current.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}

data "google_project" "current" {
  project_id = "PROJECT_ID"
}

resource "google_pubsub_topic" "main" {
  name = "pubsub-gcs-import"

  ingestion_data_source_settings {
    cloud_storage {
      bucket = google_storage_bucket.main.name
      text_format {}
      #avro_format {} ## Avro ファイルインポートの場合はこの行を有効化
      #pubsub_avro_format {} ## Pub/Sub Avro ファイルインポートの場合はこの行を有効化
      match_glob = "import/**.txt"
      #match_glob = "import/**.avro" ## Avro ファイル、Pub/Sub Avro ファイルインポートの場合はこの行を有効化
    }

    platform_logs_settings {
      severity = "WARNING"
    }
  }
}

resource "google_pubsub_topic_iam_member" "sa_publisher" {
  topic  = google_pubsub_topic.main.id
  member = "serviceAccount:${local.service_account}"
  role   = "roles/pubsub.publisher"
}

resource "google_storage_bucket" "main" {
  name                     = "IMPORT_BUCEKT_NAME"
  location                 = local.gcs_location
  force_destroy            = true
  public_access_prevention = "enforced"
}

resource "google_storage_bucket_iam_member" "sa_object_viewer" {
  bucket = google_storage_bucket.main.name
  member = "serviceAccount:${local.service_account}"
  role   = "roles/storage.objectViewer"
}
resource "google_storage_bucket_iam_member" "sa_legacy_bucket_reader" {
  bucket = google_storage_bucket.main.name
  member = "serviceAccount:${local.service_account}"
  role   = "roles/storage.legacyBucketReader"
}

実際のインポート操作

Pub/Sub の Cloud Storage import Topic を作成した後は、インポート対象の GCS バケットの取り込み先(MATCH_GLOB で指定したルール)にファイルをアップロードすることで、自動的に Pub/Sub にメッセージがパブリッシュされます。
以下は、実際にインポートして検証した際の動作について説明します。

テキストファイルインポートの動作

テキストファイルのインポートの動作は、改行文字毎にメッセージが生成されます。
この様子は下記の画像の通りで、動作がイメージしやすいと思います。

テキストファイルインポートの動作
改行文字毎にメッセージが生成される様子

一方、テキストファイルのインポートの区切り文字を改行文字以外を指定した場合は、指定した区切り文字毎にメッセージが生成されます。
このとき、改行文字は無視されるため、例えば # を指定した際に改行と # を含む場合は以下のようにメッセージが生成されます。

テキストファイルインポートの動作
改行文字以外の区切り文字毎にメッセージが生成される様子

こうした動作になるため、改行文字以外を指定する場合は、前述のようなメッセージ生成の挙動を考慮して設定する必要があります。
できれば 改行文字+別の区切り文字の組み合わせでメッセージを生成するようなオプションがあると便利だと思いました。

なお、私が検証した限りでは Pub/Sub Topic Schema を使ったテキストデータを構造化しながらのインポートはできないようでした。
例えば、 1 行 1 JSON(NDJSON 形式)のデータをインポートし、BigQuery Subscription を使用してメッセージをスキーマで構造化した状態でデータを保存することはできません。
もしこれができると、全て Google Cloud のリソースの組合せだけで JSON データベースで BigQuery の自動ロード処理が作れるので、今後に期待したいです。

Avro ファイルインポートの動作

Avro ファイルのインポート動作は、Avro のスキーマに従ってレコード毎にメッセージが出力されます。

Avro ファイルインポートの動作
Avro ファイルのスキーマに従ってメッセージが生成される様子

インポートされたメッセージはスキーマ無し Avro バイナリ形式で出力されるため、メッセージをデコードする際には Avro のスキーマを使用してデコードする必要があります。

また、こちらについても私が検証した限りでは Pub/Sub Topic Schema を使った Avro データを構造化しながらのインポートはできないようでした。
そのため、Avro のスキーマを BigQuery Subscrition で使用してデータを構造化して保存できませんでした。
(なんとなくできそうな予感があるものの、可能な手順を見つけられませんでした。)

Pub/Sub Avro ファイルインポートの動作

Pub/Sub Avro ファイルのインポート動作は、Pub/Sub Cloud Storage サブスクリプションで出力した Avro ファイル形式のスキーマを厳密に満たす必要があること以外は、Avro ファイルのインポートと同様の動作となります。
また、インポートされたメッセージはスキーマ無し Avro バイナリ形式で出力されるところも同様です。
なお、Pub/Sub Avro ファイルのスキーマと異なる場合はインポートされません。
このときの Pub/Sub Avro ファイルのスキーマは、Cloud Storage サブスクリプションのファイル形式のドキュメントを参照してください。

この形式のファイルインポートを使うユースケースは、自分でスキーマを定義して Avro ファイルを作成するというよりは、 Cloud Storage Subscription で出力した Avro ファイルを再利用する、といったことが考えられます。

まとめ

Pub/Sub の Cloud Storage import Topic について紹介しました。
Cloud Functions や Cloud Run によるインポート処理を完全に代替できるわけではありませんが、簡単に GCS バケットから Pub/Sub へのメッセージング処理を実装できるため、要件に合えばデータパイプラインの構築やイベント駆動型サービスの構築において有用な機能だと思います。
テキストファイルのインポート動作、特に Pub/Sub Schema の対応については今後のアップデートに期待したいところです。

この記事が、Pub/Sub を使う方の参考になれば幸いです。

Discussion