はじめまして。CADDiでバックエンドエンジニアとして働いている中野です。 この記事では、Cloud Data Fusionを利用して作成したデータパイプラインについてご紹介します。 TL;DR Salesforce とBigQuery間のデータ連携にHeroku Connectをこれまで利用していたのですが、Cloud Data Fusionに乗り換えることでダウンタイムなしで約1/8までコストダウンができました。 モチベーション 弊社では、 Salesforce に溜まったデータをBigQueryに連携し、営業などのBizサイドの組織も含めアクセスできる状態にしております。これまでは連携に Heroku Connect 及び Heroku Postgres と Stitch というCloud Data Pipelineを用いていました。 しかし、Heroku Connect及びHeroku Postgresの利用料が高額でコストダウンしたいというモチベーションがありました。 乗り換え先として、Embulkなどの OSS を利用して自分たちで ホスティング を行う方法なども検討に上がりましたが、なるべくメンテナンスコストをかけたくないことから、要件を全て満たせそう且つフルマネージドなCloud Data Fusionを使うことに決定しました。 Cloud Data Fusionについて Cloud Data Fusion は、データ パイプラインを迅速に構築し管理するための、フルマネージドかつ クラウド ネイティブな エンタープライズ データ統合サービスです。Cloud Data Fusion は、データ パイプラインを迅速に構築し管理するための、フルマネージドかつ クラウド ネイティブな エンタープライズ データ統合サービスです。 引用: https://cloud.google.com/data-fusion/docs/concepts/overview?hl=ja UIからの操作も直感的に可能で、シンプルなパイプラインであればエンジニア以外でも簡単にデプロイすることができます。 構成 今回我々がやりたかったことは、「 Salesforce にあるデータをBigQueryに連携する」ということです。それを実現するために、Cloud Data Fusionのデプロイは以下の構成で行いました。 ▽図1:システム構成図 しかし、一度デプロイした後には不要になるリソースがいくつかあります。そのためデプロイが完了し、Dataproc クラスタ ーをCloud Data Fusionがプロビジョニング可能な状態になった後には、定期実行のスケジュールを設定し不要なリソースを削除した上で、以下の構成で運用しています。 Dataprocは バッチ処理 などを行うためのマネージドサービスです。Dataprocが実際に Salesforce と通信してデータを取得し、BigQueryにデータを貯める役割を担っています。Dataprocの詳細は最後に参考文献として載せています。 ▽図2:リソース削除後システム構成図 マイグレーション プラン 弊社では様々な部署がBigQueryに蓄積されたデータを元に業務を行っているため、できる限りダウンタイムを作らずに マイグレーション を行う必要がありました。そのため以下方針で マイグレーション を行い、ダウンタイムを発生させずに作業を完了させることができました。(前提として、BigQueryの利用者はこれまで Salesforce のデータが連携されていた dataset sf_heroku_connect にある各テーブルを直接参照せず、dataset sf にあるViewを経由してデータにアクセスしておりました。) Cloud Data Fusionのリソースを作成し、dataset sf_cloud_data_fusion の各テーブルに Salesforce から取得したデータを格納する。 dataset sf のデータソースを dataset sf_heroku_connect の各テーブルから、 dataset sf_cloud_data_fusion の各テーブルに置き換える。 しばらく稼働させ、問題が発生しないか確認する。 dataset sf_heroku_connect を削除する。 実装詳細 以下リソースの定義を行いました。 実際には、module化して管理しておりますが、ここではブログ用に基本的にresourceとして定義しています。また、BigQueryのリソースも実際には別プロジェクト内に配置してあるのですが、ここでは簡易化のために同一プロジェクト内に配置しております。 FILL_YOUR_XXX と記載がある箇所はご自身で適切なIPレンジに置き換えてください。 全体設定 provider "google" { project = "sample-project" region = "asia-northeast1" zone = "asia-northeast1-c" } data "google_client_config" "current" {} provider "cdap" { host = "${module.wait_healthy.service_endpoint}/api" token = data.google_client_config.current.access_token } terraform { required_providers { google = { source = "hashicorp/google" version = "4.78.0" } google-beta = { source = "hashicorp/google-beta" version = "4.73.2" } cdap = { source = "GoogleCloudPlatform/cdap" version = "~> 0.10" } } required_version = ">= 1.1" } Cloud Data Fusion関連リソース # Service Account resource "google_service_account" "sa_for_data_fusion" { project = "sample-project" account_id = "data-fusion-instance-sa" display_name = "For cloud data fusion" } resource "google_project_iam_member" "sa_for_data_fusion_role_bindings" { project = "sample-project" for_each = toset([ "roles/storage.admin", "roles/datafusion.runner", "roles/dataproc.worker", "roles/bigquery.jobUser", ]) role = each.key member = "serviceAccount:${google_service_account.sa_for_data_fusion.email}" } locals { data_fusion_service_account = "service-${data.google_project.data_fusion_project.number}@gcp-sa-datafusion.iam.gserviceaccount.com" } resource "google_service_account_iam_binding" "google_managed_sa_role_bindings" { service_account_id = "projects/sample-project/serviceAccounts/${google_service_account.sa_for_data_fusion.email}" role = "roles/iam.serviceAccountUser" members = [ "serviceAccount:${local.data_fusion_service_account}", ] } # Data Fusion resource "google_data_fusion_instance" "create_instance" { name = "data-fusion-instance-name" description = "data-fusion-instance-description" region = "asia-northeast1" type = "DEVELOPER" enable_stackdriver_logging = true enable_stackdriver_monitoring = true private_instance = true dataproc_service_account = google_service_account.sa_for_data_fusion.email network_config { network = "sample-private-network" ip_allocation = "FILL_YOUR_IP_RANGE_OF_DATAFUSION_INSTANCE" } version = "6.9.1" } # Source is from # https://cdfhub-asia-northeast1.storage.googleapis.com/hub/packages/plugin-salesforce/1.6.0/salesforce-plugins-1.6.0.json # https://cdfhub-asia-northeast1.storage.googleapis.com/hub/packages/plugin-salesforce/1.6.0/salesforce-plugins-1.6.0.jar resource "cdap_local_artifact" "salesforce-plugins" { name = "salesforce-plugins" version = "1.6.0" json_config_path = "path/to/file/salesforce-plugins-1.6.0.json" jar_binary_path = "path/to/file/salesforce-plugins-1.6.0.jar" depends_on = [google_data_fusion_instance.create_instance] } data "google_project" "data_fusion_project" { project_id = "sample-project" } resource "cdap_application" "sf-bq-sync-account" { name = "sf-bq-sync-account" spec = file("path/to/file/sf-bq-sync-account-cdap-data-pipeline.json") depends_on = [google_data_fusion_instance.create_instance, cdap_local_artifact.salesforce-plugins] } resource "cdap_application" "sf-bq-sync-user" { name = "sf-bq-sync-user" spec = file("path/to/file/sf-bq-sync-user-cdap-data-pipeline.json") depends_on = [google_data_fusion_instance.create_instance, cdap_local_artifact.salesforce-plugins] } # https://github.com/terraform-google-modules/terraform-google-data-fusion/tree/master/modules/wait_healthy module "wait_healthy" { source = "terraform-google-modules/data-fusion/google//modules/wait_healthy" version = "~> 0.1" service_endpoint = google_data_fusion_instance.create_instance.service_endpoint access_token = data.google_client_config.current.access_token } ネットワーク関連リソース # Gateway VM resource "google_service_account" "sa_for_gateway_vm" { project = "sample-project" account_id = "gateway-vm-instance-sa" display_name = "For cloud data fusion gateway" } resource "google_compute_instance" "sample_gateway_vm" { name = "sample-gateway-vm" machine_type = "e2-micro" zone = "asia-northeast1-b" tags = ["allow-http-for-data-fusion", "allow-https-for-data-fusion"] can_ip_forward = true boot_disk { initialize_params { image = "debian-cloud/debian-11" } } network_interface { network = google_compute_network.sample_private_network.self_link subnetwork = google_compute_subnetwork.sample_subnetwork.self_link } metadata_startup_script = "#! /bin/bash \n echo 1 > /proc/sys/net/ipv4/ip_forward \n iptables -t nat -A POSTROUTING -s FILL_YOUR_IP_RANGE_OF_DATAFUSION_INSTANCE -j MASQUERADE \n echo net.ipv4.ip_forward=1 > /etc/sysctl.d/11-gce-network-security.conf \n iptables-save" service_account { email = google_service_account.sa_for_gateway_vm.email scopes = ["cloud-platform"] } shielded_instance_config { enable_integrity_monitoring = true enable_vtpm = true } metadata = { block-project-ssh-keys = true } } # VPC resource "google_compute_network" "sample_private_network" { project = "sample-project" name = "sample-private-network" auto_create_subnetworks = "false" delete_default_routes_on_create = "false" routing_mode = "REGIONAL" } resource "google_compute_subnetwork" "sample_subnetwork" { project = "sample-project" region = "asia-northeast1" name = "sample-subnetwork" ip_cidr_range = "FILL_YOUR_IP_CIDR_RANGE" network = google_compute_network.sample_private_network.self_link private_ip_google_access = "true" } resource "google_compute_network_peering" "sample_peering" { name = "sample-peering" network = google_compute_network.sample_private_network.self_link peer_network = "https://www.googleapis.com/compute/v1/projects/${google_data_fusion_instance.create_instance.tenant_project_id}/global/networks/${google_data_fusion_instance.create_instance.region}-${google_data_fusion_instance.create_instance.name}" export_custom_routes = true } # NAT resource "google_compute_router" "router" { name = "sample-router" project = "sample-project" region = "asia-northeast1" network = google_compute_network.sample_private_network.self_link bgp { advertise_mode = "CUSTOM" advertised_groups = ["ALL_SUBNETS"] asn = "64512" } } resource "google_compute_address" "address" { name = "nat-ip" project = "sample-project" region = google_compute_router.router.region } resource "google_compute_router_nat" "cluster_router_nat" { name = "sample-router-nat" project = "sample-project" region = google_compute_router.router.region router = google_compute_router.router.name nat_ip_allocate_option = "MANUAL_ONLY" nat_ips = [google_compute_address.address.self_link] source_subnetwork_ip_ranges_to_nat = "ALL_SUBNETWORKS_ALL_IP_RANGES" log_config { enable = true filter = "ERRORS_ONLY" } } # Firewall rule resource "google_compute_firewall" "gateway_vm_for_data_fusion_allow_http_fw" { project = "sample-project" name = "gateway-vm-for-data-fusion-allow-http" network = "sample-private-network" allow { ports = ["80"] protocol = "tcp" } direction = "INGRESS" disabled = "false" priority = "1000" source_ranges = ["FILL_YOUR_IP_RANGE_OF_DATAFUSION_INSTANCE"] target_tags = ["allow-http-for-data-fusion"] } resource "google_compute_firewall" "gateway_vm_for_data_fusion_allow_https_fw" { project = "sample-project" name = "gateway-vm-for-data-fusion-allow-https" network = "sample-private-network" allow { ports = ["443"] protocol = "tcp" } direction = "INGRESS" disabled = "false" priority = "1000" source_ranges = ["FILL_YOUR_IP_RANGE_OF_DATAFUSION_INSTANCE"] target_tags = ["allow-https-for-data-fusion"] } # Route resource "google_compute_route" "sf_bq_sync_route" { name = "sample-route" dest_range = "0.0.0.0/0" network = google_compute_network.sample_private_network.self_link next_hop_instance = google_compute_instance.sample_gateway_vm.self_link priority = 1001 } Secret Manager関連リソース FILL_YOUR_CIPHERTEXT と記載がある箇所は google_kms_secret に従って、Cloud SDK を用いて暗号化したsecretを入れます。 google_kms_secretの例 だと、 my-secret-password にpasswordなどのsecretを入れ、outputとして出てきた CiQAaCd+xX4SsOXziF10a8JYq4spf~~~ を FILL_YOUR_CIPHERTEXT に登録します。 $ echo -n my-secret-password | gcloud kms encrypt \ > --project my-project \ > --location us-central1 \ > --keyring my-key-ring \ > --key my-crypto-key \ > --plaintext-file - \ > --ciphertext-file - \ > | base64 CiQAqD+xX4SXOSziF4a8JYvq4spfAuWhhYSNul33H85HnVtNQW4SOgDu2UZ46dQCRFl5MF6ekabviN8xq+F+2035ZJ85B+xTYXqNf4mZs0RJitnWWuXlYQh6axnnJYu3kDU= (引用: google_kms_secret ) # secret manager resource "google_secret_manager_secret" "salesforce_username" { project = "sample-project" secret_id = "salesforce-consumer-secret" replication { automatic = true } } resource "google_secret_manager_secret" "salesforce_password" { project = "sample-project" secret_id = "salesforce-consumer-key" replication { automatic = true } } resource "google_secret_manager_secret" "salesforce_consumer_secret" { project = "sample-project" secret_id = "salesforce-consumer-secret" replication { automatic = true } } resource "google_secret_manager_secret" "salesforce_consumer_key" { project = "sample-project" secret_id = "salesforce-consumer-key" replication { automatic = true } } data "google_secret_manager_secret_version" "salesforce_username" { project = "sample-project" secret = google_secret_manager_secret.salesforce_username.id } data "google_secret_manager_secret_version" "salesforce_password" { project = "sample-project" secret = google_secret_manager_secret.salesforce_password.id } data "google_secret_manager_secret_version" "salesforce_consumer_secret" { project = "sample-project" secret = google_secret_manager_secret.salesforce_consumer_secret.id } data "google_secret_manager_secret_version" "salesforce_consumer_key" { project = "sample-project" secret = google_secret_manager_secret.salesforce_consumer_key.id } data "google_kms_secret" "salesforce_username" { crypto_key = var.crypto_key ciphertext = var.salesforce_username } data "google_kms_secret" "salesforce_password" { crypto_key = var.crypto_key ciphertext = var.salesforce_password } data "google_kms_secret" "salesforce_consumer_secret" { crypto_key = var.crypto_key ciphertext = var.salesforce_consumer_secret } data "google_kms_secret" "salesforce_consumer_key" { crypto_key = var.crypto_key ciphertext = var.salesforce_consumer_key } variable "crypto_key" { type = string default = "sample-project/global/sample/terraform" } variable "salesforce_password" { type = string sensitive = true default = "FILL_YOUR_CIPHERTEXT" } variable "salesforce_username" { type = string sensitive = true default = "FILL_YOUR_CIPHERTEXT" } variable "salesforce_consumer_key" { type = string sensitive = true default = "FILL_YOUR_CIPHERTEXT" } variable "salesforce_consumer_secret" { type = string sensitive = true default = "FILL_YOUR_CIPHERTEXT" } BigQuery関連リソース # BigQuery resource "google_bigquery_dataset" "sf_cloud_data_fusion" { project = "sample-project" dataset_id = "sf_cloud_data_fusion" location = "asia-northeast1" } resource "google_bigquery_dataset_iam_member" "sf_cloud_data_fusion_owner" { project = "sample-project" dataset_id = google_bigquery_dataset.sf_cloud_data_fusion.dataset_id role = "roles/bigquery.dataOwner" member = "user:john_doe@caddi.jp" } resource "google_bigquery_dataset_iam_member" "data_fusion_editor" { project = "sample-project" dataset_id = google_bigquery_dataset.sf_cloud_data_fusion.dataset_id role = "roles/bigquery.dataEditor" member = "serviceAccount:${google_service_account.sa_for_data_fusion.email}" } 説明 いくつかCloud Data Fusionを定義する上でのポイントをかいつまんで説明します。 Service Account 図2をみるとわかる通り、Pipelineの実行時にはCloud Data Fusionは Salesforce に接続しておらず、Dataprocが Salesforce に接続して必要なデータの取得を行なっています。 Cloud Data Fusionのリソース定義を行う際にDataprocで利用するService Accountは宣言することができるのですが、Cloud Data Fusion自体が利用するService Accountは宣言することができません。 resource "google_data_fusion_instance" "create_instance" { name = "data-fusion-instance-name" description = "data-fusion-instance-description" region = "asia-northeast1" type = "DEVELOPER" enable_stackdriver_logging = true enable_stackdriver_monitoring = true private_instance = true dataproc_service_account = google_service_account.sa_for_data_fusion.email network_config { network = "sample-private-network" ip_allocation = "FILL_YOUR_IP_RANGE_OF_DATAFUSION_INSTANCE" } version = "6.9.1" } Cloud Data Fusion自体が利用するService AccountはCloud Data Fusion API を有効化した際に作成される、 Google Managed Service Accountになるので、Cloud Data Fusion自体が行う操作に対して追加で権限を付与する必要がある場合には、この Google Managed Service Accountに対して権限を付与してやる必要があります。( 参考:Cloud Data Fusion でのサービス アカウント ) 例えば、Pipeline作成時に別プロジェクトにあるBigQueryテーブルを確認しに行くためには、自身で定義したSerivce Accountではなく、 Google Managed Service Accountに対して必要なロールを付与する必要があります。 プライベート インスタンス からインターネット上のリソースへの接続 Cloud Data Fusionの インスタンス を作成した後に、パイプラインの作成が行われるのですが、その際に Salesforce (インターネット上に存在するデータソース)に接続し、 Salesforce 上の スキーマ 情報を取得する必要があります。 プライベートインスタンスからパブリックソースへの接続 のドキュメントを読むと、プライベート インスタンス からインターネット上に存在するデータソースに接続するためには、Network Peeringを設定し、 Gateway VM や Firewall Ruleなども設定し、Cloud Data Fusion インスタンス が外部に接続することができる状態を作る必要があることがわかります。 しかし、一度パイプラインを作成した後、Dataprocのプロビジョニングを行う際には既にCloud Data Fusion インスタンス が Salesforce の スキーマ 情報など必要な情報を 保有 しているため、再度インターネット上に存在するデータソースに接続する必要がありません。そのためパイプラインの編集を頻繁には行わない場合などには、パイプラインのデプロイ後、Network Peering, Gateway VM , Firewall Rule, Routeなど、インターネット上に存在するデータソースにCloud Data Fusionプライベート インスタンス が接続するために必要なリソースは削除することが可能です。 ただしこれらのリソースの削除にはメリットデメリットが存在するので、用途に応じて削除するかどうかの判断が必要です。 メリット VPC 構成の複雑さを抑えて、ネットワークに問題が生じた際の デバッグ が容易になる。 リソース削除により定常コストを削減できる。 デメリット 外部サービス(弊社の例では Salesforce )の最新 スキーマ を取得できなくなる。取得するためには再度これらのリソースを構築し直す必要がある。 弊社の場合、 Salesforce の更新頻度が低い且つIaCでリソースを管理しており再構築が容易に可能という状況だったため、これらのリソースを削除するという選択を行いました。 Cloud Data Fusion インスタンス とパイプラインの作成タイミング Cloud Data Fusion インスタンス の作成には30分ほど時間がかかります。パイプラインの作成はCloud Data Fusion インスタンス が存在してはじめて可能になるため、Cloud Data Fusion インスタンス の作成が完了するまでパイプライン作成は待つ必要があります。そこで、 wait_healty module を利用することでCloud Data Fusion インスタンス の作成を待ってパイプラインの作成に移ることが可能になります。 パイプラインの定義方法 パイプラインを定義する際に path/to/file/sf-bq-sync-user-cdap-data-pipeline.json で参照しているファイルは、以下のような JSON ファイルを参照しています。 resource "cdap_application" "sf-bq-sync-user" { name = "sf-bq-sync-user" spec = templatefile("sf-bq-sync-user-cdap-data-pipeline.json", { consumer_key = data.google_kms_secret.salesforce_consumer_key.plaintext, consumer_secret = data.google_kms_secret.salesforce_consumer_secret.plaintext, username = data.google_kms_secret.salesforce_username.plaintext, password = data.google_kms_secret.salesforce_password.plaintext }) depends_on = [google_data_fusion_instance.create_instance, cdap_local_artifact.salesforce-plugins] } sf-bq-sync-user-cdap-data-pipeline.json { "name": "sf-bq-sync-user", "description": "Data Pipeline Application", "artifact": { "name": "cdap-data-pipeline", "version": "6.9.1", "scope": "SYSTEM" }, "config": { "resources": { "memoryMB": 2048, "virtualCores": 1 }, "driverResources": { "memoryMB": 2048, "virtualCores": 1 }, "connections": [ { "from": "Salesforce", "to": "BigQuery" } ], "comments": [], "postActions": [], "properties": {}, "processTimingEnabled": true, "stageLoggingEnabled": false, "stages": [ { "name": "Salesforce", "plugin": { "name": "Salesforce", "type": "batchsource", "label": "Salesforce", "artifact": { "name": "salesforce-plugins", "version": "1.6.0", "scope": "USER" }, "properties": { "referenceName": "user", "useConnection": "false", "username": "${username}", "password": "${password}", "consumerKey": "${consumer_key}, "consumerSecret": "${consumer_secret}", "loginUrl": "https://login.salesforce.com/services/oauth2/token", "connectTimeout": "30000", "query": "select\nlastname,\nid,\nname,\ndivision\nfrom user", "operation": "query", "enablePKChunk": "false", "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"lastname\",\"type\":[\"string\",\"null\"]},{\"name\":\"id\",\"type\":[\"string\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"division\",\"type\":[\"string\",\"null\"]}]}" } }, "outputSchema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"lastname\",\"type\":[\"string\",\"null\"]},{\"name\":\"id\",\"type\":[\"string\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"division\",\"type\":[\"string\",\"null\"]}]}", "id": "Salesforce" }, { "name": "BigQuery", "plugin": { "name": "BigQueryTable", "type": "batchsink", "label": "BigQuery", "artifact": { "name": "google-cloud", "version": "0.22.1", "scope": "SYSTEM" }, "properties": { "useConnection": "false", "project": "sample-project", "datasetProject": "sample-project", "serviceAccountType": "filePath", "serviceFilePath": "auto-detect", "dataset": "sf_cloud_data_fusion", "table": "user", "operation": "upsert", "relationTableKey": "id", "allowSchemaRelaxation": "false", "location": "asia-northeast1", "createPartitionedTable": "false", "partitioningType": "NONE", "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"lastname\",\"type\":[\"string\",\"null\"]},{\"name\":\"id\",\"type\":[\"string\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"division\",\"type\":[\"string\",\"null\"]}]}" } }, "outputSchema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"lastname\",\"type\":[\"string\",\"null\"]},{\"name\":\"id\",\"type\":[\"string\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"division\",\"type\":[\"string\",\"null\"]}]}", "inputSchema": [ { "name": "Salesforce", "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"lastname\",\"type\":[\"string\",\"null\"]},{\"name\":\"id\",\"type\":[\"string\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"division\",\"type\":[\"string\",\"null\"]}]}" } ], "id": "BigQuery" } ], "schedule": "0 */2 * * *", "engine": "spark", "numOfRecordsPreview": 100, "rangeRecordsPreview": { "min": 1, "max": "5000" }, "description": "Data Pipeline Application", "maxConcurrentRuns": 1 }, "version": "de67b401-29e2-11ee-9d6b-7ad3ba276e43" } この JSON ファイルを1から手で書くのは骨が折れますが、Cloud Data FusionではUIから定義したパイプラインの設定をパイプラインのページからExportし、利用することが可能です。 そのため、1番最初はUIからパイプラインの定義を行い、exportした JSON ファイルを雛形として利用し、必要に応じて編集しながら使うのが効率的かと思います。その際に、secretの扱いを気をつける必要があります。 設定をexportすると、 JSON ファイルの中に以下passwordやconsumerSecretなどの情報が直接入ってきます。これらを GitHub などにPushしてしまうとまずいため、templatefile function を利用して、Secret Managerなどから取得したsecretに置き換えてやる必要があります。 JSON ファイル内で "password": "${password}", と書いて変数を埋め込み、パイプラインの定義を行う際に以下のように templatefile function を利用してsecretに置き換えます。 resource "cdap_application" "sf-bq-sync-user" { name = "sf-bq-sync-user" spec = templatefile("sf-bq-sync-user-cdap-data-pipeline.json", { consumer_key = data.google_kms_secret.salesforce_consumer_key.plaintext, consumer_secret = data.google_kms_secret.salesforce_consumer_secret.plaintext, username = data.google_kms_secret.salesforce_username.plaintext, password = data.google_kms_secret.salesforce_password.plaintext }) depends_on = [google_data_fusion_instance.create_instance, cdap_local_artifact.salesforce-plugins] } スキーマ の更新 スキーマ の更新の際には JSON ファイルを編集する必要があります。変更内容が多い場合でも、置換をうまく使えば作業自体はそこまで大変ではないので、Heroku ConnectでUIから管理していた時よりも個人的には作業が楽になったように感じます。また、 JSON ファイルもGit管理下に置かれるので、変更前後のDiffが見られる安心感もメリットに感じています。 Salesforce 側の設定 Cloud Data Fusionを利用して Salesforce のデータを取得するためには、 Salesforce 側の設定も必要になります。 Salesforce の設定はClassmethodさんの記事「 Cloud Data FusionでSalesforceのデータをBigQueryに取り込んでみる 」を参考にさせていただきました。 困っている点 パイプラインの定期実行スケジュールのトリガー方法 パイプラインのデプロイまではIaCで自動化することができたのですが、パイプラインの定期実行スケジュールをデプロイと同時に開始することができず、スケジュールの開始だけはUIから操作する必要があります。UIから定期実行を開始したのちに、 google _data_fusion_instance に対してterraform import&terramform plan を実行しても差分が出ず、また、pipelineを作成しているcdap_applicationは terraform importをサポートしておらず、定期実行のスケジュールを開始する方法は見つけられておりません。 おわりに お決まりですが採用についてです。リアルな世界に向き合い複雑な ドメイン を取り扱うことに興味がある方、検証を回しつつ、スケールするための基盤作りに興味がある方を募集しています。カジュアル面談もやっていますのでぜひお気軽にご連絡ください。 エンジニア向け採用サイト https://recruit.caddi.tech/ 求人一覧 https://open.talentio.com/r/1/c/caddi-jp-recruit/homes/4139 参考文献 Cloud Data Fusion の概要 アーキテクチャとコンポーネント Cloud Data Fusion インスタンスを作成する プライベート インスタンスを作成する プライベート インスタンスからパブリック ソースへの接続 Cloud Data Fusion サービス アカウント Dataproc とは Secret Manager のコンセプトの概要 Data Fusion Wait Healthy Cloud Data FusionでSalesforceのデータをBigQueryに取り込んでみる