Zenn
🎨

【外部へ脱出?】BigQuery リモート関数を使用して Slack に通知してみた

2025/03/04に公開

はじめに

クラウドエース第三開発部の角谷(かどたに)です。
BigQuery リモート関数を使用したクエリを実行し、 Slack とメールアドレス(Gmail)に通知する仕組みを作ってみましたのでその方法を紹介します。

システム

アーキテクチャ

使い方

BigQuery のクエリエディタで、送りたいメッセージを引数にリモート関数 send_message を実行します。

SELECT `{プロジェクト ID}.my_dataset.send_message`(test message);

すると、あらかじめ設定していた Slack チャンネルと Gmail にそのメッセージが通知されます(下の画像は Gmail で受信したメール)。

次のように複数行のメッセージを送ることも可能です。

SELECT `{プロジェクト ID}.my_dataset.send_message`(messages)
FROM UNNEST(['test message 1', 'test message 2']) AS messages;
WITH messages AS (
  SELECT 'test message 1' AS col
  UNION ALL
  SELECT 'test message 2' AS col
)
SELECT `{プロジェクト ID}.my_dataset.send_message`(col)
FROM messages;

ユースケース

この仕組みは実際に弊社のプロジェクトでも使用されています。例えば、処理に時間がかかるクエリを実行する際に、その最後に「Query executed successfully」といったメッセージを送ったり、エラー発生時には例外ハンドラを使ってエラーメッセージを送ったりしています。クエリ側から能動的に通知を送信する Push 型であるため、実行状況を見張る必要がありません。

また、これはクエリ内のデータを外へ出せることがポイントです。今回は通知機能として活用していますが、クエリ内のデータを基に外部システムの API を呼び出すといったことも考えられます。リモート関数のおかげですね。

BEGIN

  BEGIN TRANSACTION;
  -- 長い処理
  -- 処理の途中でエラーが発生した場合は EXCEPTION 句が実行されます

  SELECT `{プロジェクト ID}.my_dataset.send_message`('Query executed successfully');
  COMMIT TRANSACTION;

EXCEPTION WHEN ERROR THEN
  SELECT `{プロジェクト ID}.my_dataset.send_message`(@@error.message);
  ROLLBACK TRANSACTION;

END;

BigQuery では通常のクエリに対して通知機能はありませんが、 Scheduled Query では実行失敗時のメール通知や、実行終了時に Pub/Sub トピックへのメッセージ通知機能があります。ですが、上のようにクエリの途中で通知することや任意の内容の通知はできません。

https://cloud.google.com/bigquery/docs/scheduling-queries?hl=ja

手順

Pub/Sub

通知を送るための中継地点となる Pub/Sub トピック notification を作成します。

gcloud pubsub topics create notification

Cloud Run Functions

Cloud Run Functions の言語のランタイムは執筆時点(2024 年 3 月)で最新の Node.js 22 を使用します。

notify2slack

Pub/Sub からメッセージを受け取り、その内容を Slack に通知する Cloud Run Functions notify2slack を作成します。以下の index.js と package.json を任意のフォルダに配置します。

index.js
const functions = require('@google-cloud/functions-framework');
const axios = require('axios');

const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;

functions.cloudEvent('sendMessage', async (cloudEvent) => {
  const pubsubMessage = cloudEvent.data.message;
  const message = pubsubMessage.data
    ? Buffer.from(pubsubMessage.data, 'base64').toString()
    : 'No data found';

  try {
    await axios.post(slackWebhookUrl, {text: message});
    console.log('Message sent successfully');
  } catch (error) {
    console.error('Error sending message to Slack:', error);
  }
});
package.json
{
  "dependencies": {
    "@google-cloud/functions-framework": "^3.3.0",
    "axios": "^0.21.1"
  }
}

任意のフォルダで以下のコマンドを実行し、Cloud Run Functions notify2slack をデプロイします。

gcloud functions deploy notify2slack \
--gen2 \
--region=asia-northeast1 \
--runtime=nodejs22 \
--source=. \
--entry-point=sendMessage \
--trigger-topic=notification \
--set-env-vars SLACK_WEBHOOK_URL={Slack Webhook URL}

--trigger-topic=notification と指定することで、上で作成した Pub/Sub トピック notification にメッセージが Publish されたときにこの Cloud Run Functions notify2slack をトリガーします。また、リソースとして notification に対する Push 型の Pub/Sub サブスクリプションが作成されます。

環境変数 SLACK_WEBHOOK_URL には Incoming webhooks で作成した URL を使用します。詳細は以下をご確認ください。

https://api.slack.com/messaging/webhooks

notify2gmail

Pub/Sub からメッセージを受け取り、その内容を特定のメールアドレスに通知する Cloud Run Functions notify2gmail を作成します。同様に以下の index.js、package.json を配置し、gcloud コマンドによりデプロイします。

index.js
const functions = require('@google-cloud/functions-framework');
const nodemailer = require('nodemailer');

const gmailUser = process.env.GMAIL_USER;
const gmailPassword = process.env.GMAIL_PASSWORD;

const transporter = nodemailer.createTransport({
  service: 'gmail',
  auth: {
    user: gmailUser,
    pass: gmailPassword,
  },
});

functions.cloudEvent('sendEmail', async (cloudEvent) => {
  const pubsubMessage = cloudEvent.data.message;
  const message = pubsubMessage.data
    ? Buffer.from(pubsubMessage.data, 'base64').toString()
    : 'No data found';

  const mailOptions = {
    from: gmailUser,
    to: gmailUser,
    subject: 'Message from BigQuery',
    text: message,
  };

  try {
    await transporter.sendMail(mailOptions);
    console.log('Email sent successfully');
  } catch (error) {
    console.error('Error sending email:', error);
  }
});
package.json
{
  "dependencies": {
    "@google-cloud/functions-framework": "^3.3.0",
    "nodemailer": "^6.9.0"
  }
}
gcloud functions deploy notify2gmail \
--gen2 \
--region=asia-northeast1 \
--runtime=nodejs22 \
--source=. \
--entry-point=sendEmail \
--trigger-topic=notification \
--set-env-vars GMAIL_USER={Gmail のメールアドレス},GMAIL_PASSWORD={アプリ パスワード}

環境変数 GMAIL_USER には Google アカウントのメールアドレス、GMAIL_PASSWORD にはアプリ パスワードを使用します。アプリ パスワードについては以下をご確認ください。

https://support.google.com/accounts/answer/185833?hl=ja

また、環境変数 GMAIL_USER 、GMAIL_PASSWORD のような秘匿情報は、複数人で利用するプロジェクトであれば、環境変数ではなく Secret Manager に保存することを推奨します。

https://cloud.google.com/security/products/secret-manager?hl=ja

send-message

BigQuery からメッセージを受け取り、その内容を Pub/Sub トピックに Publish する Cloud Run Functions send-message を作成します。同様に以下の index.js、package.json を配置し、 gcloud コマンドによりデプロイします。

index.js
const functions = require('@google-cloud/functions-framework');
const { PubSub } = require('@google-cloud/pubsub');

const topicId = process.env.TOPIC_ID;
const pubSubClient = new PubSub();

functions.http('sendMessage', async (req, res) => {
  try {
    const calls = req.body.calls;
    const preMessage = calls.map(call => call[0]).join('\n');
    const message = preMessage.trim() ? preMessage : 'No message found';
    const messageBuffer = Buffer.from(message);
    await pubSubClient.topic(topicId).publish(messageBuffer);
    console.log(`Message published to topic ${topicId}`);
    const replies = calls.map(() => 'Message sent successfully');
    res.json({ replies: replies });
  } catch (error) {
    console.error(`Error sending message: ${error.message}`);
    res.status(400).json({ errorMessage: error.message });
  }
});

BigQuery から送信された HTTP リクエスト内の calls フィールドのメッセージを加工して Pub/Sub トピックに Publish します。エラーがなく Publish できた場合は replies フィールドに「Message sent successfully」を、エラーが起きた場合は errorMessage フィールドにエラーメッセージをそれぞれ指定し、HTTP レスポンスを返します。特に replies フィールドの配列のサイズは calls フィールドの配列のサイズと一致させる必要があります。

リモート関数及び統合するエンドポイント(今回は Cloud Run Functions)の仕様については以下をご確認ください。

https://cloud.google.com/bigquery/docs/remote-functions?hl=ja

package.json
{
  "dependencies": {
    "@google-cloud/functions-framework": "^3.3.0",
    "@google-cloud/pubsub": "^3.0.0"
  }
}
gcloud functions deploy send-message \
--gen2 \
--region=asia-northeast1 \
--runtime=nodejs22 \
--source=. \
--entry-point=sendMessage \
--set-env-vars TOPIC_ID=notification

BigQuery 外部接続

BigQuery から他の Google Cloud リソースへアクセスするための接続 functions_connection を作成します。

bq mk --connection \
--location=asia-northeast1 \
--connection_type=CLOUD_RESOURCE \
functions_connection

作成した接続のサービス アカウント ID(自動で発行される)を取得します。

bq show --connection {プロジェクト ID}.asia-northeast1.functions_connection

出力(一部抜粋)は次のようになります。

properties
{"serviceAccountId": "{サービス アカウント ID}"}

Cloud Run Functions を呼び出すために、取得したサービス アカウントに「Cloud Run Service Invoker」ロールを付与します。

gcloud projects add-iam-policy-binding {プロジェクト ID} \
--member="serviceAccount:{サービス アカウント ID}" \
--role="roles/run.servicesInvoker" \
--condition=None

BigQuery リモート関数

BigQuery データセット my_dataset を作成します。

bq mk --location=asia-northeast1 \
{プロジェクト ID}:my_dataset

上で作成した Cloud Run Functions send-message のエンドポイントを取得します。

gcloud run services describe send-message \
--region asia-northeast1 \
--format="value(status.url)"

BigQuery のクエリエディタで以下のクエリを実行し、データセット my_dataset 内にリモート関数 send_message を作成します。

CREATE OR REPLACE FUNCTION `{プロジェクト ID}.my_dataset.send_message`(message STRING) RETURNS STRING
REMOTE WITH CONNECTION `{プロジェクト ID}.asia-northeast1.functions_connection`
OPTIONS (
  endpoint = '{Cloud Run Functions `send-message` のエンドポイント}'
);

REMOTE WITH CONNECTION で 外部接続 functions_connection を、OPTIONS で Cloud Run Functions send-message のエンドポイントを指定することで、リモート関数 send_message からその Cloud Run Functions send-message を呼び出すことができます。

また、外部接続、リモート関数及び Cloud Run Functions は全て同じリージョンにある必要があり、今回は asia-northeast1 で作成しています。詳細は以下をご確認ください。

https://cloud.google.com/bigquery/docs/remote-functions?hl=ja#supported_regions

おわりに

以上で、BigQuery リモート関数を用いたクエリによる外部通知方法を Slack と Gmail の例を交えながら紹介しました。
この記事が何かの役に立てば幸いです。

Discussion

ログインするとコメントできます