Zenn
😽

Cloud Storage にアップロードしたCSVファイルを BigQuery に自動インポートする

2025/02/14に公開
1

こんにちは、クラウドエース株式会社の許です。
この記事では、Cloud Storage にアップロードした CSV ファイルを BigQuery に自動インポートする方法について紹介します。

大まかな構成

以下のような構成で実装していきます。
alt text
Cloud Storage に CSV ファイルをアップロードすると、Cloud Run Functions がトリガーされ、BigQuery に自動インポートされます。
Cloud Run Functions は、いくつかトリガーが存在していますが、今回は Cloud Storage のファイルアップロード時にトリガーされる Cloud Storage トリガーを使用します。

事前準備

事前に以下の準備が必要ですが、今回の主題からは逸れるため省略します。

  1. Big Query データセット、テーブルの作成
  2. Cloud Storage バケットの作成
  3. CSV ファイルの準備

事前に用意する CSV ファイルは、以下のようなファイルになります。
Big Query テーブルのそれぞれのカラムの型は、それぞれのカラム名と同じです。

string,int,float,date,datetime
hello,1,1.1,2018-01-01,2018-01-01 00:00:00
world,2,2.2,2018-01-02,2018-01-02 00:00:00
apple,3,3.3,2018-01-03,2018-01-03 00:00:00
orange,4,4.4,2018-01-04,2018-01-04 00:00:00
banana,5,5.5,2018-01-05,2018-01-05 00:00:00
pineapple,6,6.6,2018-01-06,2018-01-06 00:00:00
grape,7,7.7,2018-01-07,2018-01-07 00:00:00
strawberry,8,8.8,2018-01-08,2018-01-08 00:00:00

Cloud Run Functions の作成

  1. Cloud Run のコンソール画面から、[関数を作成] をクリックします。
    Cloud Run Functions 作成画面
  2. サービス名、リージョンを設定します。
  3. ランタイムを選択します。今回は Java 21 を選択します。
  4. トリガーを選択します。今回は Cloud Storage トリガーを選択します。
    • トリガーの名前: 自動設定
    • バケット: 事前に作成した Cloud Storage バケット
    • イベントプロバイダ: Cloud Storage
    • イベントタイプ: google.cloud.storage.object.v1.finalize
    • サービスアカウント: Default compute service account
      • 今回はデフォルトのサービスアカウントを使用していますが、本番環境などで利用する場合は、適切な権限を持ったサービスアカウントを使用してください。
  5. 認証にて、[認証が必要] を選択します。
    • 未承認の呼び出しによる関数の実行を禁止する意図があります
  6. [作成] をクリックします。

作成を押すと自動的に関数がデプロイされます。

デプロイされた関数にソースコードを追加

先ほど作成した関数の[ソース]に以下のコードを記述します。
StorageFunction.java

package gcfv2storage;

import com.google.cloud.bigquery.*;
import com.google.cloud.functions.CloudEventsFunction;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.cloudevents.CloudEvent;

import java.util.Objects;
import java.util.logging.Logger;

public class StorageFunction implements CloudEventsFunction {
    private static final Logger logger = Logger.getLogger(StorageFunction.class.getName());

    //    各種環境情報を設定
    private static final String PROJECT_ID = {プロジェクトID};
    private static final String DATASET_ID = {データセットID};
    private static final String TABLE_ID =  {テーブルID};

    @Override
    public void accept(CloudEvent event) {
//        Cloud Run Functionsからの入力を受け付ける
        String eventName = new String(Objects.requireNonNull(event.getData()).toBytes());
        logger.info("Cloud Event data: " + eventName);

//        入力情報をGsonを使ってJson型に変換し、各種情報を取得
        JsonObject data = JsonParser.parseString(eventName).getAsJsonObject();
        String bucketName = data.get("bucket").getAsString();
        String fileName = data.get("name").getAsString();
        logger.info("Processing file: " + fileName + " from bucket: " + bucketName);

        String sourceUri = "gs://" + bucketName + "/" + fileName;

//        BigQueryの各カラムの設定を行う
//        Schema schema = Schema.of(
//                Field.of("string", LegacySQLTypeName.STRING),
//                Field.of("int", LegacySQLTypeName.INTEGER),
//                Field.of("float", LegacySQLTypeName.FLOAT),
//                Field.of("date", LegacySQLTypeName.DATE),
//                Field.of("datetime", LegacySQLTypeName.DATETIME)
//        );

        Schema schema = Schema.of(
                Field.newBuilder("string", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).setDescription("string Column").build(),
                Field.newBuilder("int", LegacySQLTypeName.INTEGER).setMode(Field.Mode.NULLABLE).setDescription("int Column").build(),
                Field.newBuilder("float", LegacySQLTypeName.FLOAT).setMode(Field.Mode.NULLABLE).setDescription("float Column").build(),
                Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Field.Mode.NULLABLE).setDescription("date Column").build(),
                Field.newBuilder("datetime", LegacySQLTypeName.DATETIME).setMode(Field.Mode.NULLABLE).setDescription("datetime Column").build()
        );

//        CSVファイルをBigQueryにアップロードための各種設定と処理
        try {
            BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

//            先頭行をスキップ
            CsvOptions csvOptions = CsvOptions.newBuilder().setSkipLeadingRows(1).build();
            TableId tableId = TableId.of(PROJECT_ID, DATASET_ID, TABLE_ID);
            LoadJobConfiguration loadConfig =
                    LoadJobConfiguration
                            .newBuilder(tableId, sourceUri + fileName, csvOptions)
                            .setSchema(schema)
                            .build();

            // ロードジョブを作成し、完了するまで待機
            Job job = bigQuery.create(JobInfo.of(loadConfig));
            job = job.waitFor();
            if (job.isDone() && job.getStatus().getError() == null) {
                logger.info("Job completed successfully");
            }
            else {
                logger.severe("Job failed: " + job.getStatus().getError());
            }
        } catch (BigQueryException | InterruptedException e) {
            logger.severe("Error: " + e.getMessage());
        }
    }

}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gcfv2storage</groupId>
    <artifactId>http</artifactId>
    <version>0.0.1</version>
    <name>Cloud Storage Function for GCFv2</name>

    <properties>
        <maven.compiler.release>21</maven.compiler.release>
    </properties>

    <dependencies>

        <!-- Google Cloud Storage client -->
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>2.45.0</version>
        </dependency>

        <!-- Google BigQuery client -->
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquery</artifactId>
            <version>2.44.0</version>
        </dependency>

        <!-- Apache Commons CSV for CSV parsing -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.functions</groupId>
            <artifactId>functions-framework-api</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>io.cloudevents</groupId>
            <artifactId>cloudevents-api</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.11.0</version>
        </dependency>

    </dependencies>


</project>

記述が終わったら、[保存して再デプロイ] をクリックしてデプロイします。

Cloud Storage にファイルをアップロードして BigQuery にインポート

  1. 事前に準備していた CSV ファイルを Cloud Storage にアップロードします。
    アップロード画面
    ファイルをアップロードすると、自動的に関数がトリガーされます。

  2. Cloud Run Functions のログを確認し、処理が正常に完了していることを確認します。
    ログ画面
    画像にあるように、ログに Job completed successfully と表示されていれば、処理が正常に完了しています。

  3. BigQuery のデータセット、テーブルを確認し、データがインポートされていることを確認します。
    BigQuery画面
    正常にデータがインポートされていることが確認できればインポート完了です。

終わりに

Cloud Storage にアップロードした CSV ファイルを BigQuery に自動インポートする方法について紹介しました。
Cloud Run Functions は、他にもいくつかのトリガーが存在しており、それぞれのトリガーに合わせて処理を実装することができます。

今回の記事が皆様のお役に立てれば幸いです。

番外編

Python での実装

Python で実装する場合、Python の本体コードである main.py と、依存関係を記述した requirements.txt を作成します。

import functions_framework
from google.cloud import bigquery

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def hello_gcs(cloud_event):
    data = cloud_event.data
    event_id = cloud_event["id"]
    event_type = cloud_event["type"]
    bucket_id = data["bucket"]
    file_name = data["name"]

    gcs_uri = f"gs://{bucket_id}/{file_name}"

    # BigQuery
    dataset_id = "{データセット名}"
    table_id = "{テーブル名}"

    client = bigquery.Client()

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("string", "STRING"),
            bigquery.SchemaField("int", "INTEGER"),
            bigquery.SchemaField("float", "FLOAT"),
            bigquery.SchemaField("date", "DATE"),
            bigquery.SchemaField("datetime", "DATETIME"),
        ],
        skip_leading_rows=1,
        source_format=bigquery.SourceFormat.CSV,
    )

    try:
        load_job = client.load_table_from_uri(
            gcs_uri, f"{dataset_id}.{table_id}", job_config=job_config
        )

        print(f"Starting job {load_job.job_id} for file {file_name}")

        load_job.result()
        print(f"File {file_name} successfully loaded into {dataset_id}.{table_id}")
    except Exception as e:
        print(f"Failed to load file {file_name} into BigQuery: {e}")


requirements.txt

functions-framework==3.*
google-cloud-bigquery

Go での実装

Go で実装する場合、Go の本体コードである main.go と、依存関係を記述した go.mod を作成します。

// Package helloworld provides a set of Cloud Functions samples.
package gcf

import (
	"cloud.google.com/go/bigquery"
	"context"
	"fmt"
	"log"
	"time"

	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"github.com/cloudevents/sdk-go/v2/event"
)

func init() {
	functions.CloudEvent("HelloStorage", helloStorage)
}

// StorageObjectData contains metadata of the Cloud Storage object.
type StorageObjectData struct {
	Bucket         string    `json:"bucket,omitempty"`
	Name           string    `json:"name,omitempty"`
	Metageneration int64     `json:"metageneration,string,omitempty"`
	TimeCreated    time.Time `json:"timeCreated,omitempty"`
	Updated        time.Time `json:"updated,omitempty"`
}

const PROJECT_ID = "{プロジェクト名}"
const DATASET_ID = "{データセット名}"
const TABLE_ID = "{テーブル名}"

// helloStorage consumes a CloudEvent message and logs details about the changed object.
func helloStorage(ctx context.Context, e event.Event) error {
	log.Printf("Event ID: %s", e.ID())
	log.Printf("Event Type: %s", e.Type())

	var data StorageObjectData
	if err := e.DataAs(&data); err != nil {
		return fmt.Errorf("event.DataAs: %v", err)
	}

	log.Printf("Bucket: %s", data.Bucket)
	log.Printf("File: %s", data.Name)
	log.Printf("Metageneration: %d", data.Metageneration)
	log.Printf("Created: %s", data.TimeCreated)
	log.Printf("Updated: %s", data.Updated)

	gcsURI := "gs://" + data.Bucket + "/" + data.Name

	client, err := bigquery.NewClient(ctx, PROJECT_ID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	// Insert data into BigQuery
	gcsRef := bigquery.NewGCSReference(gcsURI)
	gcsRef.SourceFormat = bigquery.CSV
	gcsRef.SkipLeadingRows = 1

	gcsRef.Schema = bigquery.Schema{
		{Name: "string", Type: bigquery.StringFieldType},
		{Name: "int", Type: bigquery.IntegerFieldType},
		{Name: "float", Type: bigquery.FloatFieldType},
		{Name: "date", Type: bigquery.DateFieldType},
		{Name: "datetime", Type: bigquery.DateTimeFieldType},
	}

	loader := client.Dataset(DATASET_ID).Table(TABLE_ID).LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return fmt.Errorf("loader.Run: %v", err)
	}

	status, err := job.Wait(ctx)
	if err != nil {
		return fmt.Errorf("failed to load data: %v", err)
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}

	log.Printf("Job completed successfully")

	return nil
}

go.mod

module example.com/gcf

go 1.23.1

require (
	cloud.google.com/go/bigquery v1.66.0
	github.com/GoogleCloudPlatform/functions-framework-go v1.9.1
	github.com/cloudevents/sdk-go/v2 v2.15.2
)
1

Discussion

ログインするとコメントできます