G-gen の福井です。Google Workspace Events API と NotebookLM Enterprise API を利用して、Google ドライブ上のファイル操作をトリガーに NotebookLM のノートブック管理を自動化するアーキテクチャと、その実装例を紹介します。 はじめに 当記事の概要 NotebookLM Enterprise とは Google Workspace Events API とは 免責事項 実現したいこと 目的と処理の内容 構成図(To-be) 構成図(Can-be) イベント処理方式について 事前準備 デベロッパープレビュープログラムへの登録 各種APIの有効化 サービスアカウントの準備と権限移譲 サービスアカウントの作成 ドメイン全体の委任の設定 Pub/Sub の準備 サンプルアプリケーション ファイル構成 ライブラリのインストール 環境変数の設定 Google Workspace Events APIでイベントを購読 イベントを処理し NotebookLM を操作 動作確認 はじめに 当記事の概要 NotebookLM は、組織のナレッジ活用に非常に強力なツールですが、運用において課題もあります。例えば、一度ノートブックに登録したソースは、元のファイルが更新されても自動では同期されません。そのため、常に最新の情報を反映させるには、手作業でノートブック内のソースの更新が必要です。また、新しいナレッジのトピックごとにノートブックの作成やソースの追加を手作業で行うのは、運用負荷が高くなりがちです。 当記事では、これらの課題を解決するため、Google ドライブ上のファイルを更新すると、関連する NotebookLM Enterprise のノートブックが自動で作成され、ソースが新規追加・更新される仕組みと実装方法を解説します。 この自動化を実現するために、 NotebookLM Enterprise API と、現在デベロッパープレビュー版として提供されている Google Workspace Events API を使用します。 NotebookLM Enterprise とは NotebookLM Enterprise は、組織の内部情報(ドキュメントやデータ)をソースとして使用する、AI搭載の調査・作成アシスタントです。無償版の NotebookLM や、Google Workspace に付属する NotebookLM Pro とは異なり、Enterprise 版は Google Cloud プロジェクト内で有効化され、より高度な組織利用を前提とした機能が提供されます。 無償版や Pro 版との詳細な機能比較については、以下の記事をご参照ください。 blog.g-gen.co.jp Enterprise 版の特筆すべき点として、まず高度なID連携とアクセス制御が挙げられます。Google Workspace アカウントだけでなく、Microsoft Entra ID や Okta などのサードパーティIDプロバイダーとの連携に対応しており、IAM を使用したアクセス制御も可能です。 また、VPC Service Controls による強固なセキュリティ境界を設定できるほか、サポートするファイル形式として Microsoft Word、PowerPoint、Excel が追加されています。 そして最も重要な特徴として、Enterprise 版は唯一 API が提供されており、当記事で紹介するような外部システムとの連携や業務自動化を実現できます。 参考 : What is NotebookLM Enterprise? Google Workspace Events API とは Google Workspace Events API は、Google Workspace 上で発生するイベント(例: Google ドライブでのファイルの作成・更新、Google Chat のスペースへのメッセージ投稿など)をサブスクライブ(購読)するための API です。 イベントが発生すると、通知エンドポイントである Cloud Pub/Sub トピック に対してリアルタイムに近い形で通知が送信されます。これにより、Google Workspace 上のアクティビティをトリガーとしたアプリケーション連携や自動化処理を実装できます。 当記事執筆時点(2025年10月)では、デベロッパープレビュー版として提供されています。したがって、当記事で解説する内容は一般提供(GA)の際に変更される可能性があることを予めご了承ください。 参考 : Google Workspace Events API 概要 参考 : Google Workspace デベロッパー プレビュー プログラム 免責事項 当記事で紹介するプログラムのソースコードや構成ファイルは、ご自身の責任のもと、使用、引用、改変、再配布して構いません。 ただし、同ソースコードが原因で発生した不利益やトラブルについては、当社は一切の責任を負いません。 実現したいこと 目的と処理の内容 当記事で実装するシステムは、 「特定の Google ドライブフォルダと NotebookLM Enterprise のノートブックを同期させる」 ことを目的としています。 ユーザーの操作をトリガーにシステムがどのように自動処理を行うか、具体的な流れを以下に示します。 処理の流れ ユーザーの操作(Google ドライブ) システムの自動処理(NotebookLM Enterprise) 1 NotebookLM-TopicA フォルダを作成し、最初のファイルを追加する NotebookLM-TopicA ノートブックを新規作成し、追加されたファイルをソースとして登録する 2 NotebookLM-TopicA フォルダに2つ目のファイルを追加する 既存の NotebookLM-TopicA ノートブックに、2つ目のファイルをソースとして登録する 3 NotebookLM-TopicA フォルダ内の既存ファイルを更新する 既存の NotebookLM-TopicA ノートブック内の対応するソースを削除し、改めてソースとして登録する このように、ユーザーは Google ドライブ上でファイルを管理するだけで、意識することなく NotebookLM のナレッジを常に最新の状態に保つことができます。 ただし、今回はノートブック管理自動化の構成検討のきっかけを与えるための記事です。本記事で提示するサンプルは基本動作に留まるため、実際の運用では以下のような考慮が別途必要となります。 Google Workspace イベントのサブスクリプション更新 ノートブックのソースに登録するファイル種類の判定 ノートブックのソースに登録したファイルの移動や削除のハンドリング 同期処理の中で同じソースに対するイベントが複数発生している場合は同期処理を1回にまとめる 例外処理、リトライ処理 ...など 構成図(To-be) 今回実装する自動化システムは、複数の Google Cloud サービスを連携させることで実現します。以下に、本番運用を想定した「あるべきアーキテクチャ」の全体図と、各コンポーネントの役割を解説します。 構成図(To-be) コンポーネント 役割 Google ドライブ ユーザーがノートブックのソースとなるドキュメントを保存・管理する場所です。特定のフォルダへのファイルの追加や更新が、自動化のトリガーとなります。 Google Workspace Events API Google ドライブでのファイル操作を監視し、イベント(ファイルの作成・内容変更など)が発生した際に、その情報を Cloud Pub/Sub へ通知する役割を担います。 Cloud Pub/Sub Google Workspace Events API からの通知を受け取り、メッセージをキューイングします。後述の Cloud Run jobs がこのキューからメッセージを取得(Pull)して処理を開始します。 Cloud Run jobs イベント処理のメインロジック(コンテナ化されたPythonスクリプト)を実行する環境です。 Cloud Scheduler によって定期実行され、Pub/Sub からイベントを一括(バッチ)で取得し、処理します。 Firestore Google ドライブのフォルダやファイル ID と、それに対応する NotebookLM のノートブック ID の対応関係を保存・管理するための NoSQL データベースです。 NotebookLM Enterprise API Cloud Run jobs で実行されるスクリプトからの指示を受け、実際にノートブックの作成、ソースの追加といった操作を実行します。 構成図(Can-be) 当記事ではこのアーキテクチャの中核をなす イベント処理ロジック に焦点を当てます。そのため、実装の解説では Cloud Run jobs の設定までは踏み込まず、Python スクリプトを ローカル環境で実行する 方法をご紹介します。 当記事で実装する範囲の構成図を、以下に記載します。 構成図(Can-be) イベント処理方式について 今回のアーキテクチャでは、イベント処理に Cloud Pub/Sub からの Push 型 ではなく、Cloud Run jobs による Pull 型 の定期実行を採用しています。 その理由は、Google Workspace Events API の性質にあります。ファイルの編集中は、短時間に多数の更新イベントが発生する可能性があり、その都度 Push 型で処理を起動すると、非効率かつコスト増に繋がる恐れがあります。 そのため、一定間隔(例: 5分ごと)でジョブを起動し、Pub/Sub に溜まったイベントをまとめて取得・処理する Pull 型のバッチ処理方式が、コスト効率が良くなります。 事前準備 デベロッパープレビュープログラムへの登録 2025年10月現在、Google Workspace Events API は、デベロッパープレビューのため、Google Workspace デベロッパー プレビュー プログラムへの登録が必要です。 登録に関する内容は、以下の参考サイトをご参照ください。 参考 : Google Workspace デベロッパー プレビュー プログラム 各種APIの有効化 使用する Google Cloud プロジェクトで以下の API を有効化します。 gcloud services enable \ workspaceevents.googleapis.com \ pubsub.googleapis.com \ drive.googleapis.com \ discoveryengine.googleapis.com \ firestore.googleapis.com サービスアカウントの準備と権限移譲 今回の仕組みでは、スクリプトがユーザーに代わって Google ドライブのファイルにアクセスする必要があります。これを実現するために、 サービスアカウント を作成し、「ドメイン全体の委任」という仕組みを使用して、そのサービスアカウントにユーザーデータへのアクセス権を付与します。 ドメイン全体の委任は、ユーザーの同意を必要とせずに、Google Workspace ユーザーのデータにアクセスする権限をアプリケーションに付与できる機能です。 サービスアカウントの作成 サービスアカウントの作成とロール付与 # 環境変数の設定 PIPELINE_GCP_PROJECT_ID =t-fukui NOTEBOOKLM_GCP_PROJECT_ID =massive-clone-455106-n9 SA_NAME =domain-wide-delegation-sa # サービスアカウントの作成 gcloud iam service-accounts create $SA_NAME \ --display-name =" ドメイン全体の委任用のサービスアカウント " # ロール付与(NotebookLM を動かすプロジェクト用) gcloud projects add-iam-policy-binding $PIPELINE_GCP_PROJECT_ID \ --member =" serviceAccount: $SA_NAME @ $PIPELINE_GCP_PROJECT_ID .iam.gserviceaccount.com " \ --role =" roles/logging.logWriter " \ --condition = None gcloud projects add-iam-policy-binding $PIPELINE_GCP_PROJECT_ID \ --member =" serviceAccount: $SA_NAME @ $PIPELINE_GCP_PROJECT_ID .iam.gserviceaccount.com " \ --role =" roles/pubsub.subscriber " \ --condition = None gcloud projects add-iam-policy-binding $PIPELINE_GCP_PROJECT_ID \ --member =" serviceAccount: $SA_NAME @ $PIPELINE_GCP_PROJECT_ID .iam.gserviceaccount.com " \ --role =" roles/datastore.user " \ --condition = None # ロール付与(NotebookLM ノートブック自動管理の仕組みを動かすプロジェクト用) gcloud projects add-iam-policy-binding $NOTEBOOKLM_GCP_PROJECT_ID \ --member =" serviceAccount: $SA_NAME @ $PIPELINE_GCP_PROJECT_ID .iam.gserviceaccount.com " \ --role =" roles/logging.logWriter " \ --condition = None 作成したサービスアカウントの OAuth 2 クライアント ID を取得(後の手順で使用) gcloud iam service-accounts describe \ domain-wide-delegation-sa@ $( gcloud config get-value project ) .iam.gserviceaccount.com \ --format =' value(uniqueId) ' ドメイン全体の委任の設定 特権管理者アカウントで Google 管理コンソールにログインします。 Google 管理コンソール:ホーム [セキュリティ] > [アクセスとデータ管理] > [API の制御] > [ドメイン全体の委任を管理] > [新しく追加] を選択します。 ドメイン全体の委任:新しく追加を選択 以下の値を入力して、[承認] を選択します。 項目名 入力値 クライアント ID 前の手順で取得した「サービスアカウントの OAuth 2 クライアント ID」の値 OAuth スコープ https://www.googleapis.com/auth/drive.readonly https://www.googleapis.com/auth/cloud-platform https://www.googleapis.com/auth/pubsub 新しいクライアント ID を追加 参考 : API アクセスをドメイン全体の委任で制御する Pub/Sub の準備 Google Workspace Events API からの通知を受け取るための Pub/Sub トピックと、スクリプトがメッセージを読み出すためのサブスクリプションを作成します。 トピックとサブスクリプションの作成 # トピックを作成 gcloud pubsub topics create blog-topic # サブスクリプションを作成 gcloud pubsub subscriptions create blog-subscription --topic = blog-topic --ack-deadline = 600 トピックへのパブリッシュ権限の付与 # トピックIDを取得 export TOPIC_ID = $( gcloud pubsub topics describe " blog-topic " --format =' value(name) ' ) # Workspace Events API のサービスアカウントに権限を付与 gcloud pubsub topics add-iam-policy-binding ${TOPIC_ID} \ --member =" serviceAccount:drive-api-event-push@system.gserviceaccount.com " \ --role =" roles/pubsub.publisher " 参考 : Pub/Sub トピックを作成してサブスクライブする サンプルアプリケーション ファイル構成 今回開発する Python ファイルの構成は以下のとおりです。 ファイル名 用途 xxx-yyy.json ドメイン全体の委任で設定したサービスアカウントのサービスアカウントキーファイル サービスアカウントキーの取り扱いには注意が必要です。詳細は、参考サイト「サービス アカウント キーを管理するためのベスト プラクティス」をご参照ください。 当記事では、検証目的でプログラムをローカルで実行するため、サービス アカウント キーが必要ですが、Cloud Run jobs 等で実行する場合は、サービスにアタッチされたサービスアカウントの情報が使用されるため、サービス アカウント キーは不要です。 .env 環境変数用のファイル create_subscription.py Google Workspace Events APIでイベントを購読するファイル process_events.py 変更イベントを処理し、NotebookLM のノートブックやソースを作成、更新するファイル 参考 : サービス アカウント キーを作成する 参考 : サービス アカウント キーを管理するためのベスト プラクティス ライブラリのインストール サンプルプログラムを動かすために、以下の Python ライブラリをインストールします。 python-dotenv == 1 . 1 . 1 google-auth-oauthlib == 1 . 2 . 2 firebase-admin == 7 . 1 . 0 google-api-python-client == 2 . 181 . 0 google-cloud-pubsub == 2 . 31 . 1 環境変数の設定 設定する値は、ご自身の環境に合わせて変更してください。 .env # ドメイン全体の委任で設定したサービスアカウントのサービスアカウントキーのパス GOOGLE_APPLICATION_CREDENTIALS = " /aaa/bbb/ccc/xxx-yyy.json " # NotebookLM ノートブック自動管理の仕組みを動かす Google Cloud プロジェクト ID PIPELINE_GCP_PROJECT_ID = " xxx " # NotebookLM を動かす Google Cloud プロジェクト 番号 NOTEBOOKLM_GCP_PROJECT_NUMBER = " 123 " # 委任されたユーザーのメールアドレス # ※ ドメイン全体の委任設定を行った組織のユーザーであること # ※ 以下の権限を有していること # → 監視対象のフォルダの共有ドライブのルートに対して「閲覧者」以上の権限 # → Cloud NotebookLM ユーザー(ベータ版)(roles/discoveryengine.notebookLmUser)以上の権限 DELEGATED_USER_EMAIL = " xxx@xxx.co.jp " # 監視対象のフォルダ # ※ 仕様により、共有ドライブのルートフォルダは指定不可です。共有ドライブ内にフォルダを作成して、作成したフォルダを指定してください TARGET_RESOURCE = " //drive.googleapis.com/files/xxx " # 監視の通知先の Pub/Sub トピック名, サブスクリプション TOPIC_NAME = " blog-topic " PUBSUB_SUBSCRIPTION_ID = " blog-subscription " # ノートブックのロケーション NOTEBOOKLM_LOCATION = " global " Google Workspace Events APIでイベントを購読 Google Workspace Events API を使用するには、まず「どのリソース」の「どのイベント」を監視したいかを定義し、「サブスクリプション(購読)」を作成する必要があります。 今回は、特定の Google ドライブフォルダ配下で発生するファイルの作成と内容変更のイベントを購読するサブスクリプションを作成します。 create_subscription.py import json import os import time from typing import Any, Dict from dotenv import load_dotenv from google.auth import default from google.auth.exceptions import RefreshError from google.auth.transport.requests import AuthorizedSession, Request from requests.exceptions import HTTPError # .envファイルから環境変数を読み込む load_dotenv() class DriveSubscriptionCreator : """ Google Workspace Events API を使用して、Google ドライブのリソースに対する サブスクリプションを作成するクラス。 """ def __init__ (self): """共通で使用する変数の初期化、各種クライアントをセットアップする。""" self._set_env_value() self.credentials = self._initialize_credentials() self.authed_session = AuthorizedSession(self.credentials) def _set_env_value (self): """環境変数の値を取得して変数に設定する""" # 必須の環境変数とインスタンス変数名のマッピング required_env_vars = { "DELEGATED_USER_EMAIL" : "delegated_user_email" , "PIPELINE_GCP_PROJECT_ID" : "pipeline_gcp_project_id" , "TOPIC_NAME" : "topic_name" , "TARGET_RESOURCE" : "target_resource" , } # ループで環境変数を取得し、設定されていなければ例外を発生させる for env_name, attr_name in required_env_vars.items(): value = os.getenv(env_name) if not value: raise ValueError (f "環境変数 {env_name} が設定されていません。" ) # setattr() を使ってインスタンス変数(self.xxx)を設定 setattr (self, attr_name, value) def _initialize_credentials (self): """ドメイン全体の委任を使用して認証情報を初期化およびリフレッシュする。""" print ( "認証情報を初期化しています..." ) try : default_credentials, _ = default(scopes=[ "https://www.googleapis.com/auth/drive.readonly" ]) credentials = default_credentials.with_subject(self.delegated_user_email) credentials.refresh(Request()) print ( "認証に成功しました。" ) return credentials except RefreshError as e: print ( "認証エラー: サービスアカウントの権限委任設定を確認してください。" ) print (f "エラー詳細: {e}" ) raise def _create_subscription (self) -> str : """サブスクリプション作成リクエストを送信し、オペレーション名を取得する。""" print (f "サブスクリプションを作成しています... Target: {self.target_resource}" ) body = { "targetResource" : self.target_resource, "eventTypes" : [ "google.workspace.drive.file.v3.created" , "google.workspace.drive.file.v3.contentChanged" , ], "notificationEndpoint" : { "pubsubTopic" : f "projects/{self.pipeline_gcp_project_id}/topics/{self.topic_name}" }, "driveOptions" : { "includeDescendants" : True }, "payloadOptions" : { "includeResource" : True , "fieldMask" : "file.id,file.name,file.mimeType,file.parents" , }, "ttl" : "14400s" , } try : response = self.authed_session.post( "https://workspaceevents.googleapis.com/v1beta/subscriptions" , json=body, ) response.raise_for_status() operation = response.json() print ( "サブスクリプション作成リクエストが受理されました。" ) print ( "LRO:" , json.dumps(operation, ensure_ascii= False , indent= 2 )) return operation[ "name" ] except HTTPError as e: print ( "サブスクリプションの作成に失敗しました。" ) print (f "エラー: {e.response.text}" ) raise def _wait_for_operation (self, op_name: str ) -> Dict[ str , Any]: """Long Running Operation の完了を待つ。""" print (f "オペレーションの完了を待っています: {op_name}" ) while True : try : poll = self.authed_session.get(f "https://workspaceevents.googleapis.com/v1beta/{op_name}" ) poll.raise_for_status() j = poll.json() if j.get( "done" ): if "error" in j: raise RuntimeError (json.dumps(j[ "error" ])) print ( "オペレーションが完了しました。" ) return j.get( "response" , {}) print ( "オペレーションはまだ進行中です。2秒待機します..." ) time.sleep( 2 ) except HTTPError as e: print ( "オペレーションのポーリング中にエラーが発生しました。" ) print (f "エラー: {e.response.text}" ) raise def run (self): """サブスクリプション作成のメインフローを実行する。""" try : op_name = self._create_subscription() result = self._wait_for_operation(op_name) print ( " \n --- サブスクリプション作成完了 ---" ) print (json.dumps(result, ensure_ascii= False , indent= 2 )) print ( "---------------------------------" ) except (HTTPError, ValueError , RuntimeError ) as e: print (f " \n サブスクリプション作成プロセスでエラーが発生しました: {e}" ) if __name__ == "__main__" : try : creator = DriveSubscriptionCreator() creator.run() except Exception as e: print (f "スクリプトの実行に失敗しました: {e}" ) 参考 : Google Workspace サブスクリプションを作成する 参考 : サブスクリプション作成のイベントタイプ イベントを処理し NotebookLM を操作 Pub/Sub に溜まったイベントメッセージを取得し、その情報に基づいて実際に NotebookLM のノートブックとソースを管理するスクリプトを作成します。 このスクリプトは、今回の自動化の仕組みにおける心臓部であり、以下の役割を担います。 Pub/Sub から Drive のイベントメッセージを取得する。 イベント情報からファイルIDを特定し、Drive API で詳細なファイル情報を取得する。 Firestore を使って、Drive のフォルダと NotebookLM のノートブックの対応関係を管理する。 NotebookLM Enterprise API を呼び出し、ノートブックの作成・ソースの追加・更新(削除・追加)を行う。 process_events.py import json import os from typing import Any, Dict, Optional, Tuple from dotenv import load_dotenv from firebase_admin import firestore from google.auth import default from google.auth.exceptions import RefreshError from google.auth.transport.requests import AuthorizedSession, Request from google.cloud import pubsub_v1 from googleapiclient.discovery import build from requests.exceptions import HTTPError # .envファイルから環境変数を読み込む load_dotenv() class DriveEventProcessor : """ Google ドライブのイベントを処理し、NotebookLMと連携するプロセッサ。 """ def __init__ (self): """共通で使用する変数の初期化、各種クライアントをセットアップする。""" self._set_env_value() sa_credentials, delegated_credentials = self._initialize_credentials() # 【委任ユーザーとして操作するAPI】 # Drive, NotebookLM (authed_session) はユーザーのデータにアクセスする self.credentials = delegated_credentials # 互換性のために維持 self.authed_session = AuthorizedSession(delegated_credentials) self.drive = build( "drive" , "v3" , credentials=delegated_credentials) # 【サービスアカウントとして操作するAPI】 # Firestore, Pub/Sub はGCPリソースにアクセスする self.firestore_client = firestore.Client(credentials=sa_credentials) self.subscriber_client = pubsub_v1.SubscriberClient(credentials=sa_credentials) # NotebookLM APIのエンドポイント設定 endpoint_prefix = "" if self.notebooklm_location == "global" else f "{self.notebooklm_location}-" self.notebooklm_base_url = f "https://{endpoint_prefix}discoveryengine.googleapis.com/v1alpha" def _set_env_value (self): """環境変数の値を取得して変数に設定する""" # 必須の環境変数とインスタンス変数名のマッピング required_env_vars = { "DELEGATED_USER_EMAIL" : "delegated_user_email" , "PIPELINE_GCP_PROJECT_ID" : "pipeline_gcp_project_id" , "NOTEBOOKLM_GCP_PROJECT_NUMBER" : "notebooklm_gcp_project_number" , "PUBSUB_SUBSCRIPTION_ID" : "pubsub_subscription_id" , "NOTEBOOKLM_LOCATION" : "notebooklm_location" , } # ループで環境変数を取得し、設定されていなければ例外を発生させる for env_name, attr_name in required_env_vars.items(): value = os.getenv(env_name) if not value: raise ValueError (f "環境変数 {env_name} が設定されていません。" ) # setattr() を使ってインスタンス変数(self.xxx)を設定 setattr (self, attr_name, value) def _initialize_credentials (self) -> Tuple[Any, Any]: """サービスアカウント自身の認証情報と、ドメイン全体の委任を使用した認証情報の両方を返す。""" try : # 共通のスコープでデフォルト(サービスアカウント自身)の認証情報を取得 sa_credentials, _ = default( scopes=[ "https://www.googleapis.com/auth/drive.readonly" , "https://www.googleapis.com/auth/cloud-platform" , "https://www.googleapis.com/auth/pubsub" , ] ) # 委任されたユーザーの認証情報を作成 delegated_credentials = sa_credentials.with_subject(self.delegated_user_email) delegated_credentials.refresh(Request()) print ( "認証に成功しました。" ) # (サービスアカウント自身の認証情報, 委任ユーザーの認証情報) のタプルで返す return sa_credentials, delegated_credentials except RefreshError as e: print ( "認証エラー: サービスアカウントの権限委任設定を確認してください。" ) print (f "エラー詳細: {e}" ) raise def _create_notebook (self, title: str ) -> str : """NotebookLMに新しいノートブックを作成する。""" url = f "{self.notebooklm_base_url}/projects/{self.notebooklm_gcp_project_number}/locations/{self.notebooklm_location}/notebooks" response = self.authed_session.post(url, json={ "title" : title}) response.raise_for_status() return response.json()[ "notebookId" ] def _add_drive_source (self, notebook_id: str , file_id: str , mime_type: str , display_name: str ) -> dict : """NotebookLMのノートブックにGoogle ドライブのソースを追加する。""" url = f "{self.notebooklm_base_url}/projects/{self.notebooklm_gcp_project_number}/locations/{self.notebooklm_location}/notebooks/{notebook_id}/sources:batchCreate" body = { "userContents" : [ { "googleDriveContent" : { "documentId" : file_id, "mimeType" : mime_type, "sourceName" : display_name, } } ] } response = self.authed_session.post(url, json=body) response.raise_for_status() return response.json() def _delete_source (self, notebook_id: str , source_id: str ): """NotebookLMのノートブックからソースを削除する。""" url = f "{self.notebooklm_base_url}/projects/{self.notebooklm_gcp_project_number}/locations/{self.notebooklm_location}/notebooks/{notebook_id}/sources:batchDelete" source_full_name = f "projects/{self.notebooklm_gcp_project_number}/locations/{self.notebooklm_location}/notebooks/{notebook_id}/sources/{source_id}" body = { "names" : [source_full_name]} response = self.authed_session.post(url, json=body) response.raise_for_status() def _get_file_info (self, file_id: str ) -> Optional[Dict[ str , Any]]: """Google ドライブからファイルメタデータを取得する。""" try : return ( self.drive.files() .get( fileId=file_id, fields= "id,name,mimeType,parents,driveId" , supportsAllDrives= True , ) .execute() ) except HTTPError as e: print (f "Driveファイル情報の取得に失敗しました (ID: {file_id}): {e}" ) return None def _get_notebook_folder_info (self, file_meta: Dict[ str , Any]) -> Optional[Tuple[ str , str ]]: """ファイルの親を遡り、ノートブックの基準となるフォルダのIDと名前を取得する。""" path_items = [] parent_id = (file_meta.get( "parents" ) or [ None ])[ 0 ] while parent_id: try : parent_meta = self._get_file_info(parent_id) if not parent_meta: return None path_items.insert( 0 , { "id" : parent_meta[ "id" ], "name" : parent_meta[ "name" ]}) is_shared_drive_root = not parent_meta.get( "parents" ) and parent_meta.get( "driveId" ) if is_shared_drive_root: drive_info = self.drive.drives().get(driveId=parent_meta[ "driveId" ], fields= "name" ).execute() path_items.insert( 0 , { "id" : parent_meta[ "driveId" ], "name" : drive_info[ "name" ]}) break parent_id = (parent_meta.get( "parents" ) or [ None ])[ 0 ] except HTTPError as e: print (f "親フォルダの探索中にエラーが発生しました (ID: {parent_id}): {e}" ) return None # 階層が期待通りかチェック (共有ドライブ > ドライブ(自動で入る) > サブスクリプションフォルダ > ノートブックフォルダ) if len (path_items) >= 4 : notebook_folder = path_items[ 3 ] return notebook_folder.get( "id" ), notebook_folder.get( "name" ) return None def _get_or_create_notebook (self, folder_id: str , title: str ) -> Optional[ str ]: """Firestore/NotebookLMからノートブックを取得または新規作成する。""" notebook_ref = self.firestore_client.collection( "notebooks" ).document(folder_id) notebook_doc = notebook_ref.get() if notebook_doc.exists: print (f "既存のノートブックをFirestoreで発見: '{title}'" ) return notebook_doc.to_dict().get( "notebookId" ) else : print (f "ノートブックが見つかりません。新規作成します: '{title}'..." ) try : new_notebook_id = self._create_notebook(title) print (f "NotebookLMにノートブックを作成しました: {new_notebook_id}" ) notebook_data = { "notebookId" : new_notebook_id, "title" : title, "sources" : {}} notebook_ref.set(notebook_data) print (f "Firestoreにノートブック情報を保存しました: '{title}'" ) return new_notebook_id except HTTPError as e: print (f "ノートブックの新規作成またはFirestoreへの保存に失敗しました: {e}" ) return None def process_drive_event (self, event_data: Dict[ str , Any]): """単一のDriveイベントを処理する。""" file_id = event_data.get( "file" , {}).get( "id" ) if not file_id: print ( "イベントデータにファイルIDが見つかりません。" ) return print (f "イベントを処理中 (File ID: {file_id})" ) file_info = self._get_file_info(file_id) if not file_info: return if file_info[ "mimeType" ] == "application/vnd.google-apps.folder" : print (f "フォルダはスキップします: {file_info.get('name')}" ) return folder_info = self._get_notebook_folder_info(file_info) if not folder_info: print (f "ノートブックフォルダを特定できませんでした: {file_info.get('name')}" ) return notebook_folder_id, notebook_title = folder_info notebook_id = self._get_or_create_notebook(notebook_folder_id, notebook_title) if not notebook_id: return notebook_ref = self.firestore_client.collection( "notebooks" ).document(notebook_folder_id) notebook_data = notebook_ref.get().to_dict() or {} existing_source = notebook_data.get( "sources" , {}).get(file_id) try : if existing_source and existing_source.get( "sourceId" ): print (f "既存ソースを更新します: {file_info['name']}" ) self._delete_source(notebook_id, existing_source[ "sourceId" ][ "id" ]) print (f "古いソースを削除しました: {existing_source['sourceId']['id']}" ) print (f "新しいソースを追加します: {file_info['name']}" ) source_info = self._add_drive_source(notebook_id, file_id, file_info[ "mimeType" ], file_info[ "name" ]) new_source_id = source_info.get( "sources" , [{}])[ 0 ].get( "sourceId" ) if new_source_id: update_path = f "sources.{file_id}" notebook_ref.update( { update_path: { "sourceId" : new_source_id, "displayName" : file_info[ "name" ], "updatedAt" : firestore.SERVER_TIMESTAMP, } } ) print ( "Firestoreのソース情報を更新しました。" ) else : print ( "NotebookLMの応答から新しいSourceIDを取得できませんでした。" ) except HTTPError as e: print (f "ソースの処理中にエラーが発生しました: {e}" ) except Exception as e: print (f "予期せぬエラーが発生しました: {e}" ) print (f "ファイル(ID: {file_id})の処理が完了しました。" ) def run (self, batch_size: int = 10 ): """ Pub/Subからメッセージを取得し、イベント処理ループを開始する。 batch_size の値を大きくしすぎると、Pub/Subメッセージ取得からACKまでの時間が長くなり、確認応答期限を過ぎてメッセージが再送される可能性が高くなります。 """ subscription_path = self.subscriber_client.subscription_path( self.pipeline_gcp_project_id, self.pubsub_subscription_id ) print (f "サブスクリプションをリッスン中: {subscription_path}..." ) total_processed = 0 try : while True : response = self.subscriber_client.pull( subscription=subscription_path, max_messages=batch_size, return_immediately= True ) if not response.received_messages: print ( "処理するメッセージはありません。" ) break print (f "{len(response.received_messages)}件のメッセージを取得しました。" ) ack_ids = [] for msg in response.received_messages: try : event_data = json.loads(msg.message.data.decode( "utf-8" )) self.process_drive_event(event_data) ack_ids.append(msg.ack_id) total_processed += 1 except json.JSONDecodeError as e: print (f "メッセージのJSONパースに失敗しました: {e}" ) ack_ids.append(msg.ack_id) # 処理できないメッセージはACKして再配信を防ぐ except Exception as e: print (f "メッセージ処理中に予期せぬエラーが発生しました。再試行されます。: {e}" ) # エラーが発生したメッセージはACKせず、再配信を待つ if ack_ids: self.subscriber_client.acknowledge(subscription=subscription_path, ack_ids=ack_ids) print (f "{len(ack_ids)}件のメッセージを確認応答しました。" ) except Exception as e: print (f "メッセージ取得ループでエラーが発生しました: {e}" ) finally : self.subscriber_client.close() print ( "サブスクライバーをクローズしました。" ) print (f "--- バッチ処理完了。合計 {total_processed} 件のメッセージを処理しました ---" ) if __name__ == "__main__" : try : processor = DriveEventProcessor() processor.run() except Exception as e: print (f "スクリプトの実行に失敗しました: {e}" ) 参考 : Create and manage notebooks (API) 動作確認 ノートブックが作成されていない状態で、ノートブックの新規作成、ソースの追加・更新の確認を行います。 NotebookLM:トップ画面(動作確認前の状態) 操作1 create_subscription.py を実行します。 操作2 監視対象フォルダに、フォルダを新規作成し、その中にファイルを追加します。 監視対象フォルダ ├── 案件1 │ └── 案件1のファイル └── 案件2 └── 案件2のファイル ファイル種類 名前 ファイルの内容 フォルダ 案件1 − フォルダ 案件2 − Google ドキュメント 案件1のファイル 今日の天気は「えび」です Google スライド 案件2のファイル 好きな食べ物は「羊」です 操作3 process_events.py を実行します。 フォルダに対応したノートブックが作成されています。 NotebookLM:トップ画面(動作確認後の状態) 作成したファイルが正しくソースとして追加されています。 NotebookLM:ノートブック(案件1のソース確認) NotebookLM:ノートブック(案件2のソース確認) 操作4 既存ファイルの更新と、既存フォルダへファイルを追加します。 既存ファイル「案件1のファイル」の内容を 今日の天気は「桜えび」です に変更します。 既存フォルダ「案件2」に、Google ドキュメントのファイルを新規に追加します。(内容: 適度な大きさのえびは「車海老」です ) 操作5 process_events.py を実行します。 更新したファイルが正しくソースとして反映されています。 NotebookLM:ノートブック(案件1の更新ソース確認) 新たに追加したファイルもソースとして正しく追加されています。 NotebookLM:ノートブック(案件2のソース追加確認) 福井 達也 (記事一覧) カスタマーサクセス課 エンジニア 2024年2月 G-gen JOIN 元はアプリケーションエンジニア(インフラはAWS)として、PM/PL・上流工程を担当。G-genのGoogle Cloudへの熱量、Google Cloudの魅力を味わいながら日々精進