GKE上にAirbyteを構築しSaaSデータ連携をリプレイスした話

OGP

はじめに

こんにちは、MLデータ部データ基盤ブロックの仲地です。初めてのテックブログへの投稿になります。主に業務ではデータ基盤の開発・運用を担当しています。

データ基盤ブロックではELTツールであるAirbyteを導入し、一部のデータ転送パイプラインをリプレイスしました。本記事ではそのAirbyteの構築方法と運用するにあたって工夫した点を紹介します。

目次

背景

現在、当社のデータ基盤には、自社運用の基幹DBからのデータ連携だけでなく、使用中のSaaSからのデータ連携も含まれています。基幹DBからのデータ連携やデータ基盤については、以前のブログに詳細があります。

techblog.zozo.com

基幹DBからのデータ連携以外にも、各業務で必要なSaaSにおけるデータを、データ基盤へ連携してほしいという要望がありました。SasSのデータ連携において、Airybte導入以前に用いていたDMP (Data Management Platform)は必要な機能以上のスペックが備わっていました。例えばそのDMPで行えるデータ転送において、任意のカラムによるフィルタやキーワードによる抽出条件の変更などができました。弊社におけるデータ基盤において、エンドユーザーである利用者が用いる分析基盤はBigQueryであり、そのDMPからBigQueryへのデータ転送が実施されていました。そしてこの転送プロセスにおいて抽出条件などを設けていたことで、日々のデータ転送時に欠損が生じたときに調査を困難にしていた課題などがありました。そのDMPは分析基盤として優秀ですが、BigQueryをデータ分析基盤と固めた今、その中間処理は複雑化する一因になっていました。またそのサービスを活かしきれてなく金銭的コストもかかっていました。不要な中間処理を省き、データ転送パイプラインの簡素化が求められていました。

Airbyte

AirbyteはOSSなELTツールです。数多くあるELTツールの中でも、今回Airbyteを選択した理由を軸にAirbyteを紹介します。

OSS

選択の理由の1つとして、AirbyteがOSSであることです。OSSは柔軟で拡張可能なフレームワークやプラットフォームを提供しています。これにより、自身のニーズに合わせてソフトウェアをカスタマイズし拡張できます。またソースコードが公開されているため、OSSは透明性が高いです。バグや脆弱性を自身で発見し、コミュニティを通じて迅速に対応できます。そのほかにもOSSの採用は、ベンダーロックインから解放される手段となります。システムのアップデートやバグの対応、セキュリティパッチなどはベンダー依存になり、ベンダーロックインの懸念がありましたが、OSSであれば自身で対応できます。

Connectorの豊富さ

次の理由はConnectorの豊富さです。データ取得元であるデータソースが豊富であり、また転送先であるDWHやストレージなどが多いという特徴があります。実際のデータソースや転送先のサービス一覧は、下記のドキュメントをご覧ください。

airbyte.com

今回、移行前に連携していたSaaSが全てAirbyteのConnectorとして既に実装済みだったのも選択理由の1つでした。

ETLではなくEL(T)

AirbyteはEL(T)ツールだと自身で説明しています。詳しくは下記の記事をご覧ください。

airbyte.com

従来よく使われていたデータ転送パイプラインは、データ取得元であるデータソースからデータを取得し、そのデータの中間処理を行い、宛先テーブルに注入するという流れでした。いわゆるETL(Extract, Transform, Load)です。しかし近年では、クラウド環境におけるデータウェアハウスの発展が著しく、データのサイズを懸念せずにデータを保持することが容易になりました。弊社では基幹DBからのデータ連携は一度、BigQueryに未加工のデータを取り込み、マシンパワーが強いBigQuery上でデータを加工しています。この方針にもAirbyteのEL(T)ツールは適していました。

コミュニティが活発

コミュニティが活発であることも選択理由の1つです。AirbyteのコミュニティはSlackやGitHubのIssueなどで活発に議論が行われています。また、Airbyteの開発チームもコミュニティに参加しており、Issueの対応やPull Requestのレビューなどを行っています。私自身もAirbyteのSlackに参加し、構築時に発生した問題の質問や、機能の追加要望などを行いました。その際には開発チームからの返答もあり、コミュニティの活発さを実感しました。AirbyteのGitHubのリポジトリを見ると、GitHubのスター数やコミット数なども多く、今後も機能の追加やバグの修正が期待できます。

github.com

GCP上でAirbyteを構築

全体構成

Airbyteの構築にあたって、GCP上のリソースを下記図のように構築しました。

インフラ構成図

以下が各コンポーネントの概説です。

  • GKE : Airbyteのコンテナをデプロイするために使用しました。本記事ではGKEのAutopilotを使用しています。
  • IAP : GKE上のAirbyteのWeb UIへアクセスするために使用しました。ユーザーレベルの制御を行えます。
  • Cloud SQL : Airbyteのメタデータを保存するために使用しました。可能な限り、コンテナをステートレス化したかったため使用しました。
  • Cloud Storage : Airbyteのログを保存するために使用しました。Cloud SQLと同様に、コンテナをステートレス化するために使用しました。
  • Cloud Composer : Airbyteのデータ転送パイプラインのスケジューリングを行うために使用しました。本記事では構築方法は紹介しません。

次節より詳しい構築方法を紹介します。

Terraform

GCP上のリソースを管理するために、Terraformを用いました。Terraformのコードを下記に紹介します。

Network関連

#--------------------------#
# data
#--------------------------#
data "google_compute_network" "vpc" {
  name    = "vpc-${local.project}"
  project = local.project
}

data "google_dns_managed_zone" "zone" {
  name = "zozo-zone"
}

#--------------------------#
# Cloud NAT
#--------------------------#
resource "google_compute_router" "vpc_router" {
  name    = "${local.project}-vpc-router"
  region  = local.region
  network = data.google_compute_network.vpc.self_link
}

resource "google_compute_address" "nat_ip" {
  name   = "${local.project}-nat-ip"
  region = local.region
}

resource "google_compute_router_nat" "nat_gateway" {
  name                               = "${local.project}-nat-gateway"
  router                             = google_compute_router.vpc_router.name
  region                             = local.region
  nat_ip_allocate_option             = "MANUAL_ONLY"
  nat_ips                            = [google_compute_address.nat_ip.self_link]
  source_subnetwork_ip_ranges_to_nat = "ALL_SUBNETWORKS_ALL_IP_RANGES"
}


#--------------------------#
# Subnet
#--------------------------#
resource "google_compute_subnetwork" "airbyte" {
  name                     = "subnet-${local.project}-airbyte"
  project                  = local.project
  region                   = local.region
  network                  = data.google_compute_network.vpc.id
  private_ip_google_access = true
  ip_cidr_range            = local.subnet_cidr_range_airbyte
  secondary_ip_range {
    range_name    = "pods"
    ip_cidr_range = local.subnet_pods_secondary_cidr_range_airbyte
  }
  secondary_ip_range {
    range_name    = "services"
    ip_cidr_range = local.subnet_services_secondary_cidr_range_airbyte
  }
}

resource "google_compute_subnetwork" "subnet_proxy_only" {
  name          = "subnet-${local.project}-proxy-only"
  project       = local.project
  region        = "us-central1"
  network       = google_compute_network.vpc.id
  ip_cidr_range = local.subnet_cidr_range_proxy_only
  purpose       = "REGIONAL_MANAGED_PROXY"
  role          = "ACTIVE"
}

#--------------------------#
# Firewall
#--------------------------#
resource "google_compute_firewall" "firewall_proxy_connection" {
  name    = "firewall-proxy-connection-${local.project}"
  project = local.project
  network = google_compute_network.vpc.name

  allow {
    protocol = "tcp"
    # Now, Internal Load Balancer is used in airbyte only. So, the following port is specified.
    ports = ["8001"]
  }
  source_ranges = [local.subnet_cidr_range_proxy_only]
}

既に存在するVPCに対して、SubnetやFirewallの設定しています。また、GKE上で外部へ通信するために、Cloud NATを設定しています。連携するSaaSにおいてIPアドレスの制限があったため、Cloud NATのIPアドレスは固定IPアドレスを設けました。

GKE

#--------------------------#
# GKE Cluster(Autopilot)
#--------------------------#
resource "google_container_cluster" "airbyte" {
  name = "airbyte-cluster"

  enable_autopilot = true

  location   = local.region
  network    = data.google_compute_network.vpc.self_link
  subnetwork = google_compute_subnetwork.airbyte.self_link

  networking_mode = "VPC_NATIVE"
  ip_allocation_policy {
    cluster_secondary_range_name  = "pods"
    services_secondary_range_name = "services"
  }

  private_cluster_config {
    enable_private_nodes    = true
    enable_private_endpoint = false
    master_ipv4_cidr_block  = local.gke_master_ipv4_cidr_block

    master_global_access_config {
      enabled = true
    }
  }

  # ref https://qiita.com/inductor/items/e60be2b1b33347dc0c21
  maintenance_policy {
    recurring_window {
      start_time = "2020-05-05T20:00:00Z" # 05:00 JST
      end_time   = "2020-05-06T00:00:00Z" # 09:00 JST
      recurrence = "FREQ=WEEKLY;BYDAY=MO,TU,WE,TH"
    }
  }

  release_channel {
    channel = "STABLE"
  }

}

GKEはAutopilotを使用しています。AutopilotはGKEのマネージドサービスであり、マスタノードの管理やノードプールの管理などをGCPが行ってくれます。また、GKEのバージョンアップなどもGCPが行ってくれます。GKEの管理にかかるコスト削減を図ります。

Cloud SQL関連

#--------------------------#
# Cloud SQL
#--------------------------#
# private services access
resource "google_compute_global_address" "private_ip_alloc_google_managed_service" {
  name          = "google-managed-services-${data.google_compute_network.vpc.name}"
  purpose       = "VPC_PEERING"
  address_type  = "INTERNAL"
  prefix_length = tonumber(element(split("/", local.cidr_google_managed_services), 1))
  network       = data.google_compute_network.vpc.id
  address       = element(split("/", local.cidr_google_managed_services), 0)
}

resource "google_service_networking_connection" "private_service_connection_google_managed_service" {
  network                 = data.google_compute_network.vpc.id
  service                 = "servicenetworking.googleapis.com"
  reserved_peering_ranges = [google_compute_global_address.private_ip_alloc_google_managed_service.name]
}

resource "google_sql_database_instance" "airbyte_primary" {
  name             = "db-airbyte"
  region           = local.region
  database_version = "POSTGRES_14"

  settings {
    tier = local.postgre_instance_type
    ip_configuration {
      ipv4_enabled    = false
      private_network = data.google_compute_network.vpc.id
    }
    backup_configuration {
      point_in_time_recovery_enabled = true
      enabled                        = true
      start_time                     = "17:00" # JST:02:00
    }
    availability_type = "REGIONAL"
  }
  depends_on = [google_service_networking_connection.private_service_connection_google_managed_service]
}

resource "google_sql_database" "airbyte" {
  name      = "db-airbyte"
  instance  = google_sql_database_instance.airbyte_primary.name
}


resource "google_sql_user" "airbyte_user" {
  name     = "airbyte_k8s"
  instance = google_sql_database_instance.airbyte_primary.name
  # NOTE: ダミーのパスワードを設定しておき、後から手動で変更する
  password = "DummyPassword"
  lifecycle {
    ignore_changes = [password,]
  }
}

Airbyteの公式の構築手順では、Kubernetesのコンテナ内にPostgreSQLを構築をしていますが、本記事ではコンテナをステートレス化するためにCloud SQLを使用しました。Cloud SQLは管理が容易で、データベースのパフォーマンスの監視やメンテナンスがクラウドプロバイダによって自動的に行われます。またCloud SQLは定期的に自動バックアップを行い、必要に応じてこれを使用してデータベースを復元できます。これらのマネージドサービスのメリットを授かり、運用にかかるコスト削減を試みました。

また、GKE上からCloud SQLのプライベートIPアドレスで接続するために、プライベートサービスアクセスを構築しています。詳しくは下記のドキュメントをご覧ください。

cloud.google.com

cloud.google.com

Compute Address, Cloud DNS, ManagedCertificate

#--------------------------#
# Compute Address
#--------------------------#
resource "google_compute_global_address" "airbyte_webapp" {
  name   = "${local.project}-airbyte-webapp"
}

resource "google_compute_address" "airbyte_server_internal_address" {
  name         = "${local.project}-airbyte-server"
  subnetwork   = google_compute_subnetwork.airbyte.id
  address_type = "INTERNAL"
  region       = "us-central1"
}

#--------------------------#
# Cloud DNS
#--------------------------#
resource "google_dns_record_set" "airbyte_webapp" {
  name         = "airbyte.${data.google_dns_managed_zone.zone.dns_name}"
  managed_zone = data.google_dns_managed_zone.zone.name
  type         = "A"
  ttl          = 300

  rrdatas = [google_compute_global_address.airbyte_webapp.address]
}

resource "google_dns_record_set" "airbyte_server_internal" {
  name         = "internal.airbyte-server.${google_dns_managed_zone.zone.dns_name}"
  managed_zone = google_dns_managed_zone.zone.name
  type         = "A"
  ttl          = 300

  rrdatas = [google_compute_address.airbyte_server_internal_address.address]
}

#--------------------------#
# ManagedCertificate
#--------------------------#
resource "google_compute_managed_ssl_certificate" "airbyte_webapp_cert" {
  name = "${local.project}-airbyte-webapp-managed-cert"

  managed {
    domains = ["airbyte.${data.google_dns_managed_zone.zone.dns_name}"]
  }
}

Compute Address, Cloud DNS, ManagedCertificateを設定しています。Compute Addressは、GKE上のIngressで使用します。1つは、AirbyteのWeb UIへアクセスするために使用し、もう1つは、Cloud ComposerからAirbyteのAPIへアクセスするために静的内部IPアドレスを設定しています。詳しくは、Airflowによるスケジュール実行節にて紹介します。Cloud DNSは、Compute Addressの名前解決のために使用します。ManagedCertificateは、Compute Addressに対して証明書を発行するために使用します。

IAP

#--------------------------#
# IaP
#--------------------------#
resource "google_iap_brand" "iap_brand" {
  support_email     = "{自身の適当なメールアドレス}"
  application_title = "Cloud IAP for ${local.project}"
}

resource "google_iap_client" "airbyte_iap_client" {
  display_name  = "Airbyte OAuth Client"
  brand         =  google_iap_brand.iap_brand.name
}

IAPは、AirbyteのWeb UIへアクセスするために使用します。ここで弊社が用いているVPNのIP制限によるアクセス制御も検討しましたが、ユーザーレベルの制御が行いやすい、GCPのIAPを用いました。IAPはGCPのリソースに対して認証する機能で、GCPの認証情報を持つユーザーのみがアクセスできるようになります。

Cloud Storage

#--------------------------#
# Cloud Storage
#--------------------------#
resource "google_storage_bucket" "airbyte_log" {
  name     = "${local.project}-airbyte-log"
  location = "US"

  uniform_bucket_level_access = true
}

resource "google_storage_bucket" "airbyte_bq_staging" {
  name     = "${local.project}-airbyte-bq-staging"
  location = "US"

  uniform_bucket_level_access = true
}

Airbyteのログを保存するために、Cloud Storageを使用しました。また、BigQueryへのデータ転送の際に一時的にデータを保存するために、Cloud Storageを追加しています。

Service Accountと権限関連

#--------------------------#
# Service Account (IAM)
#--------------------------#
resource "google_service_account" "airbyte_app" {
  account_id   = "airbyte-app"
  display_name = "Service Account for Airbyte Application"
}

resource "google_project_iam_member" "airbyte_app_bq_data_editor" {
  role    = "roles/bigquery.dataEditor"
  member  = "serviceAccount:${google_service_account.airbyte_app.email}"
  project = local.project
}

resource "google_project_iam_member" "airbyte_app_bq_user" {
  role    = "roles/bigquery.user"
  member  = "serviceAccount:${google_service_account.airbyte_app.email}"
  project = local.project
}

resource "google_storage_bucket_iam_member" "airbyte_app_storage_admin_log" {
  bucket = google_storage_bucket.airbyte_log.name
  role   = "roles/storage.admin"
  member = "serviceAccount:${google_service_account.airbyte_app.email}"
}

resource "google_storage_bucket_iam_member" "airbyte_app_storage_admin_bg_staging" {
  bucket = google_storage_bucket.airbyte_bq_staging.name
  role   = "roles/storage.admin"
  member = "serviceAccount:${google_service_account.airbyte_app.email}"
}

resource "google_iap_web_iam_member" "airbyte_app_access_service_account" {
  role   = "roles/iap.httpsResourceAccessor"
  member = "serviceAccount:${google_service_account.airbyte_app.email}"
}

resource "google_iap_web_iam_member" "airbyte_app_access_members" {
  for_each = toset(local.airbyte_app_access_members)
  role = "roles/iap.httpsResourceAccessor"
  member = each.value
}

# Bind GSA to KSA
resource "google_service_account_iam_member" "airbyte_app_k8s_iam_default" {
  service_account_id = google_service_account.airbyte_app.name
  role               = "roles/iam.workloadIdentityUser"
  member             = "serviceAccount:${local.project}.svc.id.goog[default/default]"
}

resource "google_service_account_iam_member" "airbyte_app_k8s_iam_airbyte_admin" {
  service_account_id = google_service_account.airbyte_app.name
  role               = "roles/iam.workloadIdentityUser"
  member             = "serviceAccount:${local.project}.svc.id.goog[default/airbyte-admin]"
}

AirbyteのコンテナからGCPのリソースへアクセスするために、Service Accountを追加と権限の借用の設定をしています。ここで、2つのKubernetes Service Account (以下、KSA)に対して、同一のGoogle Service Account (以下、GSA)の権限を借用する設定をしています。ServiceAccountの権限借用節で詳しく説明します。

また、AirbyteのWeb UIへアクセスするために、特定のGoogle Accountのみに対してiap.httpsResourceAccessorのロールを付与しています。同様にサービスアカウントにも同じロールを付与していますが、データ転送パイプラインの設定情報の反映節において紹介する認証トークンを取得するために追加しています。

次に各環境に依存する変数を定義した、locals のみを定義したファイルを紹介します。

local.tf

locals {
  project = "airbyte-project-prd"
  region  = "us-central1"

  subnet_cidr_range_airbyte                     = "{SubnetのCIDR}"
  subnet_pods_secondary_cidr_range_airbyte      = "{Pods SecondaryのCIDR}"
  subnet_services_secondary_cidr_range_airbyte  = "{Services SecondaryのCIDR}"
  gke_master_ipv4_cidr_block                    = "{GKE MasterのCIDR}"
  cidr_google_managed_services                  = "{Google Managed ServicesのCIDR}"

  postgre_instance_type = "db-custom-1-3840"

  # AirbyteのWeb UIにアクセス可能なGoogle Accountのメールアドレス
  airbyte_app_access_members = [
    "user:example@example.com"
  ]
}

Kubernetesのマニフェスト

いくつかAirbyteオリジナルのKubernetesのKustomizeを変更したので、その設定例を紹介します。また詳しくはHelm化節で紹介しますが、Kubernetesのデプロイは現在Helmを使った方法が推奨されています。本記事ではKustomizeを用いている、Airbyteの最終Versionであるv0.40.32のkubeディレクトリとの差分を紹介します。

最初にディレクトリ構成の紹介します。

kube
├── overlays
│   ├── dev
│   │   ├── dev.yaml
│   │   ├── kustomization.yaml
│   │   └── set-resource-limits.yaml
│   └── prd
│       ├── kustomization.yaml
│       ├── prd.yaml
│       └── set-resource-limits.yaml
└── resources
    ├── admin-service-account.yaml
    ├── bootloader.yaml
    ├── connector-builder-server.yaml
    ├── cron.yaml
    ├── default-service-account.yaml
    ├── kustomization.yaml
    ├── pod-sweeper.yaml
    ├── secret-gcs-log-creds.yaml
    ├── server.yaml
    ├── temporal.yaml
    ├── volume-configs.yaml
    ├── webapp.yaml
    └── worker.yaml

公式のstable-with-resource-limitsを参考にしながら、各環境に合わせて変更を加えています。

最初に各環境共通であるresourcesディレクトリ内のマニフェストの差分設定を紹介します。

kustomization.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
  - bootloader.yaml
  - connector-builder-server.yaml
  - cron.yaml
  - pod-sweeper.yaml
  - admin-service-account.yaml
  - default-service-account.yaml
  - server.yaml
  - temporal.yaml
  - volume-configs.yaml
  - webapp.yaml
  - worker.yaml

公式のkustomization.yamlと比較するとDB関連とMinIOのマニフェストファイルを反映していません。DBはCloud SQLを使用するため、MinIOを本記事では使用しないためです。またSecretはマニフェストで管理せず、KubernetesのCLI Secretを用いているので除外しています。

webapp.yaml

kind: Service
metadata:
  name: airbyte-webapp-svc
  annotations:
    cloud.google.com/backend-config: '{"default": "airbyte-webapp-backend-config"}'
...
# この間は変更ないので省略
...
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airbyte-webapp-ingress
  annotations:
    kubernetes.io/ingress.class: "gce"
    kubernetes.io/ingress.allow-http: "false"
spec:
  defaultBackend:
    service:
      name: airbyte-webapp-svc
      port:
        number: 80
---
apiVersion: cloud.google.com/v1
kind: BackendConfig
metadata:
  name: airbyte-webapp-backend-config
spec:
  iap:
    enabled: true
    oauthclientCredentials:
      secretName: airbyte-iap-client-secrets
  timeoutSec: 300

特定のドメインを紐づけるために、Ingressを追加しました。また、IAPによる認証のために、BackendConfigiap の設定を追加しています。secretNameについてはKubernetesのSecret節にて紹介します。

cloud.google.com

server.yaml

kind: Service
metadata:
  name: airbyte-server-svc
  annotations:
    cloud.google.com/neg: '{"ingress": true}'
    cloud.google.com/backend-config: '{"default": "airbyte-server-backend-config"}'
...
# この間は変更ないので省略
...
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airbyte-server-ilb-ingress
  annotations:
    kubernetes.io/ingress.class: "gce-internal"
spec:
  defaultBackend:
    service:
      name: airbyte-server-svc
      port:
        number: 8001
---
apiVersion: cloud.google.com/v1
kind: BackendConfig
metadata:
  name: airbyte-server-backend-config
spec:
  timeoutSec: 300
  healthCheck:
    type: HTTP
    requestPath: /api/v1/health # NOTE: 変わる可能性がある
    port: 8001
---

同一のVPCに存在するCloud ComposerからAirbyteのAPIを叩くために、内部Ingressを追加しました。導入の経緯についてはAirflowによるスケジュール実行節にて紹介します。

cloud.google.com

volume-configs.yaml

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: airbyte-storage-class
provisioner: filestore.csi.storage.gke.io
volumeBindingMode: Immediate
allowVolumeExpansion: false
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airbyte-volume-configs
  labels:
    airbyte: volume-configs
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: airbyte-storage-class
  resources:
    requests:
      storage: 500Mi

こちらのリソースの変更については、PVCのAccessModeの変更節にて説明します。

default-service-account.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: default

こちらの追加リソースは、ServiceAccountの権限借用節にて説明します。

次に環境ごとに依存するoverlayディレクトリ内の差分設定を、prd環境を例に紹介します。

kusomization.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: default

bases:
  - ../../resources

images:
  - name: airbyte/db
    newTag: 0.40.32
  - name: airbyte/bootloader
    newTag: 0.40.32
  - name: airbyte/server
    newTag: 0.40.32
  - name: airbyte/webapp
    newTag: 0.40.32
  - name: airbyte/worker
    newTag: 0.40.32
  - name: temporalio/auto-setup
    newTag: 1.13.0
  - name: airbyte/cron
    newTag: 0.40.32
  - name: airbyte/connector-builder-server
    newTag: 0.40.32

configMapGenerator:
  - name: airbyte-env
    env: .env

patchesStrategicMerge:
  - set-resource-limits.yaml
  - prd.yaml

公式のkustomization.yamlと比較すると、Secretはマニフェストでコード管理せず、KubernetesのCLI Secretを用いているので secretGenerator を除外しています。また、patchesStrategicMerge にて prd.yaml を追加しています。これはprd環境でのみ適用する設定を記述するためです。

prd.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: airbyte-admin
  annotations:
    iam.gke.io/gcp-service-account: airbyte-app@airbyte-project-prd.iam.gserviceaccount.com
---
# Note:The following are deprecated on Airbyte. However, since this project uses a cluster for Airbyte only, we will also bind Google Service Account to the k8s default account.
# https://github.com/airbytehq/airbyte/pull/11697
apiVersion: v1
kind: ServiceAccount
metadata:
  name: default
  annotations:
    iam.gke.io/gcp-service-account: airbyte-app@airbyte-project-prd.iam.gserviceaccount.com
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airbyte-webapp-ingress
  annotations:
    kubernetes.io/ingress.global-static-ip-name: airbyte-project-prd-airbyte-webapp
    ingress.gcp.kubernetes.io/pre-shared-cert: airbyte-project-prd-airbyte-webapp-managed-cert
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airbyte-server-ilb-ingress
  annotations:
    kubernetes.io/ingress.regional-static-ip-name: airbyte-project-prd-airbyte-server
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: airbyte-storage-class
parameters:
  tier: standard
  network: vpc-airbyte-project-prd

実際の各環境に依存する設定を記述しています。airbyte-admindefault のKSAに、GSAを権限借用するためにアノテーションを追加しています。ServiceAccountの権限借用節にて目的を説明します。

また、airbyte-webapp-ingressでは、下記の記事を参考にTerraformで作成したGlobal IPとManagedCertificateを紐付けています。

qiita.com

airbyte-storage-class は、PersistentVolumeのStorageClassを指定しています。こちらの設定についてはPVCのAccessModeの変更節にて説明します。

KubernetesのSecret

公式のKubernetesのマニフェストではSecretをマニフェストで管理していますが、秘密情報をコード管理することはセキュリティリスクがあるため、この方法は見送りました。代替案としてKubernetesのCLIを用いたので、その方法を紹介します。実際には以下の3つのSecretを作成しました。

  • airbyte-secrets
    • 公式のkustomization.yaml内の secretGenerator で生成されるSecretです。主にDBへ接続するための情報が含まれています。設定Keyが多いので下記コマンドでは省略していますが、.secretで確認できます。
    • kubectl create secret generic airbyte-secrets --from-literal=DATABASE_USER=airbyte_k8s --from-literal=DATABASE_PASSWORD=airbyte --from-literal=...
  • gcs-log-creds
    • 公式のsecret-gcs-log-creds.yamlに該当するSecretです。AirbyteのログをGCSに出力するための認証情報が含まれています。
    • kubectl create secret generic gcs-log-creds --from-literal=gcp.json='{認証情報のjsonファイルの中身}'
  • airbyte-iap-client-secrets
    • こちらはIAPによる認証のための認証情報が含まれています。
    • kubectl create secret generic airbyte-iap-client-secrets --from-literal=client_id={client_id} --from-literal=client_secret={client_secret}

Kubernetesのデプロイ

ここまででKubernetesのマニフェストとSecretの作成が完了したので、実際にデプロイを行います。GKEの認証情報を取得後に、下記コマンドを実行します。

kubectl apply -k kube/overlays/prd

デプロイ後、DNSの設定で紐付けたドメインにアクセスすると、AirbyteのWeb UIが表示されます。 AirbyteのHome画面

工夫した点

実際の運用にあたって直面した課題と、それを解決した工夫点をご紹介します。

GKE上での構築

本記事では、Airbyteの構築はKubernetesをベースとしたマネージドサービスであるGKE上に構築しました。AirbyteはDockerが動作する環境、例えばGCE上などに簡単に構築ができます。また内部にTemporalというワークフローエンジンがあり、タスクの実行時はDocker Composeの場合ならコンテナを、Kubernetes環境の場合ならPodのコンテナを並列に稼働させます。しかし単一ノードで稼働するDocker Composeではデータ転送タスクが増えるごとにマシンリソースを意識しなければならなく、運用するにあたってリソース管理が懸念されました。そこで本記事は、Kubernetesをマネージドに提供するGKE上で構築することで、インフラの管理コストの低減を図りました。

Airflowによるスケジュール実行

Airbyteのスケジュール実行には、Airbyteの機能を用いる方法と、Airflowなどのスケジューラーを用いる方法があります。本記事では、Airflowを用いる方法を選択しました。理由としては、弊社のデータマート基盤として既にCloud Composerを使用しており、運用の都合がよかったためです。またAirbyteによる転送後のデータ加工のためにAirflowの機能を用いることで、Airbyteの機能を用いるよりも柔軟にデータ加工を行えると考えました。Cloud Composerに関する情報は下記の記事をご覧ください。

techblog.zozo.com

AirflowにおけるAirbyteのオペレーターを用いたタスク実行は、下記の公式のドキュメントを参考にしました。

docs.airbyte.com

AirflowからAirbyteのスケジュールを行うには、Airbyte ServerのHost情報などを含む、Connectionの設定が必要です。構築初期の段階では、Web UIをホスティングしている webapp Podに対してのみ、外部Ingressを追加していました。しかし、AirbyteのWeb UIにはIAPによる認証を設定しており、リクエスト時にHeaderへ認証トークンを付与する必要があります。Connectionの設定ではHeaderへの認証トークンの付与ができないため、この方法は断念しました。

この問題に対して、内部Ingressを追加することで解決しました。Cloud ComposerとAirbyte Serverは同じVPC内に構築されており、Airbyte Serverの内部Ingressは静的内部IPアドレスを指定しています。この静的内部IPアドレスは、DNSでドメインに紐づいています。そのドメインをAirflowのConnectionのHostに指定することで、AirflowからAirbyte Serverへアクセスできるようにしました。

関連ページ内リンク

MinIOを用いない

Airbyteはジョブ実行のログなどを、デフォルトのKubernetesのyamlを適用してしまうとMinIOというオブジェクトストレージに出力します。本記事では、MinIOを用いず、GCSにログを出力するようにしました。1つ目の理由は、MinIOのLICENCEがGNU AGPL v3だったためです。弊社では、オープンソースのライセンスのうち、AGPLの使用が禁止されています。2つ目の理由は、使い慣れているGCSを用いたかったためです。データ基盤チームの主要な使用クラウドサービスはGCPであり、GCSを用いることでログの管理コストを低減できると考えました。

PVCのAccessModeの変更

以下のIssueの対応です。

github.com

airbyte-serverairbyte-cron のPodにおいて airbyte-volume-configs というPVCをマウントしています。このPodがクラスタ内の別々のノードにスケジュールされることがあり、その際にPVCのAccessModeが ReadWriteMany でないと、Podが起動できないという問題がありました。そのため、PVCのAccessModeを ReadWriteMany に変更しました。GKEで ReadWriteMany を用いるには、Filestore CSI Driverを使用する必要があります。下記の公式のドキュメントを参考にしました。

cloud.google.com

関連ページ内リンク

ServiceAccountの権限借用

こちらの問題1は既に公式の対応によって解決されていますが、事例として紹介します。Airbyte上でBigQueryをDestinationとして登録する際、秘密情報であるService Account KeyをAirbyte上に登録する必要がありました。この時、GKE上にAirbyteを構築したため、KSAとGSAの権限借用を用いて、秘密情報を登録することなくBigQueryへのアクセス方法を検討しました。しかし、当時のAirbyteの転送ジョブの挙動として、転送用のPodを起動していました。そのPodは airbyte-admin Service Accountではなく default Service Accountで起動されていました。同じ課題を感じる方がPull Requestを出していたのですが、セキュリティの観点からRejectされていました。

上記の問題を解決するために、本記事では airbyte-admindefault のKSAに、GSAを権限借用する設定を追加しました。これにより、Airbyteの転送ジョブのPodは default KSAで起動されますが、GSAの権限を借用することで、BigQueryへのアクセスが可能になりました。また上記Pull Requestでは、同一クラスタ内に複数のサービスが存在する場合 default のKSAにGSAを権限借用する設定を追加すると、過剰な権限付与と指摘されています。本記事ではAirbyteのPodのみが存在するクラスタであるため、この設定で問題ないと判断しました。

BigQueryのDestinationの設定画面

関連ページ内リンク

前述したように、この問題は解決済みのようです。最新のバージョンではApplication Default Credentials (ADC)による認証が可能になっています。

github.com

導入後の課題点

Helm化

現在、Airbyteの公式では下記リンクのHelmを用いたデプロイが推奨されています。Airbyte構築時点では、Kustomizeを用いたデプロイが推奨されていたので、そちらを選択しました。Airbyteのアップデートにも対応できるように、今後はHelmによるデプロイに移行する予定です。

docs.airbyte.com

データ転送パイプラインの設定情報の反映

実際のデータ転送パイプラインの設定情報を反映する方法に課題があります。インフラのコード管理はTerraformとKubernetesのマニフェストで行えていますが、Airbyteのコネクタの設定はWeb UI上で行っています。Airbyteの公式がOctavia CLI という、コネクタの設定情報をCLIで管理するツールを提供しており、yaml形式で設定情報を管理できます。当初、こちらのCLIを用いて設定情報を管理しようと考えましたが、現在はoctavia import allを用いた設定情報のバックアップのみを行っています。理由としては、connectionsのスキーマの変更に追従できないためです。sourcesとdestinationsの設定情報は一度設定すると、基本的に変更されることはありません。しかしconnectionsの設定情報はAPIのアップデートによってスキーマが変更された場合、その内容を取り込む必要があります。この自動化が構築初期の段階では難しかったため、現在は前述した設定情報のバックアップのみを行っています。

Octavia CLIをインストール後、下記手順で設定情報のバックアップを行いました。

  1. GCPのAPI&Service画面のCredentialsから、IAPリソース追加時に作成されたOAuth ClientのClient IDの値を取得します。 OAuth ClientのClient IDの値
  2. 次に初回時のみ、octavia init コマンドを実行し、設定情報をバックアップするディレクトリを作成します。
  3. 下記のコマンドを実行し、設定情報をバックアップします。
octavia --airbyte-url {AirbyteのWeb UI URL} --api-http-header "Authorization" "Bearer $(gcloud auth print-identity-token airbyte-app@airbyte-project-prd.iam.gserviceaccount.com --audiences={1で取得したClient ID}" import all

Cloud Loggingへのログ出力

本記事ではAirbyteのログをGCSへ出力するようにしましたが、実際のログを確認する際には該当するGCS内にあるファイルを確認する必要があります。現在、転送Jobの実行ログは、下記画像のようにAirbyteのWeb UI上で確認できます。しかしその他のWeb ServerやWorkerのログは、GCS内にあるファイルを確認する必要があります。

AirbyteのJobの実行ログ

Cloud LoggingはGCSとは異なり、ログの検索やアラートの設定などが可能です。そのため、Cloud Loggingへのログ出力について調査しました。しかし、現在のAirbyteのログ出力は、GCSやS3、MinIOなどのオブジェクトストレージに対応していますが、Cloud Loggingに対応していません。そのため、Cloud Loggingへのログ出力については、今後の課題として残しています。

まとめ

この記事では、ZOZOのデータ基盤におけるELTツールとしてAirbyteの導入と、それをGKE上で構築した経緯と詳細を紹介しました。Airbyteの選定理由、OSSである利点、豊富なConnectorの存在、EL(T)ツールとしての機能、そして活発なコミュニティサポートが主なポイントでした。

GKE上でAirbyteを構築する過程で、Kubernetesのマニフェストのカスタマイズ、Cloud SQLの使用、IAPによる認証、Airflowによるスケジュール実行の設定、そしてMinIOを用いず、GCSへのログ出力への切り替えなど、様々な工夫と調整が必要でした。これらの取り組みによって、Airbyteを構築する上で直面した課題を解決し、データ基盤の強化につながりました。

しかし、Helmによるデプロイへの移行や、データ転送パイプラインの設定情報の反映方法など、今後の課題も残っています。これらの課題に対応し、さらなるデータ基盤の強化を目指しています。

本記事が、Airbyteの導入を検討している方々や、GKE上でのELTツールの構築を考えている方々にとって、役立つ情報を提供できていれば幸いです。

ZOZOでは、一緒に楽しく働く仲間を募集中です。ご興味のある方は下記採用ページをご覧ください!

corp.zozo.com

カテゴリー