TECH PLAY

SCSKクラウドソリューション

SCSKクラウドソリューション の技術ブログ

1199

こんにちは。 今回はAmazon MSK(Managed Streaming for Apache Kafka)上のデータを、S3 Sink Connectorによって Amazon S3 へ出力するパターンを解説します。 MSKはクラスター内のトピックにデータを保管しますが、そのデータをS3へバックアップすることができます。 前提条件 本記事で触れている内容は、以下の構成を前提としています。 ブローカータイプ:標準ブローカー(kafka.m5.large など) クラスタタイプ:プロビジョンドクラスタ メタデータ管理:Apache Zookeeper モード AWS公式ドキュメントに記載されている下記の内容を参考にしています。 Amazon S3 シンクコネクタを設定する 準備 今回、下記リソースは準備済みとして進めます。 MSK クラスター(プロビジョンド、Zookeeper モード) S3 バケット(クラスターと同リージョン) MSK Connect 用 IAM ロール まず、S3 Sink Connectorを用意するためドキュメントに記載されているリンクから、Confluent S3 Sink Connector を入手します。 一番左の Self-Hosted をダウンロードし、ZIP ファイルを取得します。 次に、MSK クラスターをデプロイした同じリージョンの S3 上に ZIP ファイルを配置しておきます。 ここまでで事前準備は完了です。 カスタムプラグインの作成 カスタムプラグインは、MSK Connect を通じて利用する外部 Kafka Connector の ZIP アーカイブです。S3 Sink Connector の Kotlin/Java クラスを含むバイナリパッケージを事前に登録します。 先ほど配置した ZIP ファイルの S3 パスを取得し、カスタムプラグインの作成画面で指定します。 カスタムプラグイン名を任意で設定し、「カスタムプラグインを作成」をクリックします。 作成が完了すると、ステータスがアクティブになります。 次に、このプラグインを用いてコネクターを作成します。 コネクターの作成 MSK Connector は、MSK Connect における Kafka Connect ジョブの単位です。既存のカスタムプラグインを指定して、接続先(MSK クラスター)・タスク数・出力先(S3)等を設定します。 コネクタ作成画面で、先ほど作成したカスタムプラグインを指定します。 画面に従い、設定を進めます。 任意のコネクター名 バックアップする対象となるMSKクラスター コネクタ設定は、 公式ドキュメント を参考に設定します。今回は下記の通り設定しました。 補足として、1つの MSK Connector で複数 Topic を S3 へバックアップするために `topics.regex` を利用しています。 このパラメータを利用することで、`topic` から始まるトピック名のみを対象とすることができます。 {   "connector.class": "io.confluent.connect.s3.S3SinkConnector",   "flush.size": "1",   "format.class": "io.confluent.connect.s3.format.json.JsonFormat",   "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",   "s3.bucket.name": "msk-connector-demo-s3bucket",   "s3.region": "us-east-1",   "schema.compatibility": "NONE",   "storage.class": "io.confluent.connect.s3.storage.S3Storage",   "tasks.max": "1",   "topics.regex": "topic.*" } (補足) プロパティについて 今回は同アカウントのS3に出力するため、下記の通り指定しています。 s3.bucket.name: 出力先バケット名 s3.region: 出力先バケットのリージョン また、公式ドキュメントの例だと、単一トピックを指定していますが、これだと「バックアップ対象のトピック」とConnectorが同じ数必要となります。 Connectorの数が増えると、下記のような問題につながります。 Connectorの利用料金が増える Connectorの数によりサブネットのIP数が足りなくなり、MSK Clusterのスケールアウトができなくなる等の影響がある 今回は1つのS3 Sink Connectorで複数TopicのデータをS3へ出力するため、「topics.regex」を利用しています。 残りの設定を行います。 。 最後に、IAMロールを指定します。 ここで重要なのが、警告にある通り「AWSServiceRoleforKafkaConnect」ロールを指定できないことです。 必ず、別途IAMロールを用意しましょう。 公式ドキュメント に必要なポリシーがまとめられています。 はまりやすいのが、今回のケースで利用できるMSK Connect用の管理ポリシーは現時点で存在しません。MSKFullAccessという管理ポリシーはありますが、MSK ConnectのIAMロールとして使うには権限が足りていません。 もし権限が不足している場合は、MSK Connectの作成途中に下記エラーが発生します。 コード: KafkaConnect.BrokerAuthenticationFailure メッセージ: MSK Connect was unable to connect to the Kafka Broker due to authentication issues. Please verify the authentication needed to connect to the Kafka broker and the related permissions and retry the operation. こうなったときは一度Connectorを削除し、再作成する必要があります。 IAMロールを指定し、「次へ」をクリックします。 セキュリティの画面はそのまま「次へ」とし、ログは必要に応じて設定します。 MSK Connectの作成が失敗した場合などのトラブルシューティングに役立つため、ログは有効化することをお勧めします。 各設定を確認し、「コネクタを作成」をクリックします。 しばらく時間がかかりますが、作成が完了するとステータスが「実行中」になります。 ここまでで Connector の作成は完了です。次はバックアップ対象として扱う Topic を用意し、正しく S3 へ出力されるかを確認します。 トピック作成 S3 に Topic 内のデータが出力されることを確認するため、まずいくつか Topic を作成します。今回は下記トピックを作成してみました。 demo-topic topic-demo-dev01 topic-demo-stg01 Connector作成時の設定により、「topic」から始まるトピックのみバックアップされることを確認します。 EC2などに配置した Kafka CLI で、トピックへメッセージを格納します。いったん、現在の Topic を一覧化します。 ./bin/kafka-topics.sh --list --bootstrap-server <bootstrap-server> --command-config config/client.properties __amazon_msk_canary __amazon_msk_connect_configs_S3SinkConnector_4ab1ac36-1695-4b03-9e22-dcf79cee7218-2 __amazon_msk_connect_configs_S3SinkConnector_63bdb115-000a-4432-8668-b8c70bba0b34-2 __amazon_msk_connect_configs_S3SinkConnector_a6910ee0-12fc-4c43-ad89-784a9a5136ca-2 __amazon_msk_connect_configs_S3SinkConnector_b293673e-999e-4f0e-a616-e8eaed5206c2-2 __amazon_msk_connect_configs_S3SinkConnector_fb60d2ef-8ae7-4621-ba7d-1b891d1602dd-2 __amazon_msk_connect_offsets_S3SinkConnector_4ab1ac36-1695-4b03-9e22-dcf79cee7218-2 __amazon_msk_connect_offsets_S3SinkConnector_63bdb115-000a-4432-8668-b8c70bba0b34-2 __amazon_msk_connect_offsets_S3SinkConnector_a6910ee0-12fc-4c43-ad89-784a9a5136ca-2 __amazon_msk_connect_offsets_S3SinkConnector_b293673e-999e-4f0e-a616-e8eaed5206c2-2 __amazon_msk_connect_offsets_S3SinkConnector_fb60d2ef-8ae7-4621-ba7d-1b891d1602dd-2 __amazon_msk_connect_status_S3SinkConnector_4ab1ac36-1695-4b03-9e22-dcf79cee7218-2 __amazon_msk_connect_status_S3SinkConnector_63bdb115-000a-4432-8668-b8c70bba0b34-2 __amazon_msk_connect_status_S3SinkConnector_a6910ee0-12fc-4c43-ad89-784a9a5136ca-2 __amazon_msk_connect_status_S3SinkConnector_b293673e-999e-4f0e-a616-e8eaed5206c2-2 __amazon_msk_connect_status_S3SinkConnector_fb60d2ef-8ae7-4621-ba7d-1b891d1602dd-2 __consumer_offsets demo-topic topic-demo-dev01 topic-demo-stg01 今回手動で作成したTopic以外にいくつか存在していますが、こちらはMSK内部で自動作成されるものです。 メッセージ送信と検証 kafka-console-producer を利用して、適当なメッセージを送ります。 topic-demo-dev01 にメッセージ送信 ./bin/kafka-console-producer.sh --bootstrap-server <bootstrap-server> --topic topic-demo-dev01 --producer.config config/client.properties message1 message2 message3 topic-demo-stg01 にメッセージ送信 ./bin/kafka-console-producer.sh --bootstrap-server <bootstrap-server> --topic topic-demo-stg01 --producer.config config/client.properties message1 message2 message3 demo-topic にメッセージ送信(対象外で確認) ./bin/kafka-console-producer.sh --bootstrap-server <bootstrap-server> --topic demo-topic --producer.config config/client.properties message1 message2 message3 次に、S3を確認します。 トピック名のオブジェクトが作成されています。 配下に移動すると、JSONオブジェクトとして出力されています。 このように、S3へTopic内のデータを出力することができました。 ちなみにオブジェクト名は、トピック名+パーティション数+オフセット番号から構成されています。 また、ここでわかるように「topic」から始まるトピックのみが S3 へ出力されています。demo-topic に関しては出力されていません。 まとめ MSK Connect の S3 Sink Connector を活用し、複数Topic のバックアップをする方法をご紹介しました。 MSKクラスターのデータをバックアップする手段はいくつかありますが、今回はS3 Sink Connectorを使っています。MSK Connector自体はクラスター同様、MSK ConnectorはMSKクラスターが配置されているサブネット上に作成されます。 サブネットのIPアドレスに空きが十分でないと、クラスターのスケールアウトなどにも影響があります。 MSKクラスターの設計と合わせて検討することをおすすめします。
アバター
Amazon MSKクラスターのKafkaに関する構成は「クラスター構成」として管理されます。   クラスター構成の設定変更には2段階の適用が必要となるため、その手順を理解しておくことが重要です。 前提条件 本記事で触れている内容は、以下の構成を前提としています。 ブローカータイプ:標準ブローカー(kafka.m5.large など) クラスタタイプ:プロビジョンドクラスタ メタデータ管理:Apache Zookeeper モード あくまで参考資料となります。環境によって最適解は変わるため、必要に応じて検討してください。 MSKクラスター構成 MSKクラスター構成には下記のようなプロパティを設定することができます。 default.replication.factor min.insync.replicas auto.create.topics.enable デフォルトで設定されるプロパティのほかに、利用者側でカスタムすることが可能です。プロパティの一覧は 公式ドキュメント をご覧ください。 変更手順 変更する手順は大きく下記のような流れとなります。 `AWS::MSK::Configuration`の `ServerProperties` を編集して新リビジョンを作成 `AWS::MSK::Cluster` の `ConfigurationInfo` を最新リビジョンに切り替え クラスタが `ACTIVE` になるまで待機し、設定反映を確認 1段階目:`MSKClusterConfig`を変更 1. `AWS::MSK::Configuration`リソースにある対象のKafka設定(`server.properties`)を確認 2. 任意のプロパティを追加もしくは、既存のプロパティを変更 3. CloudFormationスタックを更新すると、`AWS::MSK::Configuration`の新しいリビジョンが作成される。(もし最新のリビジョンが1の場合、スタック更新後に2が作成される) 変更前のClusterConfigを下記例として、説明します。 MSKClusterConfig:   Type: AWS::MSK::Configuration   Properties:     Name: msk-demo-config     ServerProperties: |       default.replication.factor=3       min.insync.replicas=2       auto.create.topics.enable=false     log.retention.hours=168 コンソール上で確認すると、それぞれ下記の通りです。 ・MSK Configuration 変更前のリビジョン ・MSK Configuration の現行設定 ・クラスターに適用されている現行設定(現行リビジョン) 今回は、下記の通り変更してみます。 MSKClusterConfig:   Type: AWS::MSK::Configuration   Properties:     Name: msk-demo-config     ServerProperties: |       default.replication.factor=3       min.insync.replicas=2       auto.create.topics.enable=true       log.retention.hours=72 1. 変更セット作成 2. MSK Configuration 変更後のリビジョン 変更セットによる変更が完了すると、上記の通りリビジョンがインクリメントされて最新が追加されます。 3. 変更後のリビジョン確認画面 インクリメントされた最新リビジョンを確認すると、変更したパラメータが適用されていることがわかります。 ポイント 更新時は常に新しいリビジョンが作成される リビジョン番号は自動的にインクリメントされるため、1つ前のリビジョンは保持される(ロールバック時に有用) 2段階目:クラスターに新リビジョンを適用 1. `AWS::MSK::Cluster` の `ConfigurationInfo` に、作成済みの `AWS::MSK::Configuration` リソースの最新リビジョンを指定 2. CloudFormationスタックを更新 3. クラスタの `Cluster Operation` タブで `UPDATE` 完了を確認 MSKCluster: Type: 'AWS::MSK::Cluster' DependsOn: MSKClusterConfig Properties: BrokerNodeGroupInfo: NumberOfBrokerNodes: 3 InstanceType: kafka.t3.small ClientAuthentication: ConfigurationInfo: Arn: !GetAtt MSKClusterConfig.Arn Revision: 2 1. 変更セット作成(クラスタリビジョンを最新に指定) 2. 変更セットの内容確認 リビジョンを最新のバージョンに設定し。変更セットを作成します。 3. 変更セット実行 変更セットが実行されると、MSKクラスターの「クラスターオペレーション」タブにて更新が開始されていることが確認できます。 更新が完了するまで、一定時間待機します。 4. クラスターオペレーションの完了 ブローカーやトピックによって所要時間は前後しますが、一定時間が経過するとクラスターオペレーションが完了します。 この時点で最新のリビジョンが適用されました。 5. 変更反映後のクラスタ状態 クラスターの詳細を見ると、リビジョンが変更されていることが確認できます。これで作業は完了となります。 押さえるポイント `AWS::MSK::Configuration`更新後も、クラスタ側の `ConfigurationInfo` を切り替えるまで新設定は適用されない ロールバックに備え、古いリビジョンは保持されていることを確認しておく `ConfigurationRevision` がインクリメントされた後、明示的にMSKクラスターが参照するリビジョンを切り替えないと反映されません。 まとめ MSKクラスタ設定の変更は、CloudFormation上で「設定リビジョン作成」と「クラスタ設定切替」の2段階処理になることを前提に運用します。もし最新のリビジョンで問題が発生した場合、古いリビジョンへ切り替えることも可能です。
アバター
Amazon MSKをCloudFormationで管理する場合、スタック更新時の制約や依存関係を把握しておくことが重要です。特にMSKリソース(`AWS::MSK::Cluster`など)は、複数のプロパティを一度に変更できない場合があります。 本書では、CloudFormationを使ったMSK運用で遭遇しやすいケースとエラーについて触れます。 前提条件 本記事で触れている内容は、以下の構成を前提としています。 ブローカータイプ:標準ブローカー(kafka.m5.large など) クラスタタイプ:プロビジョンドクラスタ メタデータ管理:Apache Zookeeper モード あくまで参考資料となります。環境によって最適解は変わるため、必要に応じて検討してください。 CloudFormationでのリソース変更 CloudFormationは、テンプレート(コード)を更新すると、差分検出に基づいて必要なリソース更新を自動実行します。一般的なAWSリソースでは、依存関係がなければ複数フィールドをまとめて変更しても問題なく適用されます。 一方で、MSK `AWS::MSK::Cluster`では、同時に変更できないパラメータが存在します。1回のスタック更新で複数項目をまとめて書いても、MSK側で順序依存や不可一致が発生して失敗するケースがあります。 そのため、運用上は1度に全項目を更新せず、複数回に分割した変更(段階的適用)が必要となることがあります。 (補足)マネジメントコンソールでの変更 マネジメントコンソール上でももちろん変更できますが、下記のようなメニューとなります。 それぞれ構成を変更すると、MSKクラスター側に変更が完了するまで他の構成は変更できません。 複数の設定を同時に変更する 例えば次のような変更を同時に行おうとすると、クラスター更新が失敗するケースがあります。 `BrokerNodeGroupInfo` の `InstanceType` 変更 (ブローカーのスケールアップ) `NumberOfBrokerNodes` の増減 (ブローカーのスケールアウト) `EBSVolumeInfo` の `VolumeSize` 変更 (ストレージサイズの変更) 変更セット適用時のエラーと原因 CloudFormationの変更セット実行時に、以下のようなエラーメッセージが発生することがあります。 Error: You can’t update multiple attributes of the cluster in same request. Use a different request for each update. このエラーは、MSKクラスターの同じリクエスト内に複数の変更を含めた場合に発生します。この場合、プロパティを分割して複数回の変更セット作成/適用する運用が必要です。 今回の例以外にもクラスタ更新が失敗するケースは多数存在します。必ず事前に検証環境で検証を実施してください。 また、複数の変更を1つずつ実施すると、想定以上の時間がかかることがあります。特にブローカー再起動が伴う場合は、ダウンタイムやパフォーマンス影響を見積もり、変更内容に応じたスケジュールを確保してください。 運用フロー例 ここまでの内容を踏まえて、変更したい内容次第ではありますが下記のような運用フローとなります。 変更は極力1つの要素ずつ行う 例: まず`KafkaVersion`を更新して安定確認してから、次に`NumberOfBrokerNodes`を変更する 変更計画とスケジュールを明示する 複数回に分ける必要がある場合、各変更に必要な時間・リードタイム(検証、変更セット作成、適用、監視)を見積もる ブローカー再起動が伴う変更は、業務影響が少ない時間帯に実施する 検証/テスト環境で事前に再現確認 本番環境で行う前に、同様の変更をステージング環境で実施して失敗パターンを確認 クラスタ設定変更は段階的に公開 設定変更後は時間を置いて安定性を確認し、問題なければ次の変更を適用 まとめ ご紹介した通り、MSKのCloudFormation運用では、複数プロパティを同時に変更するとスタック更新ができない場合があります。 分割して段階的に変更するなどを検討しつつ、ログやステータスを確認しながら進めることが推奨されます。
アバター
こんにちは。 今回は、Amazon MSKにおけるスケールアップとスケールアウトについてフォーカスします。   Amazon MSK はフルマネージド Kafka であり、クラスターを停止せずに スケールアップ(ブローカー性能の向上)やスケールアウト(ブローカー台数の増加)を行える点が魅力です。   この記事では、MSK のスケールアップ/スケールアウトの基礎や注意点の一部をまとめています。 前提条件 本記事で触れている内容は、以下の構成を前提としています。 ブローカータイプ:標準ブローカー(kafka.m5.large など) クラスタタイプ:プロビジョンドクラスタ メタデータ管理:Apache Zookeeper モード Serverless や Express ブローカーなどはユースケースに応じて検討してください。 あくまで参考資料となります。環境によって最適解は変わるため、必要に応じて検討してください。 スケールアップとスケールアウトの違い Amazon MSKのスケーリングには 2 つの方向があります。 スケールアップ(ブローカー性能を上げる方法) ブローカーの vCPU / メモリ / ストレージ IOPSを大きくし、既存のクラスタ構成のまま性能を引き上げるアプローチです。 ブローカー数は変化なし 性能が向上し、スループットやメタデータ処理が改善   パーティション再配置などのデータ移動が不要   ローリングアップデートのためクラスタの可用性を維持しやすい 接続文字列(bootstrap.servers)が変わらないため、クライアント側に影響が少ない AWS の 公式ドキュメント でも、Kafka の特性と MSK の運用観点から、以下の理由で スケールアウトよりスケールアップをまず検討することが推奨されています。 パーティション再割り当てが不要 ローリングリスタートのためクラスターのI/O停止なし(高可用性の構成になっていることが前提) クライアント側の設定変更が不要なため、全体的な運用コストが低い 特に、ブローカー追加後にパーティション再割り当てをトピックごとに行う必要があります。トピック数が多い場合、運用作業の負荷が高くなります。 スケールアウト(ブローカー台数を増やす方法)と注意点 ブローカー数を増やしてクラスタ容量やパーティション分散を強化する方法です。 ただし、運用上の注意点が多くスケールアップより負荷が高い操作です。 サブネットの IP アドレスを消費する MSK のブローカーは サブネット内に EC2 と同様に配置されるため、スケールアウトに応じてサブネットのIPアドレスを使用します。 また、ブローカーの追加はAZごとになるため、下記の通り「3AZを利用するMSKクラスター」において「1つのブローカー」を追加すると合計で6個のブローカーとなります。 試しにMSKクラスターのスケールアウトを実施してみます。3AZを利用するMSKクラスターの場合、下記のENIが作成されています。Apache Zookeeperノードとブローカーの合計で6個となります。 次に、1ブローカーをAZごとに追加した場合です。 下記の通り、9個となっています。ブローカー数が3つ追加されているためです。 このように、ENIが増えていくためサブネットのIPアドレスの空きには十分注意してください。 スケールアウト時はクライアント設定も更新が必要 追加されたブローカーを活用するには、  bootstrap.servers に “全ブローカー” を含める必要があります。 bootstrap.servers=b-1.example:9092,b-2.example:9092,b-3.example:9092,b-4.example:9092 更新しないと… 新ブローカーが障害時のフェイルオーバー先にならない   リーダーが移動しても接続先が偏る   クラスタ拡張が性能向上につながらない   スケールアウトしたら、必ず全クライアント(Producer / Consumer)の設定を更新しましょう。スケールアップでは接続文字列が変わらないため、この作業は不要です。 スケールアウトの料金インパクト スケールアウトは ブローカー数に比例して料金が増加します。3AZ 構成では 3の倍数での増加となるため注意が必要です。 参考例として、「m5.xlarge の MSK クラスターをスケールアウト」した場合の料金です。 構成 計算式 月額料金 現在(3ブローカー) 3 × $0.543/時間 × 730時間 $1,189.17 スケールアウト後(6ブローカー) 6 × $0.543/時間 × 730時間 $2,378.34 ※東京リージョンを利用した場合。またストレージサイズなど他のプロパティでも変動有。 3AZ 構成ではブローカーを最小 3 個単位で追加する必要があるため、スケールアウト時はコスト面の検討も必ず実施してください。 参考資料 公式にてスケールアップとスケールアウトについてまとめられた記事が公開されています。  より詳細なユースケースに応じた選択を下記から確認ください。 Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost
アバター
こんにちは。 AWSでマネージドなKafkaを扱うとき、Amazon MSK(Managed Streaming for Apache Kafka)は選択肢の一つです。 この記事では、構築・運用の中で気づいた押さえておきたいポイントの一部を紹介します。 前提条件 本記事で触れている内容は、以下の構成を前提としています。 ブローカータイプ:標準ブローカー(kafka.m5.large など) クラスタタイプ:プロビジョンドクラスタ メタデータ管理:Apache Zookeeper モード Serverlessや Express ブローカーなどはユースケースに応じて検討してください。また、あくまで参考資料となります。環境によって最適解は変わるため、必要に応じて検討してください。 可用性のベストプラクティス Amazon MSK(Managed Streaming for Apache Kafka)で高可用性を実現する際の基本は レプリケーションファクタ(RF) と min.insync.replicas(ISR) の設定です。 基本の考え方 MSKにおいても、複数 AZ へ跨がって配置するのがベストプラクティスとなります。   MSKはブローカーと呼ばれるノードの数やスペックを指定し、クラスターを作成します。   そのクラスター構築では、下記を設定するのが一般的です。 設定項目 推奨値 レプリケーションファクタ(default.replication.factor) AZ 数と同じ値(3AZ の場合は 3 min.insync.replicas RF – 1(3AZ の場合は 2) この設定により、以下のような障害パターンに対応できる構成になります。 ブローカー単体のダウン(AZ 障害) メンテナンスによるローリングアップデート ネットワーク一時障害によるブローカー切断  デフォルト設定の活用 この設定はAmazon MSKの デフォルト設定 を選択すると自動で適用されます。カスタム設定を利用する場合は、明示的に設定しましょう。  MSK コンフィグ画面(例) 3AZに分散したクラスター上で作成したトピック詳細 Topic: MSKTutorialTopic TopicId: 8TdcHHTZRgaoF9JmsPS1yg PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,message.format.version=3.0-IV1,unclean.leader.election.enable=true コスト面とサブネットへの注意   可用性を向上させるためには重要な設定ですが、下記も考慮しておく必要があります。 ブローカー数が増えるほど MSK の利用料金も線形に上昇する ブローカー数が増えると、サブネットの空き IP アドレスを消費する クライアント接続文字列には “全ブローカー” を含める 複数 AZ に跨ってクラスターを構築したら、それを正しく活かすために重要なのが クライアント側の接続設定です。 ベストプラクティスとしては、各 AZ のブローカーを最低1つ以上、bootstrap.servers に含めることを推奨します。全ブローカーを列挙しておくと、フェイルオーバーの信頼性が高まります。 bootstrap.servers=b-1.example:xxxx,b-2.example:xxxx,b-3.example:xxxx この設定の利点: どれか 1 台のブローカーが停止しても、他のブローカーへ自動フェイルオーバーできる AZ 障害発生時でも、残存するブローカーに切り替えられる 接続先の取得方法  MSKを構築したら、AWS コンソールから接続先をコピーすることで、全ての AZ(ブローカー)を含めることができます。 ブートストラップサーバー(接続先)の取得例 スケールアウト時に見落としやすい “AZ 数 × ブローカー数”  MSK では、ブローカーは AZ に均等配置される仕様になっています。  そのため、スケールアウトすると以下のように AZ 数の倍数で増えていきます。 AZ 構成 ブローカー数の遷移 3AZ 3台 → 6台 → 9台 → … 2AZ 2台 → 4台 → 6台 → … スケールアウトによるブローカーの追加 「ちょっと性能上げたいから ブローカーを 1 台だけ増やそう」ということはできません。 「追加するブローカー数」を1に設定すると、利用している全てのAZに1つずつ追加されます。 図:ブローカー追加時の AZ ごとの挙動イメージ この仕様が影響するポイントは次の通りです。 影響項目 詳細 コスト上昇 ブローカー数が想定以上に増えて、利用料金が大きく増大する IP 枯渇 サブネットの IP 消費量が AZ × ブローカー数で増えていき、デプロイ不可能に パーティション再配置 増やしたブローカーに合わせてパーティション再配置が必要になる コスト例(東京リージョン、m5.xlarge): ブローカー数 月額コスト 3台 1,189.17 USD 6台 2,378.34 USD ポイント  “IP 枯渇” は後戻りが難しい部分なので、MSKで使用するサブネットを小さく切りすぎない設計が重要です。 ネットワークインターフェースの画面から、各ブローカーの状況を確認できます。(Apache ZooKeeperノードも含まれる) 図:ENI(ネットワークインターフェース)状況の例 ブローカーのスケールアウトとスケールアップ及びパーティション再配置については、別の機会に改めて触れる予定です。 MSK のモニタリングレベル MSK では、クラスタの状態を監視するために モニタリングレベルが 4 種類用意されています。   どのレベルを選ぶかによって取得できるメトリクスの粒度やコストが変わるため、運用目的に応じて選択することが大切です。 MSK のモニタリングレベル 4 種類 レベル 特徴 コスト DEFAULT クラスター・コンシューマー・プロデューサーの基本的な性能を把握できる 無料 PER_BROKER ブローカー単位でより詳細なメトリクスを取得できる 別途料金が必要 PER_TOPIC_PER_BROKER ブローカー及びトピック単位でのメトリクスを取得可能 別途料金が必要 PER_TOPIC_PER_PARTITION ブローカーごと、トピックごと、パーティションレベルで詳細なメトリクスを取得可能。 別途料金が必要 4つのレベルからモニタリングレベルを選択できますが、DEFAULT レベルでも運用に必要な主要指標はほぼ揃っています。 監視の方針次第ではありますが、利用料金に影響するため、まずは DEFAULT で要件を満たせているか確認することをおすすめします。   ブローカータイプは“性能”+“パーティション上限”で選ぶ MSK のブローカータイプは単純に性能だけで選ぶのではなく、  ブローカーあたりのパーティション数の推奨値や、更新オペレーションをサポートするパーティション上限 も考慮する必要があります。 特に、大量のトピック(数百~数千)を作成する場合は、必ず考慮に入れましょう。 パーティション数の目安 MSKではブローカーサイズに応じて推奨のパーティション数が決まっています。詳しくは 公式ドキュメント をご確認ください。 ブローカーサイズ 推奨パーティション数 kafka.m5.large 1000 kafka.m5.2xlarge 2000 大量のトピック(数百~数千)を作成する場合、ブローカーあたりのパーティション数が増えるため、高スペックのブローカーを選ぶ必要があることを忘れないようにしてください。 例えば、下記のようなクラスターを用意しトピックを作成したとします。 MSKクラスターを 1 台用意 環境ごと(dev / stg / prd)に同じトピックを複製 トピック例(XXXは連番) SampleTopicXXX-dev SampleTopicXXX-stg 最終的にクラスターで 数百~千単位のトピックが生成され、これに伴ってブローカーのスペックが十分でないと CPU やメモリが不足する場合があります。 ブローカーサイズごとにパーティション上限が決まっている インスタンスタイプの性能のみに着目していると、気づきにくいポイントですが、運用時に困ることになります。   上限を超過した場合、 更新オペレーションができなくなる点 に十分注意すべきです。 この点を考慮せずにリリースした場合、MSKクラスターに対する様々な更新ができないという状態になってしまいます。 パーティション上限 MSK には ブローカーごとのパーティション上限があり、トピック作成やパーティション追加、ブローカー設定変更などの更新オペレーションを安全に行うための制約です。 例:kafka.t3.small ブローカーあたり300パーティションが上限 上限を超えてクラスターの負荷が高い、パーティション追加やクラスター設定変更時に エラーが発生 例えば、パーティション数が1のトピックを300個超作成した場合を考えてみます。 このような状態で MSK クラスターの設定変更を実施しようとすると、下記のようなエラーが発生します。 The number of partitions per broker is above the recommended limit. Add more brokers and rearrange the partitions per broker to be below the recommended limit, then retry the request このように、ブローカー当たりのパーティション数が上限を超えると、設定変更などのオペレーションができなくなります。 上限に余裕を持たせてパーティションを設計し、定期的にメトリクスを監視することが重要です。 スケールアウトに耐えるサブネット MSK クラスターを作成する際、サブネット設計は意外と見落とされがちなポイントです。   スケールアウトのたびに IP を消費する MSK のブローカーは EC2 と同じで、サブネットの IP を消費します。   スケールアウトを何度か行うと、IP が枯渇してデプロイできないケースがあります。 ポイント MSK クラスター作成後、サブネットを変更することはできません。 そのため、最初に十分な IP アドレスの余裕を持たせてサブネットを設計することが重要です。 例えば、ブローカーを1つ追加した際、各 AZ に 1 つずつ追加されるため、ネットワークインターフェースの数が急増します。Apache ZooKeeperノードも含めて管理する必要があります。 データ移行 MSK はクラスター内に作成したトピック内にデータを保持する仕組みのため、別クラスターを作成した場合はデータを移行する手間が発生します。 設計時のコツ サブネット設計時には 「将来的に別クラスターを作らずに済むための IP アドレス余裕」を確保することを推奨します。 またMSK Connect を使う場合、ワーカーが 自動的に同じサブネットにデプロイされます。 MSK Connector の構築画面(例) MSK Connect 構築時のENI ワーカーもブローカーと同じサブネットにデプロイされるため、IP アドレスの消費が増加します。 MSK クラスター単体でも十分に IP を消費しますが、Connect を追加するとさらに圧迫される可能性があります。 ストレージ:増やせるけど減らせない ブローカーのストレージは後から増やすことはできますが、減らすことはできません。 MSK の EBS ストレージは”スケールアップのみ” 現在設定されているストレージが初期値かつ最小値になっています。 コスト面も考慮し、初期設定は慎重に見積もりましょう。またMSK では retention 設定がない限り、無期限でデータを保存します。 無制限のままだと、いつの間にかディスクスペースがなくなっていた、という状況に陥ります。必ず適切な保持期間を設定しましょう。 IAM 認証で“運用しやすい”アクセス制御にする MSK は IAM を使った認証(IAM Access Control)をサポートしており、AWS の IAM ポリシーと同じ考え方で権限管理ができるのが大きなメリットです。 トピック単位でのアクセス拒否など、細かい制御が可能 IAM 認証では、IAM ポリシーにより柔軟なアクセス権限を表現できます。 アクセス制御の例 あるアプリには「特定トピックの読み取り・書き込みだけ許可」 別のアプリには「特定トピックへの読み取りだけ許可」 管理用ユーザーには「全トピック管理操作を許可」 IAMポリシー例 末尾が「-dev」のトピックへの操作のみを許可するポリシーです。 { "Sid": "VisualEditor1", "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData", "kafka-cluster:WriteData" ], "Resource": "arn:aws:kafka:*:123456789000:topic/*/*/*-dev" } 「-dev」トピックへのproducer実行例(成功) sh-5.2$ ./bin/kafka-console-producer.sh --bootstrap-server <bootstrap-server>--topic SampleTopic001-dev --producer.config ./config/client.properties >message1 「-stg」トピックへのproducer実行例(権限エラー) sh-5.2$ ./bin/kafka-console-producer.sh --bootstrap-server <bootstrap-server>--topic SampleTopic001-stg --producer.config ./config/client.properties >message1 [2025-11-28 09:15:09,879] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5215 : {SampleTopic001-stg=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient) [2025-11-28 09:15:09,885] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [SampleTopic001-stg] (org.apache.kafka.clients.Metadata) [2025-11-28 09:15:09,887] ERROR Error when sending message to topic SampleTopic001-stg with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [SampleTopic001-stg] 末尾が「-stg」のトピックへ操作する権限はないため、権限エラーが発生します。 このように、IAMポリシーを利用することで、トピックレベルでの権限制御も可能です。 Kafka バージョンアップは“短いサイクル”を前提に Amazon MSK で利用する Kafka バージョンは、他のサービスと同様に常に一定の更新サイクルを意識しておく必要があります。 MSK のサポート終了 利用するバージョンにもよりますが、おおよそ 2 年ほどでサポートが終了します。  詳細はAWS公式ドキュメントをご確認ください。   サポート対象の Apache Kafka バージョン サポート終了後も同バージョンを使い続けることはできず、AWS によって自動的にバージョンアップが行われます。そのため、サポート期限を把握し、事前にバージョンアップの影響を確認しておくことが重要です。 互換性の確認 Kafka のバージョンを上げると、プロデューサー/コンシューマーのクライアントにも影響します。 ライブラリの互換性 API の変更 クライアントの再接続の挙動 新機能による設定パラメータの追加 これらは事前に検証しておきましょう。アプリ側のクライアントが古いバージョンのままで、MSK だけ先にアップグレードしてしまい、動作不具合が発生してしまう恐れがあります。 MSK のバージョンアップはローリング方式 MSK のメジャーアップデートはブローカー単位でローリング方式により実施されます。 クラスターを複数 AZ に配置しておけば、アップデート中も処理を継続できます。 クライアント側も、全てのブローカーを接続先に含めておくことで、切替時の影響を最小化できます。 bootstrap.servers=b-1.example:xxxx,b-2.example:xxxx,b-3.example:xxxx まとめ Amazon MSK は高可用性と優れた管理機能を備えたマネージドサービスですが、設計段階での決定が運用に大きく影響します。 本記事で紹介した内容は、Amazon MSK設計時に注意すべき項目の一部をまとめたものです。MSK のベストプラクティスはこれだけではなく、チーム規模やユースケースに応じて、さらに細かい調整が必要になる場合があります。あくまで参考資料として、自組織の要件に合わせて検討してください。 最後に、プロビジョンドクラスターは一時停止できません。ブローカータイプによっては利用料金が高額になる場合もあります。検証目的で構築し、不要になった場合は必ずクラスターを削除することを忘れないようにしましょう。 本記事が皆さんの MSK 構築・運用の一助となれば幸いです。
アバター
こんにちは。SCSKの石田です。 本記事より、次世代APIプラットフォームとして世界中で注目を集めている「Kong API Gateway」についてのブログを開始したいと思います。初めてブログを投稿するため、至らない点もありますがご容赦ください。 昨今のエンタープライズシステムにおいて、クラウドネイティブ化やマイクロサービス化が進む中、システム同士をつなぐ「API」の数は爆発的に増加しています。第1回目となる今回は、なぜ今エンタープライズ企業においてAPIマネジメントが重要視されているのか、そして「Kong API Gateway」がどのようにその課題を解決するのか、概要を説明します。   爆発的に増加するAPIトラフィックと新たな課題 エンタープライズにおけるAPIマネジメントの重要性を語る上で外せないのが、APIトラフィックの圧倒的な増加です。 近年の複数のグローバル調査データ( ※1 )によると、現在の Webトラフィック全体の約70%以上がAPI経由の通信 であると報告されています。さらに特筆すべきは、AI技術の普及に伴う変化です。Postman社の「2025 State of the API Report」等によれば、 AI主導のAPI呼び出し(マシン間通信)が前年比で40%以上も急増 しており、APIは単なるアプリケーションの連携口から「AIエージェントの実行レイヤー」へと進化しつつあります。 ※1 参考: Postman「2025 State of the API Report」 、 The State of API Security in 2024 | Resource Library 等の各調査レポートより このように、人間が操作する端末からの通信だけでなく、システムやAIによる自動化された大量のリクエストが飛び交う中、各システム(社内システム、SaaS、パブリッククラウド上のサービスなど)が個別にAPIを公開・管理したままでは、以下のような課題に直面します。 セキュリティのガバナンス低下: 認証・認可の仕組み(OIDCやmTLSなど)が各システムでバラバラになり、脆弱性の温床になる。 トラフィック制御の複雑化: AI等によるリクエストの急増や攻撃から、バックエンドシステムを保護する仕組みが統合されていない。 運用負荷の増大: どのAPIが、誰に(どのシステムに)、どれくらい利用されているのかを一元的に把握できない。 これらの課題を解決し、増え続けるすべてのAPIトラフィックを安全かつ効率的に統合管理する仕組みこそが「APIマネジメント」であり、その入り口となるのがAPIゲートウェイです。   Kong API Gatewayとは?その特徴と強み Kong API Gatewayは、世界で最も利用されているオープンソースベースのAPIゲートウェイの一つです。エンタープライズ環境でKongが選ばれるのには、大きく3つの理由があります。 1. 圧倒的なパフォーマンスと軽量さ NGINXをベースに構築された軽量なアーキテクチャにより、極めて低いレイテンシで大量のAPIリクエストを処理できます。第三者評価機関であるGigaOm社のAPIマネジメントベンチマーク調査( ※2 )においても、他の製品と比較して圧倒的な高スループット(1ノードあたり毎秒5万件以上のトランザクション)と、サブミリ秒(1ミリ秒以下)の低レイテンシを記録し、その卓越したパフォーマンスが実証されています。AWSなどのクラウド環境やコンテナ環境との親和性が非常に高く、モダンなインフラ上でも軽快に動作します。 ※2 参考: GigaOm「API and Microservices Management Benchmark」 より 2. 豊富なプラグインエコシステム Kongの最大の魅力は、APIのルーティング機能だけでなく、高度な要件を「プラグイン」として簡単に追加できる点です。例えば、トラフィック制御(レート制限)、高度な認証・認可(OIDC、OAuth2.0、SAML、OPA連携)、ログ転送などを、バックエンドのコードを改修することなくAPIゲートウェイ層で一元的に実装・自動化できます。Kongにて一元的にこれらの機能を集めることで、API開発者の負荷を下げることができます。また、プラグインはノーコード・ローコードで実装できる点も強みです。 3. あらゆる環境に対応する柔軟性 Kongはコンテナとして動作するため、オンプレミス、マルチクラウド、ハイブリッドなど、どこにでもデプロイ可能です。SaaS型の管理基盤である「Kong Konnect」を利用すれば、グローバルに分散したAPIゲートウェイ群を単一のコントロールプレーンで統合管理することも可能です。   SCSKとKongのパートナーシップ 私たちSCSKは、Kong Inc.の公式パートナーとして、商用版Kongにおけるライセンスの提供からシステム構築・導入支援まで、エンタープライズ企業様向けのサービスを展開しています。 多数のAPIが乱立する大規模環境への導入や、既存のレガシーシステムからモダンアーキテクチャへの移行に伴うAPI基盤の刷新など、お客様の課題に合わせた最適な構成をご提案可能です。「自社のAPI管理に課題を感じている」「Kongの導入を検討したい」といったご相談があれば、ぜひお気軽に「kong-sales@scsk.jp」までお問い合わせください。   まとめ・今後の連載について 今回は第1回のため、APIトラフィック増加の背景とAPIマネジメントの重要性、そしてKong API Gatewayの概要についてご紹介しました。APIを安全かつ効率的に公開・管理することが、これからの開発スピードを左右する重要な要素となります。 次回以降は、Kongの基本的なAPIのルーティングや、プラグインの具体的な「やってみた」、さらに今や欠かせない「Kong AI Gateway」の紹介まで、エンジニア目線でより詳細な技術情報をお届けしていく予定です。 次回もぜひご期待ください!
アバター
生成AIの ChatGPT (OpenAI)、 Gemini (Google)、 Claude (Anthropic)などの進化が激しいですが、それぞれのサービスの最新情報を収集し、新たにできることになったことを理解し、複雑なプロンプト(指示文)を使いこなすのに、そろそろ疲れてきていませんか? 生成AI疲れと、実際の一般の利用者視点、実際の業務の効率化視点におけるAI活用のひとつとして、 Genspark をご紹介します。   Genspark(ジェンスパーク)とは Genspark ( ジェンスパーク )は、ChatGPTのような対話型AIと異なる エージェント型AI となり、検索から資料作成までを自動化する次世代型のAIオールインワンワークスペースとうたわれています。2026年3月時点では Genspark AI ワークスペース 3.0 がリリースされています。 ちなみに、Gensparkで「Gensparkとは?」を尋ねると、以下の回答が返ってきます。 Gensparkは「オールインワンのAIワークスペース/AIコパイロット」を掲げ、ブラウザ上(拡張機能のサイドバー)でページ要約・Q&A・タスク自動化までまとめて行えるサービスです。 中核となる”スーパーエージェント(Super Agent)”を中心に「考えて、計画して、実行する」自律型AIとなり、調査・コンテンツ作成・データ分析・電話・メールなどを「1つのプロンプト」で実行します。 30+モデル・150+ツール・700+ MCP連携を組み合わせ、スライド(Slides)、シート(Sheets)、ドキュメント(Docs)、画像(Designer)、デベロッパー(Developer)等の用途別の専門エージェントと協調して動作する仕組みになっています。Genspark AI ワークスペース以前は、Genspark スーパーエージェントと言う名前でした。 直近の2026年3月に新機能の自律型エージェント Genspark Claw もリリースされ、メール送信、カレンダー調整、Slack連絡など、複雑なマルチタスクを指示だけで自動化することも可能となっています。 Gensparkは、「調べる」→「まとめる」→「成果物(資料/文章/表/デザイン)作成」→「レビュー(ファクトチェック)」→「連絡やスケジュール調整」というような一連の皆さんが普段実施している業務をすべてAIで自動に実施することが最大の強みとなり、また複雑なプロンプトではなく、やりたい内容をひとつの指示で投げられるのが特徴です(例:〇×市場の調査して10枚のスライド、PDFを集めて要約して、など)   Genspark 運営企業 MainFunc について Gensparkの運営元は、 MainFunc です。アメリカ カリフォルニア州パロアルトを本拠地とする企業で、創業者はBaidu(百度)出身のEric Jing(エリック・ジン氏)で、Microsoft、Google、Meta、Pinterest などの出身者によって2023年に設立されました。アメリカ以外には、シンガポールと東京にオフィスがあります。 現在、2億7,500万ドルのシリーズB資金調達ラウンドを経て、12.5億ドルの評価を受けています。 “MainFunc”という名前の由来は、コンピュータプログラミングの基本概念である「main function」に由来しており、プログラミングにおける main function(=主機能)はあらゆるソフトウェアアプリケーションの出発点として機能し、すべての操作がここから始まるのと同様に、MainFuncは技術分野において重要なプレーヤーとしての位置づけとなり、革新的なAI駆動製品の発信点となることを目指しています。   オプトアウト(学習拒否)について まず最初に、企業で利用する場合に問題になるのが オプトアウト(学習拒否) です。 Gensparkに限らず、AIチャットボットへ利用者が機密情報や個人情報などを誤って入力してしまった場合でも、入力したデータや検索履歴がAI学習に利用されないようにするのはオプトアウト(学習拒否)ですが、Gensparkでは、オプトアウト設定は、以下の手順で行えます。 左サイドバーの最下部のアイコンより[設定]を選択。 [アカウント]タブの「 AIデータ保持 」のスイッチをOffにします。(デフォルトではOnになっています)   AIチャット、AI画像、AI音楽、AI動画 まず、ChatGPTと同じ対話型AIである AIチャット(チャットボット) をご紹介します。 ここで見ていただいて分かるように、AIチャットを選ぶと、利用する生成AIを自由に選ぶことが可能になっています。 現時点では、 ChatGPT は、GPT-5.4、GPT-5.4 Mini、GPT-5.4 Nano、GPT-5.4 Pro、o3-pro、 Claude は、Sonnet 4.6、Sonnet 4.5、Opus 4.6、Opus 4.5、Haiku 4.5、 Gemini は、2.5 Pro、3 Flash Preview、3.1 Pro Preview、3 Pro Preview、また、イーロンマスクが率いる xAI社 のGrok4 0709が利用することができます。過去には、DeepSeek、Mistralなども含まれていました。 つまり、Gensparkを契約するだけで、ChatGPT、Claude、Gemini、Grokを個別に契約することなく、それぞれを利用することが可能になります。 AIチャットでは、生成AIを選択することが可能ですが、通常問い合わせ(スーパーエージェント)においては、プロンプトで支持されたタスクに最適な生成AIをGenspark側が自動で組み合わせて利用するため、利用者側で生成AIを選ぶ必要はなく、生成AI毎の得意・不得意を理解している必要もありません。また、最新の生成AIがリリースされると自動的にGensparkに組み込まれることになります。 次に、 AI画像生成 も同じです。 見ていただいて分かるように、画像生成するAIを選択することができます。 最近話題の Geminiの Nano Banana Pro 、Nano Banana 2から、ByteDance Seedream v5.0 Lite、Flux 2、Flux 2 Pro、GPT-Image 5.0、Recraft v3、Ideogram v3、Qwen Image 2、Recraft Clarity Upscale、Baria Background Remover、Text Removal から選択が可能です。Baria Background Remover、Text Removal については入力画像が必須となります。 AI音楽生成 も、ElevenLabs Music、MiniMax Music 2.5、Mureka Song、Lyria2 Music Generator、ElevenLabs Sound Effects、CassetteAI、Mureka Instrumentalから選択が可能です。 AI動画(ビデオ)生成 も、 Gemini Veo 3.1 、Gemini Veo 3.1 Ref、Gemini Veo 3.1 First-Last Frame、 Sora 2 、Sora 2 Pro、Gemini Veo 3、 Kling V3 、Kling V3 Motion Control、Kling O3 Image-to-Video、Kling O3 Refelence-to-Video、Seedance v1.5 Pro、Grok Imagine Video、MiniMax Hailuo-2.3 Standard、PixVerse V5、Seedance Pro Fast、Fal Lipssync V2、Wan V2.6、Vidu Q3、Runway、ByteDance Video Upscalerから選択が可能です。 AIチャットと同じですが、AI画像生成、AI音楽生成、AI動画(ビデオ)生成についても、Gensparkを契約するだけで、Nano Bananaを始め様々は専門の生成AIを利用することが可能になります。 ちなみに現在キャンペーンを実施しており、有償契約を行うと2026年末まで AIチャットとAI画像生成は無制限に利用することができます。   実業務での利用シーン AIスライド、AIシート活用 次に、実際に「調べる」→「まとめる」→「成果物(資料/文章/表/デザイン)作成」→「レビュー(ファクトチェック)」→「連絡やスケジュール調整」と言った業務での利用をイメージした利用方法についてご紹介します。 利用シーンとして「オンラインストレージのマーケティング業務」を例としています 。 背景としては、 最近、オンラインストレージのニーズが再び高まってきている からとなります。 オンラインストレージとは、 Dropbox 、 OneDrive 、 Google Drive 、 Box と言ったサービスになります。DropboxやBoxは10年以上前からサービス利用されていますが、最近ニーズが高まっている背景としては、 ①各種システム(サーバ)がクラウドへ移行され、企業内にファイルサーバだけが残っており、ファイルサーバもクラウド移行したい 。 ②単純にWindowsファイルサーバをクラウドへ移行すると、バックアップの設定やソフトウェア(OS含む)のパッチ適用やバージョンアップなど運用管理の負荷が変わらない 。 ③さらに、ランサムウェア対策として容易にデータ復元が可能なファイルサーバのサービスを利用したい 、などが挙げられます。 これまでオンラインストレージは、社外とのデータ共有やファイル受け渡しにしか利用していなかったが、改めて社内ファイルサーバとしての活用を検討する企業が増えています。 SCSKは、2017年に 日本初のDropboxサービスパートナーに認定 (2026年3月時点で当社のみ)されており、単純にDropboxのライセンス販売をするだけではなく、Dropboxの導入支援(初期設定、ID/SSO連携)から、ファイルサーバからのデータ移行、トレーニングを含む活用支援などの様々なサービスをご提供しています。 ということで、 Genspark でオンラインストレージのマーケティング調査を実施 します。 まず、マーケティング調査報告書を Genspark の AIスライド で作成します。 デフォルトで様々なテンプレートが準備されており、自社向けの独自テンプレートで作成することも可能ですが、今回はデフォルトで準備されている「 マーケティング戦略 2025 」を利用し、プロンプト(指示分)は「 オンラインストレージの主要サービスを比較し、国内市場シェア、Dropboxの販売促進を行うためのマーケティング戦略をまとめてください 」としました。 プロンプトをより詳細に記述することで、アウトプットの精度を向上させることが可能ですが、今回は一般の利用者を前提として作りたいアウトプットをひとつの指示で行っています。 先ほどの指示で上記のスライドが作成されました。Genspark の追加の指示サンプル「3ページ目の市場シェアデータを円グラフまたは棒グラフに変換して視覚的にわかりやすくしてください」にあるように細かい調整を行うことも可能です。 また、[ ファクトチェック ]や[ AI編集 ]を行うことができ、[ 高度な編集 ]にて直接文字の修正をすることも可能です。 他の生成AIでは、アウトプットが画像イメージやPDFで出力され、直接アウトプットを編集することができないケースが多いですが、Genspark のAIスライドは、このように直接編集することが可能です。さらにエクスポートを行うことができます。 PDFはもちろんですが、Microsoft Powerpoint(PPT形式)やGoogle Slidesでエクスポートすることが可能です。 現状のAIスライドはきちんとPPTの枠内に収まっていないなどがよくありますが、細かい修正はPPTで実施した方が圧倒的に早いです。 完成したマーケティング報告書は、ここで掲載はしませんが、日本の市場規模と年成長率(CAGR)、2034年の市場規模、各サービスのシェア、主要4サービス比較表(Dropbox、Box、OneDrive、Google Drive)、Dropboxの勝機(勝ち筋)、DropboxのSWOT分析、最後にDropboxの販売戦略についてのアウトプットが行われています。特に、販売戦略については、ターゲット、差別化するメッセージ、重要KPIなども提示されています。 Genspark のインプットとなるデータは、一般に公開されているWebサイトのデータ以外にも、Genspark が独自に購入しているデータも含まれています(詳細は後述) 同様のマーケティング調査報告書を、自社(自分)で作成するには何時間、何日か掛かりますし、外部調査企業へ依頼すると時間以外にお金も掛かりますが、Genspark の AIスライドであれば数分で完成します。 Gemini(Canvas)で同様のことができますが、PPTにするには、Google Slidesで一旦エクスポートし、再エクスポートする必要があります。 これは、個人的な意見ですが、他の生成AIで作成すると、初回は期待値の40~60%の出来栄えからスタートし、その後プロンプトの調整で70~90%の出来栄えになるイメージですが、Genspark は、最初から70~80%の出来栄えが完成するイメージです。   先ほどのマーケティング戦略のターゲットのひとつに、 SMB(20から500名) 、 クリエーター(代理店・制作会社・動画編集者) とありましたので、ターゲット企業を調査します。 ターゲット企業調査に Genspark の AIスライド を利用します。 AIシートのプロンプトに「 日本国内で従業員数が20名から500名までのクリエーター会社、映像制作会社、コンテンツ制作会社をリストアップしてください 」としました。 数分で、約200社の企業のリスト(スプレッドシート)が制作されました(会社名はマスキングしています) このスプレッドシートも元にしてさらに AIシートで、都道府県分析などを行ったりすることも可能です。 AIシートは、Excelとしてエクスポートをすることも可能ですが、今回の企業調査においては、外部の有償の企業データベース「Crunchbase」を活用しているため、Excelでのエクスポートはできなくなっていました。 このスプレッドシートを元に企業で保有するハウスリストと突き合わせることも可能ですが、今回は、それとは別に、AIシートへ「 日本国内で従業員数が20名から500名までのクリエーター会社、映像制作会社、コンテンツ制作会社の参加者が多い2025年に開催された国内イベントをリストアップしてください 」を指示することにしました。 数分で、クリエーター会社・映像制作会社・コンテンツ制作会社(従業員20〜500名規模)の参加者が多いイベントがスプレッドシートにまとめられました。さらに、特におすすめのイベントとして、① コンテンツ東京2025(7月) 、② Inter BEE 2025(11月) 、③ VIDEOGRAPHERS TOKYO ONLINE 2025(10月) の3つが挙げられていました。 これらイベントのさらなる調査や、実際の広告・イベント企業への情報の裏どりが必須ですが、これらのイベントへ参加しターゲット企業のリードを獲得することも可能です。 今回のAIシートは問題なくExcleでエクスポートが可能でした。   まとめ Genspark(ジェンスパーク)とは?から、対話型AI(AIチャットボット、画像・音楽・動画生成)、エージェント型AIである AIスライド、AIシートの活用例を紹介しました。 AIスライドは、マーケティング調査だけではなく、提案書骨子を作成し、骨子をプロンプトで指示することで、数分で提案書の叩き台を作成することが可能です。大量のPDF資料を元にしてサマリ資料を作成することもできます。 AIシートは、Excelのマクロやピボットテーブル、グラフなど複雑な操作を知らなくても、Excelスプレッドシートを元にして、加工・分析し報告書を作成することが可能です。 対話型AIのAIチャットボットや画像・音楽・動画生成は、利用されていた方が多いと思いますが、エージェント型AIのAIスライドやAIシートは、普段の実業務を効率化するのに有効だと思います。 Genspark では、対話型AIを「 基本エージェント 」、エージェント型AIを「 高度なエージェント 」と定義しており、以下のように業務の効率化に寄与できる便利なエージェントがたくさんあります。 エージェント種別 エージェント名 概要 高度なエージェント Genspark スーパーエージェント 自動調査(旅行計画・予約、記事・動画生成など) AIスライド スライド生成 AIシート スプレッドシート生成 AIドキュメント ドキュメント生成 AIデベロッパー Webサイト・アプリ開発 AIデザイナー デザイン制作 フォトジーニアス 話して写真編集 クリップジーニアス 動画編集 AIポッドキャスト ポッド(音声番組)生成 深層研究 ディープリサーチ ファクトチェック 複数ソースを用いた検証 通話代行 電話アシスタント ダウンロードエージェント AIドライブへダウンロード 基本エージェント AIチャット AIチャットボット AI画像 AI画像生成 AIオーディオ AI音声変換 AI音楽 AI音楽生成 AIビデオ AI動画生成 翻訳 AI翻訳 ミーティングメモ AI会議議事録生成 上記エージェント以外に、 Speakly (音声入力)、 AIドライブ 、 AI Inbox (メール/カレンダー管理)などもありますので、また次の記事で紹介していこうと思います。  
アバター
こんにちは、SCSKの松岡です🪣 データ基盤の構築でIceberg (S3 Tables)を導入した際の試行錯誤を整理しました。 従来のS3によるファイル格納型のデータレイクと比較し、Icebergを採用することで得られたメリットや、それをマネージドで扱えるS3 Tablesの利便性について紹介します。   背景 データ活用基盤におけるデータレイクは、単にデータを蓄積するだけでなく、直接参照して検索・分析に活用したいというニーズが増えています。そのような用途では、複数テーブルのJOINや同時アクセスなどに対する性能も重要な観点となります。 従来は、データレイクのデータをRedshiftやSnowflakeといったDWHに取り込み、集計・加工したうえで活用する構成が一般的でした。 一方で、マルチクラウド環境では、組織や用途に応じて複数のツールから同一データを参照したいケースも増えています。 また、データオーナーの視点では、業務プロセスの変化に伴うデータ構造の変更に対して、データレイク側が柔軟に対応できるかが重要になります。列追加やデータ型変更などを、既存データに影響を与えずに実施できることが望まれます。 さらに、データ基盤管理者にとっては、従来のファイル形式のデータレイクでは、フォルダ構成やファイルサイズ、データ配置などの設計が必要となり、構築・運用のハードルが高いという課題がありました。 加えて、大量の履歴データを長期保管する特性上、データ量の増加に伴うストレージコストの増大や、運用負荷の増加も懸念されました。   構成と選定理由 Why Iceberg? Apache Iceberg – Apache Iceberg™ 「背景」で述べた課題を踏まえ、データレイクにIcebergの仕組みを採用しました。 Icebergはオープンソースのテーブルフォーマットであり、オブジェクトストレージ上のデータをテーブルのように扱える点が大きな特徴です。 メタデータや統計情報を活用することで、必要なデータファイルのみを読み込むことが可能となり、従来の単純なファイル型データレイクと比較して効率的な検索が可能です。 また、Icebergはオープンテーブルフォーマットであるため、特定のDWHサービスに依存せず、AthenaやRedshift、Snowflakeなど複数の分析エンジンから同一データを参照できます。これにより、用途に応じてBI・AI/MLなどのツールを柔軟に選択でき、ベンダーロックインを避けた構成を実現できます。 さらに、用途ごとにテーブルを分離し、加工データを段階的に保持するデータレイク構成にも適しています。 機能面では、「スキーマ進化(Schema Evolution)」により、既存データを書き換えることなくカラム追加などのスキーマ変更が可能であり、データ構造の変化にも柔軟に対応できます。 Why S3 Tables? 表形式データの大規模ストレージ – Amazon S3 Tables – AWS S3 TablesはIcebergをマネージドで利用できるAWSのサービスです。S3 Tablesを採用することで、Icebergのテーブル管理を簡素化することができました。 テーブル単位でデータを管理できるため、フォルダ構成やファイル設計を個別に検討する必要がなく、データ管理をシンプルに保つことが可能です。 また、スナップショットやメタデータ管理がマネージドで提供されるため、履歴管理に伴う運用負荷を軽減できます。 必要に応じてスナップショットの保持期間などを手動で制御することも可能です。 さらに、データファイルの最適化が自動で実施されるため、運用による性能維持の負担を抑えつつ、必要に応じてパーティション設定などのチューニングも行えます。 加えて、S3ベースのストレージを利用するため低コストでのデータ保管が可能であり、コストタグやAWS Cost Explorerを用いた可視化・管理にも対応しています。 構成 今回のケースでは、オンプレミスのサーバから収集したデータの蓄積先としてS3 Tablesを採用しています。 未加工データと一次加工データをテーブル単位で分離し、用途や処理段階に応じたデータレイク構成としています。 S3 Tablesに格納したデータは、AthenaやRedshift、BIツールなどから直接参照しています。   気にしたポイント Amazon S3 Tables とテーブルバケットの使用 – Amazon Simple Storage Service 基本的なテーブルの構築は、S3 Tablesユーザーガイドのチュートリアルに従うことで簡単に実現可能でした。 運用も考慮した場合、追加で以下のような観点を考慮しました。   テーブル設定に関する考慮 基本的にS3 Tables側がマネージドで制御されるので、初期作成時には以下の点だけ気にしました メンテナンスジョブに関する考慮事項と制限事項 – Amazon Simple Storage Service ・テーブルプロパティ(スナップショットを保持する世代数、保持する最長時間) Iceberg テーブルを作成する – Amazon Athena ・パーティション設定   テーブル定義に関する考慮 Iceberg テーブルスキーマを進化させる – Amazon Athena Iceberg 形式では、次のスキーマ進化の変更がサポートされています。 Add  – 新しい列をテーブルまたはネストされた  struct  に追加します。 Drop  – 既存の列をテーブルまたはネストされた  struct  から削除します。 Rename  – 既存の列またはネストされた  struct  のフィールドの名前を変更します。 順序変更  – 列の順序を変更します。 型昇格  – 列、 struct  フィールド、 map  キー、 map  値、または  list  要素の型を広げます。Iceberg テーブルでは、現時点で次のケースがサポートされています。 整数から大きな整数 浮動小数点から倍精度浮動小数点 10 進数型の精度を上げる Icebergでは、既存テーブルに対してカラム追加や型拡張などのスキーマ変更(スキーマ進化)が可能です。 一方で、スキーマ進化には制約があり、数値型から文字列型のような互換性のない型変更はサポートされていません。その場合は、列の追加やテーブル再作成による対応が必要になります。 このため、スキーマ変更に柔軟に対応できる一方で、初期段階でのテーブル定義設計は依然として重要となります。   テーブル定義変更方法の制約 AWS Glue を使用した Amazon S3 テーブルでの ETL ジョブの実行 – Amazon Simple Storage Service S3 Tablesのテーブルの作成はAWS AthenaやAWS CLIから可能です。 一方で、作成済テーブルに対するパーティション変更やソート順の変更など、一部のテーブル設定はAthenaやCLIからは実行できず、GlueのSparkジョブからSQLを実行する必要がありました(2025年時点)。   コストに関する考慮 Amazon S3 Tables で個々のテーブルのストレージコストを可視化できるように – AWS 2025年6月のアップデートにより、S3 TablesはCost Explorer からテーブルレベルのコストデータを参照できるようになりました。 他のAWSサービスと同様に、コストタグの設定も行えるようになりました。 比較的に新しいサービスなので、このような機能アップデートが活発に行われている状況です。   権限に関する考慮 Lake Formation を使用したテーブルまたはデータベースへのアクセスの管理 – Amazon Simple Storage Service S3 Tablesの権限制御はLakeFormationベースで行う必要があります。 従来のS3バケットの権限管理とは異なるため、導入時には注意が必要です。 一般的には、IAMユーザー/ロールごとに、S3 Tablesの名前空間やテーブル単位で権限を付与する形となります。 Simplified permissions for Amazon S3 Tables and Iceberg materialized views – AWS (追記) S3 Tablesに対して、IAM ベースの認証をサポートするというような記事も投稿されていました。   まとめ Icebergを採用することで、データ構造の変化に柔軟に対応しつつ、効率的な検索が可能なデータレイクを構築することができました。 また、S3 Tablesを利用することで、構築・運用における設計負荷を大きく軽減できました。 本構成は柔軟性・性能・運用負荷のバランスに優れたデータ基盤の選択肢であると考えています。 一方で、Icebergにはスキーマ進化の制約があり、またS3 Tablesについても一部のテーブル設定変更に制約があるなど、設計時に考慮すべきポイントが存在していました。 IcebergはIoTデータなどの大規模データの格納基盤としても注目されており、今後はリアルタイムデータやAI/ML活用など、さらなるユースケースへの適用も検討していきたいです。   (宣伝) クラウドデータ活用サービス 今回ご紹介した内容は、SCSKで提供しているクラウドデータ活用サービスの中で扱っているテーマの一部になります。 お客様のデータ活用状況に応じて、基盤構築から可視化、データ連携、データマネジメント、高度データ活用までを段階的にご支援しています! 私自身もこのサービスに関わっており、AWS Summitのブースやミニセッションでもご紹介してきました。 ご関心あれば、以下のサービスページもご参照ください。 AWS データ活用|サービス|企業のDX戦略を加速するハイブリッドクラウドソリューション
アバター
こんにちは、 AWS内製化支援「テクニカルエスコート」 担当の間世田です。 先日、セッションマネージャーでのEC2接続についてLTを行ったところ、VPCエンドポイントの要否についてアドバイスをいただきました。 従来、プライベートサブネット内のEC2へセッションマネージャーで接続するためには、以下の 3つのエンドポイント が必要でした。 ssm ssmmessages ec2messages ところが、 約2年前にアップデートが行われ、SSM Agent バージョン 3.3.40.0 以上では ec2messages の代わりに ssmmessages が使用される ため、以下の 2つのエンドポイントで十分 とのことです。 ssm ssmmessages 本件については、以下のブログでも解説が行われています。 SSM セッションマネージャーに必要なVPCエンドポイントが2つになっていた | DevelopersIO 新しいバージョンのSSM Agentではec2messagesエンドポイントが不要になりました dev.classmethod.jp さて、セッションマネージャー接続では ec2messagesエンドポイント が不要であることが示されましたが、従来 ec2messageエンドポイント が必要とされているサービスは他にもあります。その一つが、 インスタンスのパッチを管理する Patch Manager です。 現時点では、Patch Manager に必要なエンドポイントとして ec2messages も含めて紹介している技術ブログが多く見られます。 そこで今回は、 Patch Manager においても ec2messages が不要となったのかどうかを検証 していきます。 前準備 以下のような環境を構築します。 エンドポイント エンドポイント 種別 com.amazonaws.ap-northeast-1.ssm Interface com.amazonaws.ap-northeast-1.ssmmessages Interface com.amazonaws.ap-northeast-1.s3 Gateway 検証のため、ec2messagesエンドポイントは作成しません。 EC2 OSはRHEL 9.2とし、プライベートサブネットに配置します。 RHELは初期状態では SSM エージェントがインストールされていないため、EC2 起動時のユーザーデータでインストールしました。 #!/bin/bash cd /tmp sudo dnf --disablerepo="*" install -y https://s3.ap-northeast-1.amazonaws.com/amazon-ssm-ap-northeast-1/latest/linux_amd64/amazon-ssm-agent.rpm sudo systemctl enable amazon-ssm-agent sudo systemctl start amazon-ssm-agent 【初心者向け】RHELでSession Managerを使うためにユーザーデータでSSMエージェントをインストールしてみた | DevelopersIO dev.classmethod.jp エージェントのバージョンを確認すると amazon-ssm-agent-3.3.3883.0-1.x86_64 でした。バージョン 3.3.40.0 以上であるため、前述のとおり ec2messages の代わりに ssmmessages が使用されます。 NAT Gateway RHEL では、パッチ適用時にインターネット上の RHUI(Red Hat Update Infrastructure)からパッケージを取得します。 第1章 Red Hat Update Infrastructure の概要 | システム管理者のガイド | Red Hat Update Infrastructure | 3.1 | Red Hat Documentation 第1章 Red Hat Update Infrastructure の概要 | システム管理者のガイド | Red Hat Update Infrastructure | 3.1 | Red Hat Documentation docs.redhat.com EC2 からのアウトバウンド通信が必要となるため、今回は Regional NAT Gateway および Internet Gateway を作成します。 【本題】Patch Managerの実行 今回は検証のため、「概要から開始」からパッチの適用を行います。 スキャンのみ それでは、最初に「スキャン」のみを行います。 結果、ec2messagesエンドポイント作成せずともスキャンが成功しました。 Patch Manager「スキャンのみ」でも、セッションマネージャと同様にec2messagesエンドポイントは不要で、ssmmessagesで代替されるようです。 解説 従来ec2messagesエンドポイントは、Systems Managerサービスへの API オペレーションのために使用されていました。 ec2messages API オペレーション Systems Manager は、Systems Manager Agent (SSM Agent) からクラウド上の Systems Manager サービスへの API オペレーションにこのエンドポイントを使用します。 リファレンス: ec2messages、ssmmessages およびその他の API オペレーション - AWS Systems Manager Systems Manager の内部オペレーションで使用される特殊な API オペレーションについて説明します。 docs.aws.amazon.com SSM Agentの内部には、ec2messagesエンドポイントと通信するMDSInteractorと、ssmmessagesエンドポイントとMGSInteractorという2つのコンポーネントが存在します。 SSM Agetntのソースコードには以下のコメントがあります。 // mdsSwitcher is responsible for turning on and off MDS based on MGS status. amazon-ssm-agent/agent/messageservice/interactor/mdsinteractor/mdsinteractor.go at mainline · aws/amazon-ssm-agent An agent to enable remote management of your EC2 instances, on-premises servers, or virtual machines (VMs). - aws/amazon... github.com つまり、ssmmessages(MGS)への接続が確立されると、ec2messages(MDS)のポーリングは自動的に停止する設計のようです。 では、/var/log/amazon/ssm/amazon-ssm-agent.logを確認してみましょう。 2026-03-23 03:21:25.3209 INFO [ssm-agent-worker] [MessageService] [MDSInteractor] Starting message polling 2026-03-23 03:21:25.3955 INFO [ssm-agent-worker] [MessageService] [MGSInteractor] SSM Connection channel status is set to ssmmessages 2026-03-23 03:22:25.3965 INFO [ssm-agent-worker] [MessageService] [MDSInteractor] Moving to stop poll job after a minute 2026-03-23 03:22:25.3965 INFO [ssm-agent-worker] [MessageService] [MDSInteractor] MDS Polling job stopped. ログでも SSM Connection channel status is set to ssmmessages の直後に MDS Polling job stopped が記録されており、この動作が確認できます。 スキャン&インストール 次に、「スキャンとインストール」を検証していきます。   ec2messagesエンドポイント作らずとも、パッチインストールについても無事成功しました。   結論 SSM Agent バージョン 3.3.40.0 以上では ec2messages の代わりに ssmmessages が使用されるため、Patch Managerのスキャン/インストールともにec2messagesエンドポイントは不要である。
アバター
組織改編(部署統合・分割・名称変更・異動)のたびに、Dropboxの権限周りで「誰が、どのチームフォルダにアクセスできるべきか」を整合させる作業は、Dropbox を運用している管理者にとって“事故が起きやすい”ポイントです。 実際、Dropboxグループが実組織と乖離したり、旧部署メンバーがチームフォルダに残り続けたり、新部署メンバーが必要なチームフォルダにアクセスできない、といった状態が発生し得ます。 組織改編時のDropbox運用では、アクセス権の棚卸し・更新を確実に行い、「適切な人が確実にフォルダにアクセスできること」「異動後も不要なフォルダにアクセスできてしまう状態を防ぐこと」を満たす必要があります。 本記事では、上記の課題に対し、弊社SCSKのDropbox環境で「どのように組織改編対応を現場負荷を抑えつつ権限整合を担保しているか」を、Dropbox管理者(情シス・部門IT担当者)向けにご紹介します。 1. 基本方針 まず、弊社SCSKのDropbox環境についてご紹介します。 Enterprise プラン Entra ID を介して、Dropboxユーザーの自動プロビジョニングを行っています。 人事情報をSmartdbxに取り込み、SmartdbxにてDropboxグループの作成、削除、およびグループメンバーの管理を行っています。 各チームフォルダの利用部署にてSmartdbxの共有フォルダ管理機能を使ってチームフォルダの作成、およびアクセス権限付与を実施しています。 SCSK Dropboxユーザーとグループの連携 SCSKテナントは、Smartdbxでチームフォルダの払い出し、およびアクセス権限付与を 行っており、チームフォルダ作成や共有メンバーの管理は、フォルダ利用部署の担当者の申請により実施されます。 チームフォルダの共有メンバーの管理は、利用者に委ねられています。 さて、Smartdbxとは何でしょうか?「Smartdbx」は、社内外の多くのDropboxプロジェクトで培ったSCSKの知見を活用して開発したサービスです。 Dropboxをより簡単に、便利に、安心して活用する Dropbox 統合管理ツールの「Smartdbx」は以前の投稿で紹介しています。こちらをご参照ください。 Smartdbxとは?~Dropbox統合管理ツール~ – TechHarmony 1.1 Dropbox チームフォルダはグループ単位に共有する まず押さえる前提として、共有フォルダは「チームフォルダ」を使っており、チームフォルダの共有先は個人のメンバー単位ではなく、Dropboxグループを指定しています。 そして、Dropboxグループには「課」単位の「組織グループ」があり、日次で人事情報をSmartdbxに取り込み、Dropboxグループが更新されています。 チームフォルダには主に「組織単位」と「業務単位」の2パターンがありますが、そのうち組織単位のチームフォルダの共有先は、主に「組織グループ」を指定しています。 1.2 組織改編時の作業フロー 組織改編時は、 「グループの更新」と「チームフォルダ共有先の付け替え」を混同せず、 ステップに分けて順番に作業を進めていきます。( 「グループの更新」と「チームフォルダ共有先の付け替え」の作業は、以降で説明します。) SCSK環境では、人事情報をSmartdbxに取り込み、組織グループが日次で更新される一方で、チームフォルダのアクセス権は、組織改編後も自動では新組織グループへ付与されません。そのため、改編前に利用部署側で影響範囲(チームフォルダの分類、共有先グループ、改編後も継続利用するか)を棚卸しし、改編後に「継続利用」対象のチームフォルダについて、旧組織グループから新組織グループへ共有先を付け替える作業を実施します。 2. 組織改編時の作業 2.1 組織グループの更新 Smartdbxにて自動で新組織に対応した Dropbox グループが作成・更新されます。そのため、「グループを新組織に合わせて作り直し、メンバーを移し替える」といった作業の実施は必要ありません。ただし、 組織改編の部署の変更パターン別に 、Dropboxの組織グループの挙動が変わるため、このルールを予め理解しておく必要があります。 【組織改編の部署の変更パターン】 部署は存続・部署のメンバーが変更される – 更新部署 新しい部署ができる – 新設部署 部署が廃止される – 廃止部署 部署は存続・部署のメンバーが変更される 更新部署 → 異動者をグループメンバーに追加・削除します Dropboxの組織グループは人事データと連携して、自動的に更新されます。組織グループは残り、旧年度メンバーから新年度メンバーへ 組織グループのメンバーの入れ替えが発生します。異動があった人は、異動元の組織グループからは除外され、異動先の組織グループに追加されます。 また、組織改編により「部署コード」に変更がなく部署名が変わった場合も、SmartdbxによってDropboxグループ名が新しい部署名に自動更新されます。 新しい部署ができる 新設部署 → 新組織グループ作成・所属する人をグループメンバーに追加します 新しい部署が新設された場合、その部署に所属する人をグループメンバーとした新しいDropbox組織グループが作成されます。 部署が廃止される 廃止部署 → 組織グループの名前を変更します 廃止された部署のグループ名は、Smartdbxによって自動で先頭に”OLD_”が付与されたDropboxグループ名に変更されます。廃止された部署の組織グループのメンバーは更新されず、そのまま残ります。 2.2 現行チームフォルダの棚卸し 前に記載したとおり、SCSKテナントはSmartdbxでチームフォルダの払い出しおよびアクセス権限付与を行っているため、基本的にはチームフォルダ作成や共有メンバーの管理は、フォルダ利用部署の担当者が実施します。チームフォルダの共有メンバーの管理は利用者に委ねられており、Dropbox管理者が棚卸しなどを実施することはありません。 組織改編前に、フォルダ利用部署の担当者は、管理するチームフォルダを一覧化し、組織改編による影響範囲の棚卸を行います。 チームフォルダの分類(組織単位 / 業務単位) チームフォルダの共有先グループ(どの組織グループに共有されているか) 組織改編後のチームフォルダ継続利用の要否(組織改編後も新しい組織グループで利用するか) 組織改編後も新組織グループにアクセス権を付与して継続利用するのか、もしくは、組織改編後は利用しない(現行の共有メンバーにアクセス権を付与した状態のまま残しておく)のかを確認します。 この棚卸しにて、「組織グループに共有」されている「継続利用」と判断したチームフォルダに対しては、フォルダ利用部署の担当者が組織改編後にアクセス権の変更を実施します。 2.3 チームフォルダのアクセス権を変更 組織改編後、チームフォルダのアクセス権は、”OLD_”が付与された、廃止部署のDropbox組織グループに付与されたままです。そして、チームフォルダのアクセス権は、自動で新しい組織グループに付与されません。 「組織グループに共有」されている「継続利用」と判断したチームフォルダに対して、チームフォルダの共有先を 旧組織グループから新組織グループへ付け替えます。 Smartdbx 共有フォルダ申請アクセス権を付与 対象のチームフォルダについて、新組織グループにアクセス権を付与し、必要に応じて旧組織グループのアクセス権を削除するとともに、編集/閲覧の権限の見直しを実施します。具体的には、各利用部署の担当者がSmartdbxから、新組織グループへのアクセス権付与や、必要に応じた旧組織グループのアクセス権削除など、共有メンバーの変更申請を行います。 申請されると、申請者の 上長(課長・部長)に承認フローが回り、上長が確認・承認した上で チームフォルダのアクセス権が 自動変更 されます。 以上により、弊社SCSKのDropbox環境は、組織改編時に起きやすい Dropbox 運用上の課題を解消しつつ、組織改編後も適切なアクセス権を付与したチームフォルダを利用しています。 まとめ 「Smartdbx」には管理者業務を自動化することで、管理者業務自体を極力なくし、管理者の運用負荷を軽減させる機能が用意されております。 チームフォルダを利用していても、管理者側で実施する作業を極力少なくしつつ、組織改編時に起きやすい権限課題を抑える ―― この考え方は、管理者が組織改編を年次イベントとして確実に対応するうえで有効です。 本記事の内容が、組織改編時の権限事故を避けるための運用設計・周知に役立てば幸いです。お問い合わせは本文記載の窓口をご活用ください。 本投稿に関するお問合せ先     :  Dropbox-sales@scsk.jp SCSKのDropbox取り組み紹介:    https://www.scsk.jp/product/common/dropbox/index.html   参照情報 Dropbox でグループを使用する方法 – Dropbox ヘルプ https://help.dropbox.com/ja-jp/account-access/groups チーム フォルダ マネージャー – Dropbox ヘルプ   https://help.dropbox.com/ja-jp/organize/team-folder-manager チーム フォルダの管理方法 – Dropbox ヘルプ   https://help.dropbox.com/organize/shared-folder-differences Dropbox 管理者になるためのガイド   https://learn.dropbox.com/ja/self-guided-learning/business-admin-course/dropbox-admin-guide Dropbox 運用ガイドライン(Dropbox NAVI) https://navi.dropbox.jp/dropbox-folder-operation-guideline
アバター
こんにちは。SCSKの福田です。 昨今、Microsoft 365 Copilot をはじめとする AI 技術や AI エージェントの進化が目覚ましく進んでいます。 2025 年 11 月に開催された Microsoft Ignite 2025 では「Agent 365」や「Entra Agent ID」など、エージェントを人の ID と同様にセキュアに管理できる新しい仕組みが発表され、ID 管理の重要性はこれまで以上に高まっています。 こうした背景の中で ID 管理の基盤となるのが Entra ID の条件付きアクセスポリシーです。 条件付きアクセスポリシーは、ユーザーやネットワーク、リスク状態などに応じて認証を柔軟に制御できる Entra ID の機能です。 本記事では、条件付きアクセスポリシーについて、基本的な考え方から実際のユースケースに基づく設定手順までを分かりやすく解説します。 今後も Entra ID に関するトピックを紹介していく予定ですので、ぜひ参考にしていただければ幸いです。   条件付きアクセスポリシーとは? 条件付きアクセスポリシーとは、指定した条件に合致したアクセスに対してアクセス制御(ブロック、MFA を要求、など)を実施する機能です。 ポイントは以下の通りです。 条件で指定できる項目 条件には接続ユーザー、接続元デバイス( OS )、接続元ネットワーク、接続先リソース、リスク状態などを指定できます。 また、条件で指定できる項目は一部を除き対象と対象外を設定可能です。 例えば、接続元ネットワークの対象に「すべてのネットワーク」、対象外に「社内のグローバル IP アドレス」を指定したブロックポリシーを作成することで、社外からのアクセスはブロックされます。 アクセス制御 「ブロック」または「アクセス権の付与」を指定できます。 「アクセス権の付与」は「MFAを要求する」など、アクセス許可するための制御方法を選択できます。 ブロックの扱い 複数のポリシーが同時に適用された場合は、すべてのポリシーが評価されます。 その結果、いずれかのポリシーで「ブロック」と判定された場合はアクセスがブロックされます。 ポリシーが適用されていないユーザーは制御がかからない ポリシーが適用されていないユーザーはアクセス制御がかからず、原則 ID・パスワードのみでアクセスできてしまいます。 ユーザー種別が多い環境だと、ポリシーの適用漏れがないように注意する必要があります。   必要なライセンス 条件付きアクセスポリシーを利用するには Microsoft Entra ID P1 ライセンス または Microsoft Entra ID P2 ライセンスが必要です。 尚、サインインリスクやユーザーリスクに基づいてアクセスを制御する「リスクベースの条件付きアクセスポリシー」を利用する場合は、Microsoft Entra ID P2 ライセンスが必要となります。 また、デバイスの準拠状態を条件としてアクセス制御を行うポリシーを構成する場合は、Microsoft Intune のライセンスも別途必要となります。 Microsoft Entra ID P1 と P2 の違いについては、以下のサイトをご参照ください。 Microsoft Entra のプランと価格 | Microsoft Security   今回の構成(ユースケース) 今回は架空の企業のセキュリティ要件を想定し、以下の構成でポリシー設定を行います。 グループ構成 一般ユーザー用セキュリティグループ:GeneralUsersGroup 管理者ユーザー用セキュリティグループ:AdminGroup ポリシー要件 一般ユーザー Windows または iOS の会社貸与デバイス( Intune 準拠)からのアクセスを許可し、それ以外のアクセスはブロックする。 管理者ユーザー 指定のネットワーク(運用保守用ネットワーク)からの Windows デバイスによるアクセスに対しては MFA を要求することでアクセスを許可し、それ以外のアクセスはブロックする。 その他のユーザー 上記2つのグループに属さないユーザーについてはすべてのアクセスをブロックする。 「何をもって準拠とするか」の定義は Microsoft Intune 側で設定となります。( Microsoft Intune ライセンスが必要) 本記事では割愛します。   設定手順:事前準備(場所の定義) まず、管理者ユーザーの制御で使用する「運用保守用ネットワーク」を定義するために、ネームドロケーションを登録します。 Microsoft Entra 管理センターにアクセスします。 左メニューより [条件付きアクセス] > [ネームド ロケーション] を開きます。 [+IP 範囲の場所] をクリックし、以下の通り登録します。 名前:運用保守用ネットワーク 信頼できる場所としてマークする:チェックを外す [+] をクリックし、グローバル IP アドレスを登録します。 [作成] をクリックします。 これでネームドロケーションの登録は完了です。   設定手順:各ポリシーの作成 ここからは実際のポリシー作成です。要件に合わせて順番に作成していきます。 一般ユーザー用ポリシー 一般ユーザー用のポリシーを作成します。 Microsoft Entra 管理センターにアクセスします。 左メニューより [条件付きアクセス] > [ポリシー] を開きます。 まずは、Windows、iOS からのアクセス時にデバイスの Intune 準拠を要求するポリシーを作成します。 [+新しいポリシー] をクリックします。 下表の通り設定します。 設定箇所 設定内容 名前 CA-General-DeviceCompliance ユーザーまたはエージェント (プレビュー) 対象 [ユーザーとグループの選択] > [ユーザーとグループ] を選択し、セキュリティグループ「GeneralUsersGroup」を指定する 対象外 チェックなし(デフォルト) 条件 > デバイスプラットフォーム ※ [構成] を [はい] に設定する 対象 [デバイス プラットフォームの選択] で Windows と iOS を選択する 対象外 チェックなし(デフォルト) アクセス制御 許可 [アクセス権の付与] を選択し、[デバイスは準拠しているとしてマーク済みである必要があります] にチェックを入れる [ポリシーの有効化] を [オン] に設定し、[作成] をクリックします。   同様の手順で、Windows、iOS 以外からのアクセスをブロックするポリシーを作成します。 設定箇所 設定内容 名前 CA-General-DeviceBlock ユーザーまたはエージェント (プレビュー) 対象 [ユーザーとグループの選択] > [ユーザーとグループ] を選択し、セキュリティグループ「GeneralUsersGroup」を指定する 対象外 チェックなし(デフォルト) 条件 > デバイスプラットフォーム ※ [構成] を [はい] に設定する 対象 [任意のデバイス] を選択する 対象外 Windows と iOS を選択する アクセス制御 許可 [アクセスのブロック] を選択する これにより、一般ユーザーが Windows、iOS からのアクセス時はデバイスの Intune 準拠を要求し、その他デバイスからのアクセスはブロックされます。   管理者ユーザー用ポリシー 管理者ユーザー用のポリシーを作成します。 まずは、Windows からのアクセス、かつ、運用保守用ネットワークからのアクセス時に MFA を要求するポリシーを作成します。 設定箇所 設定内容 名前 CA-Admin-MFA ユーザーまたはエージェント (プレビュー) 対象 [ユーザーとグループの選択] > [ユーザーとグループ] を選択し、セキュリティグループ「AdminGroup」を指定する 対象外 チェックなし(デフォルト) ネットワーク ※[構成] を [はい] に設定する 対象 [選択したネットワークと場所] で 「運用保守用ネットワーク」を指定する 対象外 選択なし(デフォルト) 条件 > デバイスプラットフォーム ※ [構成] を [はい] に設定する 対象 [デバイス プラットフォームの選択] で Windows を選択する 対象外 チェックなし(デフォルト) アクセス制御 許可 [アクセス権の付与] を選択し、[多要素認証を要求する] にチェックを入れる   次に、Windows 以外からのアクセスをブロックするポリシーを作成します。 設定箇所 設定内容 名前 CA-Admin-DeviceBlock ユーザーまたはエージェント (プレビュー) 対象 [ユーザーとグループの選択] > [ユーザーとグループ] を選択し、セキュリティグループ「AdminGroup」を指定する 対象外 チェックなし(デフォルト) 条件 > デバイスプラットフォーム ※ [構成] を [はい] に設定する 対象 [任意のデバイス] を選択する 対象外 Windows を選択する アクセス制御 許可 [アクセスのブロック] を選択する   次に、運用保守用ネットワーク以外からのアクセスブロックするポリシーを作成します。 設定箇所 設定内容 名前 CA-Admin-NetworkBlock ユーザーまたはエージェント (プレビュー) 対象 [ユーザーとグループの選択] > [ユーザーとグループ]を選択し、セキュリティグループ「AdminGroup」を指定する 対象外 チェックなし(デフォルト) ネットワーク ※[構成] を [はい] に設定する 対象 [任意のネットワークまたは場所] を選択する 対象外 [選択したネットワークと場所] で「運用保守用ネットワーク」を指定する アクセス制御 許可 [アクセスのブロック] を選択する これにより、管理者ユーザーは Windows からのアクセス、かつ、運用保守用ネットワークからのアクセス時に MFA を要求され、その他デバイス・ネットワークからのアクセスはブロックされます。   その他ユーザー向けポリシー 最後に、どのグループにも属していないユーザーや想定外のユーザーがアクセスできないよう、包括的なブロックポリシーを作成します。 設定箇所 設定内容 名前 CA-Unknown-Block ユーザーまたはエージェント (プレビュー) 対象 [すべてのユーザー] を選択する 対象外 [ユーザーとグループ] を選択し、セキュリティグループ「GeneralUsersGroup」「AdminGroup」を指定する アクセス制御 許可 [アクセスのブロック] を選択する 以上で条件付きアクセスポリシーの作成は完了です。   さいごに 今回は Entra ID の条件付きアクセスポリシーについて、基本的な考え方からユースケースに基づく設定例まで解説しました。 「適切なデバイスから」「適切な場所から」「多要素認証を経て」アクセスさせることは、ゼロトラストセキュリティの第一歩です。 この土台を整えることで、より安全で運用しやすい環境になります。 今後も Entra ID に関する役立つ情報を発信していきますので、ぜひチェックしてください!
アバター
こんにちは、SCSK林です! 様々なデータ活用が推進される中、データの蓄積場所(データレイク)と分析基盤を異なるクラウドで運用するようなケースもあると思います。一度AWS S3に溜まった膨大なデータを動かすことは容易ではありません。一方で、分析層ではGoogle CloudのBigQueryが持つクエリ性能や、マネージドなETLサービスを活用したいというニーズがあります。 私が担当した某プロジェクトでは、この「AWSにデータ、Google Cloudで分析」というハイブリッド構成を、完全閉域網で実現することが求められました。本記事では、100以上のインターフェースを抱える大規模なデータ連携基盤において、AWSのネットワーク機能をいかに駆使して「セキュア・低遅延・低運用コスト」を実現したか、その設計思想を解説します。 アーキテクチャ概要:Amazon S3 × Google Cloud Data Fusion 今回のアーキテクチャの主役は、Amazon S3とGoogle Cloud Data Fusionです。 システム構成の概要 Storage (AWS) : S3。数百万レコードにおよぶ日次の業務データが蓄積されるデータレイク。 ETL (Google Cloud) : Cloud Data Fusion。GUIベースでパイプラインを構築・管理。 Network : AWS Direct Connect ⇔ Partner Interconnect による専用線接続。 Security : インターネットを一切経由しない閉域網構成。 解決すべき技術的課題 接続性の確保 : 専用線経由でGCPからS3へどうやって「プライベートIP」でアクセスするか。 名前解決(DNS) : 異なるクラウド間で、複雑なインフラを立てずにどうやってS3のFQDNを解決するか。 スケーラビリティ : 100を超えるインターフェースのトラフィックをどう効率的に捌くか。 【技術的ポイント①】Gateway型を棄却し、PrivateLinkを選定 AWSでS3へのプライベートアクセスを考える際、まず頭に浮かぶのは「Gateway VPC Endpoint」でしょう。しかし、本プロジェクトでは「Interface VPC Endpoint」を使用しました。 Gateway Endpointの仕様的限界 S3 Gateway Endpointは、VPCのルートテーブルを書き換えることで機能します。しかし、この仕組みは「VPCの外部(Direct ConnectやVPNの向こう側)」からは利用できないという制約があります。Google CLoud側から専用線経由でアクセスしようとしても、Gateway Endpointへルーティングを飛ばすことはできません。 この制約を回避するためには、VPC内にForward Proxy(Squid等を搭載したEC2)を立てる必要がありますが、これは「サーバーレス・マネージド」というプロジェクトの方針に反し、運用コストと単一障害点(SPOF)のリスクを増大させます。 Interface VPC Endpoint (PrivateLink) の採用 今回使用したのが、Interface VPC Endpoint (AWS PrivateLink) です。 PrivateLinkは、VPC内のサブネットにENI(Elastic Network Interface)を払い出し、S3への通信をそのIPアドレス経由で行います。 メリット : 専用線(Direct Connect)越しに、Google CloudからS3のプライベートIPへ直接ルーティングが可能。 運用の排除 : EC2のようなOS管理が不要。AWSが提供するフルマネージドな高可用性をそのまま享受できる。 100以上のIFが集中する基盤において、インフラの保守をAWSにオフロードできるメリットは、処理量に応じた課金を十分に正当化できるものでした。 【技術的ポイント②】シンプルなマルチクラウドDNS設計 PrivateLinkを採用した際、次に問題となるのが「DNSの名前解決」です。Google Cloud上のData Fusionから、AWS S3のエンドポイント名をどう解決するか。 通常、ここでも「Route 53 Resolver Endpointを立てて、GCP Cloud DNSと条件付き転送(Forwarding)を設定する」という構成が検討されます。しかし、今回はシンプルで保守性の高い方式を採用しました。 PrivateLinkのDNS特性の活用 AWS PrivateLinkでS3エンドポイントを作成すると、 vpce-xxxx.s3.region.vpce.amazonaws.com のような専用のFQDNが払い出されます。このFQDNは、パブリックなDNSサーバから名前解決しても、VPC内のプライベートIPアドレスを返却するという特性を持っています。 Google Clooud の Cloud DNSでのCNAME変換 この特性を活かし、Google Cloud側の設定のみで名前解決を完結させました。 具体的には、Cloud DNSにおいて、Data Fusionが参照するS3の接続先ドメイン名を、AWSから払い出されたPrivateLink用のFQDNへCNAMEレコードとして登録したのです。 構成フロー: Data Fusionが s3.ap-northeast-1.amazonaws.com へアクセス。 Cloud DNSがそれを PrivateLinkのFQDN( vpce-xxxx... )にCNAME解決。 そのFQDNをパブリックDNS経由で解決すると、AWS VPC内のプライベートIPが返る。 専用線(Direct Connect)経由で、そのプライベートIPへ直接通信。 この設計により、AWS側にResolver Endpointという追加の有償リソースを立てることなく、また複雑なクロスクラウドのDNS転送設定も不要にしました。 まとめ 本プロジェクトを通じて、AWSとGoogle Cloudのいいとこ取りをしたハイブリッドデータ連携基盤が完成しました。 安定性 : 100以上のインターフェース、日次数百万レコードの転送において、専用線とPrivateLinkの組み合わせにより極めて低いエラー率と安定したスループットを維持。 コスト最適化 : 冗長化されたプロキシサーバやDNSフォワーダーの構築・運用工数を削減し、純粋なデータ処理に集中。 拡張性 : 今後インターフェースが増加しても、ネットワーク経路やDNS設定を変更することなく、Data Fusion上のパイプライン追加だけで対応可能な拡張性を確保。 AWSの各サービスは単体でも強力ですが、その特性を深く理解することで他クラウドとの連携において価値を発揮すると感じました。 この記事がどなたかのお役に立つと幸いです。
アバター
こんにちは、SCSK林です! モダンなシステムアーキテクチャにおいて、システム間を「疎結合」に保つことはもはや定番です。AWSにおいてその中心を担うのは、Amazon SQSやAmazon Managed Streaming for Apache Kafka (MSK)といったメッセージングサービス、あるいはAmazon S3を用いたバッファ層などかと思います。 ただ、実際のエンタープライズ領域におけるデータ連携案件、特にマルチクラウド構成やオンプレミスとの閉域網接続が絡むプロジェクトでは、単に「サービスを間に挟む」だけでは解決できない課題が多いと感じています。 「どのタイミングでデータの到達を保証すべきか」 「コストとスループットの妥協点をどこに置くか」 「リトライによって発生するデータの重複をどう制御するか」 本記事では、私が携わった、毛色の異なる2つのデータ連携プロジェクトを例に、アーキテクトが直面する「キューイング・バッファリング設計」のポイントについて解説していきたいと思います。 プロジェクト事例①:異種クラウド間連携における「Pull型」MSK設計 プロジェクトの背景と課題 最初にご紹介するのは、AWS上の基幹システムで発生する変更データを、Google Cloud上のDWH基盤(BigQuery)へリアルタイムに同期する案件です。 AWS側ではデータのハブとしてAmazon MSKを採用していました。当初の検討では、MSK Connectを利用してGoogle Cloud側のエンドポイントへデータをPush送信する構成が有力でした。しかし、精査を進めると以下の3つの大きな課題が浮上しました。 ネットワークの不確実性: AWSからGoogle Cloudへのクロスクラウド通信、かつ専用線経由という環境下で、ネットワーク瞬断時のエラーハンドリングをどこまでインフラ層に任せられるか。 コスト効率の悪化: 同期対象となるインターフェース(Topic)は20個以上存在しました。MSK Connectはコネクタ単位でのMCU(MSK Connect Unit)課金が発生するため、1日の流量が数千件程度の比較的小規模なインターフェース群に対して個別にコネクタを立てると、データ量に対して極めて割高な固定費が発生します。 責任分界点の曖昧さ: 送信側(AWS)が無理に押し込む「Push型」では、受信側(Google Cloud)の負荷状況に合わせた流量制御(バックプレッシャー)が難しく、受信失敗時の再送管理が複雑化します。 独自コンシューマーによる「Pull型」への転換 結果的には、マネージドサービスであるMSK Connectの採用を見送り、Google Cloud側のCloud RunからMSKへ「Pull型」でデータを取得しに行くカスタムコンシューマー構成を提案しました。 この設計は、「責任完了のポイント」をコンシューマー側に移譲したことにあります。 同期的なオフセット管理 : コンシューマーはMSKからメッセージを取得し、Google Cloud側のストレージ(Pub/Sub)への書き込みが完全に完了したことを確認してから、MSKに対して「Offset」をコミットします。これにより、処理の途中でコンシューマーがダウンしても、次回の起動時に未処理のデータから確実に再開できる「At-least-once(少なくとも一回)」を担保しました。 コストの最適化 : 20個以上の多くのインターフェースを単一または少数のCloud Runサービスに集約して処理することで、MSK Connectを利用した場合と比較してインフラコストを大幅に抑制しました。 重複排除の戦略的妥協 : At-least-once構成では、再送時にデータの重複が発生する可能性があります。これをインフラ層の複雑なロジックで排除しようとせず、最終的な格納先であるGoogle Cloud側(BigQuery)で、ユニークキーに基づいた「重複排除処理」を行う方針を策定しました。 技術的な「完璧さ」をインフラだけで追求するのではなく、エンドツーエンドでの整合性とコストのバランスを考慮した構成になっているかなと感じています。 ※詳細はこちらのブログも参照ください。 【AWS - Google Cloud】マルチクラウドでキューイングデータ連携 AWS MSKからGCPへのデータ連携において、MSK Connectの仕様制約に伴うコスト肥大化を回避するため、Cloud RunによるPull型アーキテクチャへと転換した事例を紹介します。コスト最適化と疎結合な設計により、大規模なマルチクラウド環境下で高効率かつ堅牢なデータパイプラインを実現した経緯を詳説します。 blog.usize-tech.com 2026.03.23   プロジェクト事例②:S3を「バッファ」と見立てた高耐久非同期アーキテクチャ プロジェクトの背景と課題 次にご紹介するのは、オンプレミス環境からAWSを経由し、データウェアハウスであるSnowflakeへデータをロードする基盤構築案件です。 この案件では、Direct Connect経由で送られてくるデータをAPI Gateway + Lambdaで受け取る構成をとりましたが、以下の制約が障壁となりました。 Lambdaのペイロード制限 : API GatewayおよびLambdaには数MBから数十MBのペイロード制限があり、将来的なデータ肥大化に対応できない懸念がありました。 Snowflakeへのロード遅延 : Snowflakeへの書き込み処理には、オーバーヘッドを含めて一定の時間が必要です。同期的な処理では、APIのタイムアウトや、オンプレミス側のクライアントを長時間待機させるリスクがありました。 性能要件の遵守 : 「データ発生から3分以内に分析可能にすること」という性能要件に対し、単一のプロセスで全てを完結させるのは可用性の観点から危険だと判断しました。 S3を「高耐久なキュー」として定義 私は、Amazon S3を単なるストレージではなく、「書き込みが極めて高速で、無限のキャパシティを持つキュー(バッファ)」として位置づける非同期アーキテクチャを採用しました。 取り込み層(受領)の軽量化: API Gatewayから起動されるLambdaの役割を「S3へのファイル保存」のみに限定しました。これにより、オンプレミス側に対しては数ミリ秒から数百ミリ秒という極めて短いレスポンスタイムで「受領完了」を返せます。 ロード層(処理)のデカップリング: S3へのファイル作成をトリガー(S3 Event Notifications)として、後続のLambdaがSnowflakeへのロードを実行します。この構成により、Snowflake側で一時的なメンテナンスや障害が発生しても、データはS3に「滞留(キューイング)」されるだけであり、取り込み層を止める必要がなくなります。 枯れた技術による信頼性: Snowflakeへのロードには、あえて最新のSnowpipeではなく「LambdaによるCOPYコマンド実行」を選択しました。これは既存の資産であるシェルスクリプトのロジックを流用しやすくするためであり、またエラー時の再実行制御をより細かくハンドリングできるようにするためでした。 結果としてのパフォーマンス この「S3バッファ」を介した非同期構成により、結果としてデータ発生からSnowflakeへの到達まで、平均して十数秒というパフォーマンスを実現しました。目標としていた「3分以内」という性能要件を大幅に上回る余裕を持った設計となりました。 ※詳細はこちらのブログも参照ください。 Amazon API Gateway + AWS Lambda + Snowflake によるニアリアルタイムデータ連携 オンプレミスからSnowflakeへのデータ連携において、API GatewayとLambdaを用いた非同期処理による、データ基盤構築の事例を解説します。S3を境界に「取り込み」と「ロード処理」を分離することで、閉域網での高いセキュリティと耐障害性を両立させた設計をご紹介します。 blog.usize-tech.com 2026.03.23   まとめ:キューイング設計における3つのポイント これら2つの案件を通じて、痛感した「キューイング設計のポイント」は以下の3点に集約されると感じました。 ① 責任分界点(Commit Point)をどこに置くか 「データを受け取った」とみなすタイミングをどこにするかは、システムの信頼性を左右する最も重要な決断です。事例①では、宛先システムが処理を終えたタイミング。事例②では、AWS側の高耐久ストレージ(S3)に書き込んだタイミング。 これを明確に定義することで、障害発生時に「どこからリトライすべきか」が自ずと決まります。 ② マネージドサービスとカスタム実装の天秤 マネージドサービスの利点は十分に理解していますが、事例①のように「インターフェース数が多いが、個々の流量が少ない」といった特殊な条件下では、マネージドサービスのコスト構造がボトルネックになることがあります。 「何でもマネージド」ではなく、ランニングコストと運用負荷(メンテナンス性)を天秤にかけ、時にはカスタムコンシューマー(手組のプログラム)を選択する勇気も必要です。 ③ 冪等性の確保 キューイングを導入する以上、リトライによる「重複」は避けられません。インフラ側で「Exact-once(正確に一回)」を実現しようとすると、アーキテクチャは極めて複雑になり、パフォーマンスも低下します。 「重複は発生するもの」と割り切り、アプリケーション層やデータベースのレイヤーで重複排除を行う設計にすることで、システム全体の堅牢性とシンプルさを両立させることができます。 おわりに AWSはじめ各クラウドサービスには、データ連携を支える強力なサービス群が揃っています。しかし、それらを組み合わせるだけで優れたシステムが出来上がるわけではないと改めて感じました。 今回の事例では、「あえてマネージドサービスを使わない」「あえて非同期にする」といった、ある種のデザインチョイス(選択と集中)です。ビジネス要件、コスト制約、そしてネットワークの物理的な限界を直視し、どこで妥協し、どこで妥協するか。その判断こそが難しいポイントだなと思いました。 今回の構成、事例がどなたかのお役に立つと幸いです。
アバター
こんにちは、SCSK林です! 私が担当した某プロジェクトで、社内向けWebアプリケーションとしてS3静的ホスティングによるSPA(Single Page Application)、およびALBとLambdaによるサーバーレスバックエンドを採用しました。技術的にはモダンで、運用コストを極小化できる構成です。 しかし、ここで大きな技術的課題としてあったのが「認証機能(Authentication)」です。 そのプロジェクトでは全社的なID管理基盤(IdP)として、Microsoft Entra ID(旧Azure AD)が導入されていました。セキュリティガバナンスの観点から、AWS上に独自のIDストア(データベース)を構築することはよろしくなく、Entra IDのアカウントでログインすることが要件としてありました。 本記事では、この要件に対し Entra ID と Cognito をどう連携させセキュアなWebアプリケーションを構築したかをご紹介します。 アーキテクチャの全体像と設計思想 まず、今回構築した認証・認可フローの全体像を示します。 フロントエンド (SPA) : Amazon S3 (静的ホスティング) Auth Broker (認証ブローカー) : Amazon Cognito User Pool Identity Provider (IdP) : Microsoft Entra ID (SAML 2.0連携) バックエンド : Elastic Load Balancing + AWS Lambda   この構成における最大のポイントは、「認証の責務をCognitoに集約し、アプリケーション(フロント・バックエンド)からEntra ID固有の実装を排除したこと」です。 また、通常、SPAからバックエンドAPIを呼び出す際、トークンの有効性検証(JWTの署名チェックや期限確認)をLambda内のコードで行う必要があります。しかし、ALBの「認証(Authenticate)」アクションを利用すれば、ALB自体がCognitoと直接対話し、検証済みのユーザー情報のみをLambdaにフォワードしてくれます。 これにより、Lambdaから認証の複雑性を完全に排除し、「インフラレイヤーでセキュリティを担保する」という、より堅牢な設計を実現しました。 技術的ポイント①:SAML 2.0 と OIDC このアーキテクチャのポイントとなるのが、Cognitoによるプロトコル変換と、ALBによる認証プロセスの自動化です。 プロトコルの抽象化(SAML to OIDC) Entra ID側では、Cognito を SAML 2.0のサービスプロバイダー(SP)として登録します。ユーザーがログインを試みると、Entra IDのサインイン画面へリダイレクトされ、認証成功後にSAMLアサーション(XML)を持ってCognitoに戻ります。 CognitoはこのSAMLアサーションを解析し、AWS内で扱いやすいJWT(ID Token / Access Token)を発行します。これにより、フロントエンドエンジニアはEntra ID固有の複雑なSAML仕様を意識せず、モダンなOIDCプロトコルに基づいて開発を進めることができます。これはフロントエンド側の開発としては認証機能を抽象化し実装を容易としました。 ALBリスナールールによるゼロトラストな実装 ALBの設定では、特定のパス( /api/* など)へのリクエストに対し、Cognito User Poolによる認証を強制するルールを定義しました。 ユーザーが未認証でAPIにアクセスしようとすると、ALBが自動的にCognitoのログインエンドポイントへリダイレクトさせます。認証が完了すると、ALBはCognitoから取得したトークンを検証し、署名付きヘッダーにユーザー情報を格納してLambdaへ渡します。 この仕組みのよい点は、Lambda側でトークンの検証ロジックを1行も書かなくて済む点です。Lambdaは、このヘッダーが存在すること自体が「認証済み」の証拠として扱えるため、コードの簡素化と脆弱性排除を同時に達成できました。 技術的ポイント②:AWS Amplifyによるフロントエンド統合 フロントエンド(SPA)には AWS Amplify (JavaScript Library) を採用しました。 今回の構成では、認証の主体がCognito(かつその背後にEntra ID)であるため、AmplifyのAuthライブラリを利用することで、複雑なリダイレクト処理やセッション管理を極めて簡潔に記述できました。 技術的ポイント③:セキュリティと認可 – Entra IDグループとの連動 本システムは社内ツールであるため、利用可能なユーザーを特定のメンバーに限定する必要がありました。 グループベースの認可制御 全社員が持つEntra IDのアカウントを使いつつ、アクセス制限を行うために、Entra ID側の「セキュリティグループ」を活用しました。 Entra ID側 : 特定のグループに属するユーザーのみに、本アプリケーションへのSAMLアサーションを発行するよう設定。 Cognito側 : 受信したSAMLクレームをユーザープールの属性にマッピング。 ALB / Lambda側 : ALBから渡される署名付きヘッダー内のグループ情報を、Lambda側でチェック。 これにより、IDのライフサイクル管理(入社・退社・異動による権限変更)はすべてEntra ID側に集約され、AWS側での二重管理という運用リスクを完全に排除しました。 まとめ:ID管理の脱サイロ化がもたらす価値 今回のプロジェクトを通じて、Microsoft Entra IDと、AWSのサーバーレス技術を、CognitoとALBという「ハブ」を通じてシームレスに結合させることができました。 この構成は、以下の魅力があると思っています。 ガバナンスの強化: 認証の源泉を1つに絞り、ID管理のサイロ化を解消。 開発工数の削減: マネージドサービスの活用により、認証周りの開発・テスト工数を省力化。 最高水準のセキュリティ: インフラレイヤーでのトークン検証により、実装ミスによる漏洩リスクを構造的に排除。(バグによるセキュリティホールの排除) 技術選定において、エンタープライズが抱える組織的制約を理解した上で、極力マネージドサービスを活用することそれ自体が様々なリスクを下げ、結果として優れたアーキテクチャになることを再認識しました。 今回の構成、事例がどなたかのお役に立つと幸いです。
アバター
こんにちは、SCSK林です! 昨今のエンタープライズシステムにおいて、単一のクラウドプロバイダーで全てのワークロードが完結するケースはかなり稀だと思います。 とある案件では、「AWS上の業務データを閉域網経由でGoogle Cloudへ転送し、BigQueryで分析する」という要件に加え、オンプレミスの基幹システムとも連携が必要な「3地点接続」のネットワーク構築が必要でした。 本記事では、AWSの実装そのものではなく、全体アーキテクトの視点から、「AWS Direct Connect を他クラウドやオンプレミスと接続する際に、ハマりやすいポイントと設計の勘所」について共有します。 細かい実装の話ではないので、マルチクラウド接続を実際に設計/構築する時にはここら辺考えないといけないよな~的な目線で見ていただけると幸いです。 アーキテクチャ概要:SCNXをハブとしたハブ&スポーク構成 今回の要件において、最大の課題は「AWS、Google Cloud、オンプレミスの3地点を、いかにシンプルかつセキュアに接続するか」でした。 各拠点をフルメッシュで接続(AWS⇔Google Cloud、AWS⇔On-Prem、Google Cloud⇔On-Prem)すると、管理コストとルーティングの複雑さが指数関数的に増大します。 そこで今回は、SCSKのクラウド接続サービス「SCNX」をハブとして採用し、物理的な複雑さを抽象化しました。 AWS: AWS Direct Connect (DX) GCP: Cloud Interconnect On-Premises: 閉域網接続 Hub: SCNX (Virtual Router) ※SCNXの紹介はコチラ: https://www.scsk.jp/sp/netxdc/lp1/ 設計ポイント BGPルーティング設計 例えばActive/Standby構成を実現するためには、物理的に線を繋ぐだけでなく、BGP(Border Gateway Protocol)を用いて「どちらの道を優先するか」を論理的に制御する必要があります。 AWS Direct Connectにおいて、経路制御を行いActive/Standbyを正しく機能させるには、以下の設計が必要となります。 AWSへの流入制御 Google CloudやオンプレミスからAWSへデータを送る際、AWS側で受け取る経路をPrimaryに固定する必要があります。 ここで重要になるのが AS_PATH Prepend です。AWS側(Direct Connect Gateway)の設定において、Standby回線側のAS Path(自律システム経路)を意図的に長く見せる(Prependする)ことで、対向ルーター(SCNX/Google Cloud)に対して「こちらの道は遠回りだ」と判断させ、自然とPrimary回線が選択されるよう設計しました。 AWSからの流出制御 逆に、AWSからGoogle Cloudへデータを送る際は、AWS側で Local Preference 値を調整し、Primary回線の優先度を高く設定する必要があります。 ※参考URL: https://aws.amazon.com/jp/blogs/news/dx-trafficcontrol-osaka/ 他クラウドと接続する場合、AWSのBGP仕様(Prependの反映挙動など)を理解し、対向システム側とパラメータの整合性を取らなければ、頻繁に経路が切り替わる「フラッピング」の原因となります。 データ転送の最適化:MTUとMSSの調整 複数拠点を接続する際に考慮すべきなのがパケットサイズ(MTU)です。 AWS Direct Connectはジャンボフレーム(MTU 9001)をサポートしていますが、経路上にあるSCNXやGoogle Cloud Interconnect、あるいは途中の仮想アプライアンスでMTUが1500に制限されている場合があります。 この不一致を放置すると、ハンドシェイク(小さなパケット)は成功するのに、いざ大量のデータを転送し始めるとパケットがドロップされるという厄介な現象が発生します。 それの予防策、安全策として、TCP MSS Clamping(最大セグメントサイズの調整)を導入し、経路上の最小MTUに合わせてパケットサイズを最適化することで、安定した通信を確立することができます。 IPアドレス設計:AWS Security Groupはじめファイアウォール設定 構築・テストフェーズでありがちなのが、通信がタイムアウトする系のエラーです。 マルチクラウド環境では、IPアドレス設計が非常に重要です。AWS、Google Cloud、オンプレミスでCIDRが重複しないことはもちろん、「どの範囲のIPが、どのポートで通信してくるか」を厳密に管理し、SGのルールへ反映させるプロセスを徹底する必要があります。 また、アプリの追加要件で当初想定より広いIPレンジが後から必要になることもありがちです。 インフラ担当の皆さんは、特にクラウドサービスだと余裕を持ったIPレンジの確保をしておくことが心の余裕につながります。笑   さいごに 単一のクラウドに閉じていれば難しくないことも他クラウド、他拠点が出てくると技術的難易度が上がってしまいます。 また、往々にして担当者・担当チームがクラウドごとにわかれていて全体設計が蔑ろにされ、問題が後から噴出することもままあると思います。 そのためにも、AWSだけでなく、Google Cloudだけでなく、複数のクラウドに関する知識、知見を持っておくことが重要だと感じました。 この記事がどなたかのお役に立つと幸いです。
アバター
こんにちは、広野です。 以下の記事の続編記事です。RAG で CSV データからの検索精度向上を目指してみました。本記事は UI 編で、主にフロントエンド (React) のコードや UI の動作について記載しています。全体的なアーキテクチャやバックエンドについては前回記事をご覧ください。 Amazon Bedrock Knowledge Bases で構造化データ(CSV)を使用した RAG をつくる -アーキテクチャ編- Amazon Bedrock Knowledge Bases と Amazon S3 Vectors で構築した RAG 環境で、構造化データをデータソースにしたときの検索精度向上を目指しました。本記事はアーキテクチャ編です。 blog.usize-tech.com 2026.03.09 Amazon Bedrock Knowledge Bases で構造化データ(CSV)を使用した RAG をつくる -実装編- Amazon Bedrock Knowledge Bases と Amazon S3 Vectors で構築した RAG 環境で、構造化データをデータソースにしたときの検索精度向上を目指しました。本記事は実装編です。 blog.usize-tech.com 2026.03.23 UI は以前書いた以下の記事の UI をカスタマイズしています。 React で Amazon Bedrock Knowledge Bases ベースの簡易 RAG チャットボットをつくる [2026年1月版] UI編 AWS re:Invent 2025 で、Amazon S3 Vectors が GA されました。それを受けて、以前作成した RAG チャットボット環境をアレンジしてみました。本記事は UI 編です。 blog.usize-tech.com 2026.01.06 この記事では雑多な非構造化データ (PDF等) の中から参考となる情報やファイル名を見つけ出すことを目的としていましたが、本記事では構造化データ (CSV) から参考となる情報やその情報のメタデータを見つけ出すことを目的としています。コードはほぼ同じです。メタデータフィルタリングの機能が追加されているぐらいだと思って下さい。   やりたいこと (再掲) 以下のような架空のヘルプデスク問い合わせ履歴データ (CSV) を用意しました。 ヘルプデスク担当者が新たな問い合わせを受けたときに、似たような過去の対応履歴を引き当てられるようにしたい、というのが目的です。 LLM に、今届いた新しい問い合わせに対する回答案を提案させたい。 回答案を生成するために、自然言語で書かれた問い合わせ内容と回答内容から、意味的に近いデータを引き当てたい。 カテゴリで検索対象をフィルタしたい。その方が精度が上がるケースがあると考えられる。 LLM が回答案を提案するときには、参考にした過去対応履歴がどの問合せ番号のものか、提示させたい。その問合せ番号をキーに、生の対応履歴データを参照できるようにしたい。 以下の前提があります。 データソースとなる CSV ファイルは 1つのみ。過去の対応履歴は 1 つの CSV ファイルに収まっているということ。 つまり、データの1行が1件の問い合わせであり、その項目間には意味的なつながりがある。 まあ、ごくごく一般的なニーズではないかと思います。   前回記事のおさらい メタデータフィルタリングについて 今回のブログ記事では、簡略化のため上記のように「販売形態」と「カテゴリ」の 2 つのメタデータのみフィルタリング可能なように設計します。特定の項目でフィルタリングすることで、検索精度を向上させます。 フィルタ対象項目 販売形態(2種類: 直販, 代理店) カテゴリ(10種類: 家庭用コタツ, 家庭用テーブル, 家庭用収納棚, 家庭用チェア, 家庭用デスク, 業務用ラック, 業務用キャビネット, 業務用会議テーブル, 業務用チェア, 業務用デスク) フィルタ条件選択時の動作 両方未選択 → フィルタリングなし(全件検索) 片方のみ選択 → equals (完全一致) で単一条件検索 両方選択 → andAll で AND 条件検索、それぞれは equals (完全一致) とする つくったもの UI 一般的なチャットボット UI ですが、ユーザーのメッセージ入力欄の下に「絞り込み」という欄を追加しています。ここにあるプルダウンメニューの項目であれば、フィルタリングできるようになっています。 全件検索した例 メタデータフィルタリングを使用せず(プルダウンを選択せず)、「海外発送ができるか」を問い合わせてみました。 過去の問い合わせ履歴データから、4件が見つかりました。 参考問合せ番号のリンクを押すと、それぞれの実データを参照できます。ここでは省略しますが、以下の販売形態、カテゴリで海外発送に言及している履歴があることがわかりました。 問合せ番号 販売形態 カテゴリ AB01234650 直販 家庭用デスク AB01234636 代理店 業務用キャビネット AB01234577 代理店 業務用チェア AB01234653 直販 家庭用テーブル 以降、メタデータフィルタリングでこの検索結果を絞り込みたいと思います。 メタデータ1件でフィルタリングした例 販売形態が「代理店」でフィルタリングして、同じく「海外発送ができるか」を問い合わせた例です。 想定通り、「販売形態が代理店」の問い合わせ履歴データが 2 件、検索されました。 メタデータ2件でフィルタリングした例 販売形態が「直販」、かつカテゴリが「家庭用テーブル」でフィルタリングして、同じく「海外発送できるか」を問い合わせた例です。 想定通り、該当する問い合わせ履歴データ 1 件だけが検索されました。 参考問合せ番号のリンクを押すと、該当問い合わせ履歴の実データが見られます。確かに海外発送についての問い合わせです。   React コード この画面を提供している React のコードですが、詳細は実コードを見てください。 絞り込みのオプションはベタ書きの固定値にしており、選択したデータを Amazon API Gateway に渡しているだけです。当たり前ですが、フォーマットはバックエンドの AWS Lambda 関数で定義したものと合わせています。 AWS AppSync Events から送られてくるレスポンスの中に、citation と呼ばれる、回答の参考になったチャンクとそのメタデータが含まれます。それを元に参考問合せ番号のリンクを作成し、モーダルウィンドウで問い合わせ番号詳細を表示しています。 import { useState, useEffect, useRef } from "react"; import { Container, Grid, Box, Paper, Typography, TextField, Button, Avatar, Dialog, DialogTitle, DialogContent, DialogActions, Link, FormControl, InputLabel, Select, MenuItem } from "@mui/material"; import SendIcon from '@mui/icons-material/Send'; import { blue, grey } from '@mui/material/colors'; import { v4 as uuidv4 } from "uuid"; import { events } from "aws-amplify/data"; import ReactMarkdown from "react-markdown"; import remarkGfm from "remark-gfm"; import { inquireRagSr } from "./Functions.jsx"; import Header from "../Header.jsx"; import Menu from "./Menu.jsx"; const RagSr = (props) => { //定数定義 const groups = props.groups; const sub = props.sub; const idtoken = props.idtoken; const imgUrl = import.meta.env.VITE_IMG_URL; const channelOptions = ["直販", "代理店"]; const categoryOptions = ["家庭用コタツ", "家庭用テーブル", "家庭用収納棚", "家庭用チェア", "家庭用デスク", "業務用ラック", "業務用キャビネット", "業務用会議テーブル", "業務用チェア", "業務用デスク"]; //変数定義 const appsyncSessionIdRef = useRef(); const bedrockSessionIdRef = useRef(null); const channelRef = useRef(); const streamingRefMap = useRef(new Map()); const citationDataMap = useRef(new Map()); //state定義 const [prompt, setPrompt] = useState(""); const [conversation, setConversation] = useState([]); const [streaming, setStreaming] = useState({ text: "", refs: [] }); const [dialogOpen, setDialogOpen] = useState(false); const [selectedCitation, setSelectedCitation] = useState(null); const [filterChannel, setFilterChannel] = useState(""); const [filterCategory, setFilterCategory] = useState(""); //RAGへの問い合わせ送信関数 const putRagSr = () => { if (streaming.text) { setConversation(prev => [ ...prev, { role:"ai", text: streaming.text, ref: streaming.refs }, { role:"user", text: prompt, ref: [] } ]); streamingRefMap.current.clear(); setStreaming({ text:"", refs:[] }); } else { setConversation(prev => [...prev, { role:"user", text: prompt, ref: [] }]); } inquireRagSr(prompt, appsyncSessionIdRef.current, bedrockSessionIdRef.current, idtoken, (() => { const f = []; if (filterChannel) f.push({"販売形態": filterChannel}); if (filterCategory) f.push({"カテゴリ": filterCategory}); return f; })()); //プロンプト欄をクリア setPrompt(""); }; //content.textから問合せ内容と回答内容を抽出 const parseContent = (text) => { const inquiryMatch = text.match(/問合せ内容:\s*([\s\S]*?)(?=\n\n回答内容:|$)/); const answerMatch = text.match(/回答内容:\s*([\s\S]*?)$/); return { inquiry: inquiryMatch ? inquiryMatch[1].trim() : "", answer: answerMatch ? answerMatch[1].trim() : "" }; }; //問合せ番号クリック時の処理 const handleCitationClick = (inquiryNumber) => { const data = citationDataMap.current.get(inquiryNumber); if (data) { setSelectedCitation(data); setDialogOpen(true); } }; //Dialog閉じる処理 const handleDialogClose = () => { setDialogOpen(false); setSelectedCitation(null); }; //サブスクリプション開始関数 const startSubscription = async () => { const appsyncSessionId = appsyncSessionIdRef.current; if (channelRef.current) await channelRef.current.close(); const channel = await events.connect(`rag-stream-response/${sub}/${appsyncSessionId}`); channel.subscribe({ next: (data) => { //Bedrock Knowledge base の session id 取得 if (data.event.type === "bedrock_session") { console.log("=== Session received ==="); console.log(data.event.bedrock_session_id); bedrockSessionIdRef.current = data.event.bedrock_session_id; return; } //問い合わせに対する回答メッセージ (chunkされている) if (data.event.type === "text") { console.log("=== Message received ==="); setStreaming(s => ({ ...s, text: s.text + data.event.message })); return; } //回答に関する関連ドキュメント (citation) if (data.event.type === "citation") { console.log("=== Citation received ==="); console.log(data.event.citation); data.event.citation.forEach(ref => { const inquiryNumber = ref.metadata?.["問合せ番号"]; if (!inquiryNumber) return; if (!streamingRefMap.current.has(inquiryNumber)) { streamingRefMap.current.set(inquiryNumber, { id: inquiryNumber, label: inquiryNumber }); citationDataMap.current.set(inquiryNumber, { metadata: ref.metadata, content: ref.content }); setStreaming(s => ({ ...s, refs: Array.from(streamingRefMap.current.values()) })); } }); } }, error: (err) => console.error("Subscription error:", err), complete: () => console.log("Subscription closed") }); channelRef.current = channel; }; //セッションIDのリセット、サブスクリプション再接続関数 const resetSession = async () => { appsyncSessionIdRef.current = uuidv4(); bedrockSessionIdRef.current = null; setPrompt(""); streamingRefMap.current.clear(); citationDataMap.current.clear(); setStreaming({ text:"", refs:[] }); setConversation([]); setFilterChannel(""); setFilterCategory(""); await startSubscription(); }; //画面表示時 useEffect(() => { //画面表示時に最上部にスクロール window.scrollTo(0, 0); //Bedrockからのレスポンスサブスクライブ関数実行 appsyncSessionIdRef.current = uuidv4(); bedrockSessionIdRef.current = null; startSubscription(); //アンマウント時にチャンネルを閉じる return () => { if (channelRef.current) channelRef.current.close(); }; }, []); //Chatbot UI 会話部分 const renderMessage = (msg, idx) => ( <Box key={idx} sx={{ display: "flex", justifyContent: msg.role === "user" ? "flex-end" : "flex-start", alignItems: "flex-start", mb: 1, width: "100%", minWidth: 0 }} > {msg.role === "ai" && ( <Avatar src={`${imgUrl}/images/ai_chat_icon.svg`} alt="AI" sx={{ mr: 2, mt: 2 }} /> )} <Paper elevation={2} sx={{ p: 2, my: 1, maxWidth: "90%", minWidth: 0, wordBreak: "break-word", overflowWrap: "break-word", bgcolor: msg.role === "user" ? blue[100] : grey[100] }} > <ReactMarkdown remarkPlugins={[remarkGfm]} components={{ p: ({node, ...props}) => <p style={{margin: 0}} {...props} />, code: ({node, inline, ...props}) => ( <code style={{whiteSpace: inline ? 'normal' : 'pre-wrap', wordBreak: 'break-word', overflowWrap: 'break-word'}} {...props} /> ) }} > {msg.text} </ReactMarkdown> {msg.ref.length > 0 && ( <> <h4>参考問合せ番号</h4> <ul style={{ paddingLeft: 20, margin: 0 }}> {msg.ref.map(s => ( <li key={s.id}> <Link component="button" variant="body2" onClick={() => handleCitationClick(s.id)} sx={{ cursor: "pointer" }} > {s.label} </Link> </li> ))} </ul> </> )} </Paper> {msg.role === "user" && ( <Avatar src={`${imgUrl}/images/human_chat_icon.svg`} alt="User" sx={{ ml: 2, mt: 2 }} /> )} </Box> ); return ( <> {/* Header */} <Header groups={groups} signOut={props.signOut} /> <Container maxWidth="lg" sx={{mt:2}}> <Grid container spacing={4}> {/* Menu Pane */} <Grid size={{xs:12,md:4}} order={{xs:2,md:1}}> {/* Sidebar */} <Menu /> </Grid> {/* Contents Pane IMPORTANT */} <Grid size={{xs:12,md:8}} order={{xs:1,md:2}} my={2}> <main> <Grid container spacing={2}> {/* Heading */} <Grid size={{xs:12}}> <Typography id="bedrocksrtop" variant="h5" component="h1" mb={3} gutterBottom>Amazon Bedrock RAG Stream Response テスト</Typography> </Grid> <Grid size={{xs:12}}> {/* Chatbot */} <Paper sx={{p:2,mb:2,width:"100%"}}> {/* あいさつ文(固定) */} {renderMessage({ role: "ai", text: "こんにちは。何かお困りですか?", ref: []}, -1)} {/* 会話履歴 */} {conversation.map((msg, idx) => renderMessage(msg, idx))} {/* 直近のレスポンス */} {streaming.text && renderMessage({ role:"ai", text: streaming.text, ref: streaming.refs }, "stream")} </Paper> {/* 入力エリア */} <Box sx={{display:"flex",gap:1}}> <TextField fullWidth multiline value={prompt} onChange={(e) => setPrompt(e.target.value)} placeholder="Type message here..." sx={{ flexGrow: 1 }} /> <Button variant="contained" size="small" onClick={putRagSr} disabled={!prompt} startIcon={<SendIcon />} sx={{ whiteSpace: "nowrap", flexShrink: 0 }}>送信</Button> </Box> {/* オプション */} <Box sx={{mt:1,p:2,border:"1px solid",borderColor:"divider",borderRadius:1}}> <Typography variant="subtitle2" mb={1}>絞り込み</Typography> <Box sx={{display:"flex",flexWrap:"wrap",gap:2}}> <FormControl size="small" sx={{minWidth:150}}> <InputLabel>販売形態</InputLabel> <Select value={filterChannel} label="販売形態" onChange={(e) => setFilterChannel(e.target.value)}> <MenuItem value="">すべて</MenuItem> {channelOptions.map(v => <MenuItem key={v} value={v}>{v}</MenuItem>)} </Select> </FormControl> <FormControl size="small" sx={{minWidth:150}}> <InputLabel>カテゴリ</InputLabel> <Select value={filterCategory} label="カテゴリ" onChange={(e) => setFilterCategory(e.target.value)}> <MenuItem value="">すべて</MenuItem> {categoryOptions.map(v => <MenuItem key={v} value={v}>{v}</MenuItem>)} </Select> </FormControl> </Box> </Box> {/* クリアボタン */} {(streaming.text || conversation.length > 0) && ( <Box sx={{ display: "flex", justifyContent: "flex-end", mt: 2 }}> <Button variant="contained" size="small" onClick={resetSession}>問い合わせをクリアする</Button> </Box> )} </Grid> </Grid> </main> </Grid> </Grid> </Container> {/* Citation詳細Dialog */} <Dialog open={dialogOpen} onClose={handleDialogClose} maxWidth="md" fullWidth> <DialogTitle>問合せ詳細</DialogTitle> <DialogContent dividers> {selectedCitation && (() => { const { inquiry, answer } = parseContent(selectedCitation.content.text); const m = selectedCitation.metadata; return ( <Box> <Typography variant="subtitle2" color="text.secondary">問合せ番号</Typography> <Typography variant="body1" mb={2}>{m["問合せ番号"] || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">受付日時</Typography> <Typography variant="body1" mb={2}>{m["受付日時"] || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">完了日時</Typography> <Typography variant="body1" mb={2}>{m["完了日時"] || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">販売形態</Typography> <Typography variant="body1" mb={2}>{m["販売形態"] || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">商品番号</Typography> <Typography variant="body1" mb={2}>{m["商品番号"] || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">カテゴリ</Typography> <Typography variant="body1" mb={2}>{m["カテゴリ"] || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">問合せ内容</Typography> <Typography variant="body1" mb={2} sx={{ whiteSpace: "pre-wrap" }}>{inquiry || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">回答内容</Typography> <Typography variant="body1" mb={2} sx={{ whiteSpace: "pre-wrap" }}>{answer || "-"}</Typography> <Typography variant="subtitle2" color="text.secondary">ステータス</Typography> <Typography variant="body1" mb={2}>{m["ステータス"] || "-"}</Typography> </Box> ); })()} </DialogContent> <DialogActions> <Button onClick={handleDialogClose}>閉じる</Button> </DialogActions> </Dialog> </> ); }; export default RagSr; 途中、inquireRagSr という関数がありますが、axios で Amazon API Gateway をコールするだけの関数です。   まとめ いかがでしたでしょうか。 今回のシリーズ記事の肝は Amazon Bedrock Knowledge Bases データソースのカスタムチャンキングでしたが、結果をこうしてアプリの UI で確認できるようになると、本当にできていることを実感できますよね。引き続き検証して精度向上に努めます。 本記事が皆様のお役に立てれば幸いです。
アバター
こんにちは、SCSK林です! 昨今のデータ活用において、マルチクラウド環境でのデータパイプライン構築は珍しい要件ではなくなっていると思います。 今回紹介する事例でも、AWS上のシステムから発生する大量のストリームデータを、分析基盤であるGoogle Cloud(GCP)のBigQueryへリアルタイム連携するという要件がありました。 ソースとなるのは Amazon Managed Streaming for Apache Kafka (Amazon MSK)で、 当初、MSK Connectの採用を検討しましたが、最終的にはGoogle CloudのCloud Runを用いた独自Consumerの実装というアーキテクチャに落ち着きました。 本記事では、MSK Connectではなく独自実装を選択せざるを得なかった問題と、AWS-Google Cloud間を専用線でセキュアにつなぐためのアーキテクチャ設計の変遷について共有します。 初期構想:MSK Connect 当初構想していた構成は以下のとおりです。構成的にもマネージドサービスを使用しており申し分はなかったと思っています。 Source: Amazon MSK Connector: MSK Connect (Sink Connector to Google Cloud Pub/Sub) Sink : Pub/Sub(Google Cloud) Network: AWS Direct Connect と Google Cloud Partner Interconnect を使用した専用線接続 ※参考URL: https://docs.cloud.google.com/pubsub/docs/connect_kafka#convert-to-pubsub 技術検証の結果、専用線経由でのGoogle Cloudエンドポイントへの到達性や、基本的なデータ転送自体には問題がないことが確認できました。機能要件としては、MSK Connectで満たしている構成でした。 課金問題とアーキテクチャの転換 ただ、この構成ではサービス制約から以下の問題が生じました。 「1コネクタ = 1 Pub/Subトピック」の制約 今回採用しようとしたコネクタの構成上、「1つのMSK Connectリソースにつき、1つのPub/Subトピックへの連携しか定義できない」という制約がありました。 通常、Kafka Consumerであれば1つのプロセスで複数のトピックをSubscribeし、ロジックで振り分けることが容易です。しかし、MSK Connect(および該当のプラグイン)の仕様に準拠すると、連携したいトピックの数だけMSK Connect(Connector)を作成する必要がありました。初期フェーズではトピック数は10個ほどでしたが、次フェーズでは計100トピックまで拡張予定だったので、運用面からも受け入れられない状態となりました。 コストの大幅な増加 MSK Connectの課金体系は MCU (MSK Connect Unit) × 利用時間 です。 データ流量が少ないトピックであっても、コネクタを分割すれば最低1MCU分のコストが発生します。今回のシステムには多数のトピックが存在したため、それら全てに対して個別にConnectorを立ち上げると、MCUの総数がトピック数に比例して増加し、月額コストが想定以上に超過することが判明しました。 ※参考URL: https://aws.amazon.com/jp/msk/pricing/   上記問題の解決:集約による効率化 上記制約から、解決策は「1つのコンピュートリソースで多対多(N対N)の処理をさばくこと」を実現する必要がありました。 MSK Connectの採用断念: コネクタ管理の複雑さとコスト増が見合わないため。 独自実装(Cloud Run): コンテナベースのアプリケーションであれば、1つのConsumerグループで複数トピックをSubscribeし、メモリ上でPub/Subトピックへ振り分けるロジックを実装可能です。これにより、リソースを極限まで集約し、コストを圧縮できると判断しました。   最終アーキテクチャ:Cloud Run 最終的に採用したアーキテクチャは以下のとおりです。 Consumer (Google Cloud): Cloud Run上にKafka Consumerアプリをデプロイ。 Buffer (Google Cloud): 取得したデータを一度 Pub/Sub へPublish。 ETL (Google Cloud): Cloud Data Fusion が Pub/Sub からデータを読み出し、変換処理を行って BigQuery へロード。 構成のポイントは以下の3点です。 Consumer Groupの集約によるリソース効率の最大化 MSK Connect(採用検討時のコネクタ)では「1コネクタ = 1トピック」という制約があり、トピック数に比例してコネクタ(MCU)が線形に増加する構造でした。これに対し、Cloud Runを用いた独自実装では、1つのコンテナアプリケーション(Consumer Group)で複数のトピックをまとめてSubscribeする方式を採用しました。 Before (MSK Connect案): トピックごとにコネクタプロセスが起動。データ流量が少ないトピックでも最低限のMCUリソースを占有し、コスト効率が極めて悪い。 After (Cloud Run案): 1つのConsumerアプリで複数のトピックをSubscribe。メモリ空間を共有しながら効率的にメッセージを処理し、Cloud RunのCPU使用率ベースでオートスケールさせることで、リソースの余剰が少なくなるようにしました。 ※今回はGoogle CloudのCloud Runで実装しましたが、AWS上での実装でもよいと思います。 Pub/Subをバッファとした「疎結合」なパイプライン もう一つの重要な設計判断は、Consumerアプリ(Cloud Run)から直接BigQueryへ書き込まず、必ず Google Cloud Pub/Sub を挟む構成にしたことです。これにより、システムを「データ取得層」と「データ加工・ロード層」に明確に分離(疎結合化)しました。 責務の分離: Cloud Run (Consumer): 「MSKからデータを取り出し、Pub/Subへ投げる」ことだけに集中。データの変換ロジックやBigQueryのスキーマ定義を持たないため、軽量かつステートレスに保たれます。 Data Fusion (ETL): Pub/SubからデータをPullし、複雑な変換を行ってBigQueryへロード。 耐障害性の向上: 仮にBigQueryやData Fusion側で障害や遅延が発生しても、データはPub/Subに滞留(バッファリング)するだけです。Cloud Run(Consumer)は影響を受けず、AWS MSKからのデータ取得を継続できます。これにより、「AWS側のログ保持期間切れ(データロスト)」のリスクを最小限に抑える設計としました。 AWS-Google Cloud間のセキュアな接続 このアーキテクチャを支えるネットワークは、AWS Direct ConnectとGoogle Cloud Partner Interconnectを結ぶ専用線です。 AWS側のSecurity Groupでは、Google Cloud Cloud RunがデプロイされているサブネットからのInboundのみを許可し、かつConsumer Groupの集約によって接続元IPの管理もシンプルになりました。   まとめ 今回の最終的な構成は、おそらく初期検討段階では確実に外される構成だと思います。 機能的には要件を満たしていても、高トラフィック環境下、エンタープライズ環境ではコストがボトルネックになる場合があります。AWSの課金体系を深く理解し、全体的に適切な構成を選択していくことの重要性を改めて認識しました。 今回の構成、事例がどなたかのお役に立つと幸いです。
アバター
こんにちは、広野です。 以下の記事の続編記事です。RAG で CSV データからの検索精度向上を目指してみました。本記事は実装編で、主にバックエンドの設定について記載しています。UI や実際の動作については続編記事の UI 編で紹介します。 Amazon Bedrock Knowledge Bases で構造化データ(CSV)を使用した RAG をつくる -アーキテクチャ編- Amazon Bedrock Knowledge Bases と Amazon S3 Vectors で構築した RAG 環境で、構造化データをデータソースにしたときの検索精度向上を目指しました。本記事はアーキテクチャ編です。 blog.usize-tech.com 2026.03.09   やりたいこと (再掲) 以下のような架空のヘルプデスク問い合わせ履歴データ (CSV) を用意しました。 ヘルプデスク担当者が新たな問い合わせを受けたときに、似たような過去の対応履歴を引き当てられるようにしたい、というのが目的です。 LLM に、今届いた新しい問い合わせに対する回答案を提案させたい。 回答案を生成するために、自然言語で書かれた問い合わせ内容と回答内容から、意味的に近いデータを引き当てたい。 カテゴリで検索対象をフィルタしたい。その方が精度が上がるケースがあると考えられる。 LLM が回答案を提案するときには、参考にした過去対応履歴がどの問合せ番号のものか、提示させたい。その問合せ番号をキーに、生の対応履歴データを参照できるようにしたい。 以下の前提があります。 データソースとなる CSV ファイルは 1つのみ。過去の対応履歴は 1 つの CSV ファイルに収まっているということ。 つまり、データの1行が1件の問い合わせであり、その項目間には意味的なつながりがある。 まあ、ごくごく一般的なニーズではないかと思います。   関連記事 以前、私が公開した Amazon Bedrock Knowledge Bases や Amazon S3 Vectors を使用した RAG 基盤の記事です。今回はこの基盤のチャンキング戦略をカスタマイズして臨みました。 React で Amazon Bedrock Knowledge Bases ベースの簡易 RAG チャットボットをつくる [2026年1月版] アーキテクチャ概要編 AWS re:Invent 2025 で、Amazon S3 Vectors が GA されました。それを受けて、以前作成した RAG チャットボットをアレンジしてみました。 blog.usize-tech.com 2026.01.06   本記事の言及範囲 RAG そのものや、RAG 基盤については本記事では語りません。 以下のアーキテクチャ図の中の、赤枠の部分に着目します。ベクトルデータを格納するまでのデータソースのカスタムチャンキングと、それを実装した Amazon Bedrock Knowledge Bases にどう問い合わせするか、です。   アーキテクチャ (再掲) 前回記事で紹介した、カスタムチャンキングを実装するアーキテクチャです。 実装 Amazon Bedrock Knowledge Bases カスタムチャンキングの一連の処理は、Amazon Bedrock Knowledge Bases で行われます。各種設定の全体像は AWS マネジメントコンソールの画面で一望できます。 カスタムチャンキングを AWS Lambda 関数に処理させるので、当然 Lambda 関数が必要です。(内容は後述) Lambda 関数が出力するチャンク分割後のデータ (中間成果物の JSON) を保存する S3 バケットが必要です。これはオリジナルのドキュメント配置用 S3 バケットとは別にする必要があります。 チャンキング戦略は NO にします。NO にすると、オリジナルのドキュメントの内容をそのまま Lambda 関数に渡してくれます。別の戦略を選択すると、その戦略によってチャンク分割されたデータごとに Lambda 関数を実行してしまうので、期待するカスタムチャンク分割ができなくなります。 解析戦略は Default にします。 チャンキング戦略と解析戦略は、データソース作成後には変更できません。変更したいときは作り直しになります。 データ削除ポリシーは DELETE にすることをお勧めします。同期をかけたときに過去のデータを残すかどうかの設定で、残してしまうと古い情報が検索に引っ掛かってしまいます。 AWS Lambda 関数 (カスタムチャンキング) カスタムチャンキングする Lambda 関数コード (Python) です。 冒頭に紹介した CSV を、Amazon Bedrock Knowledge Bases が理解できるフォーマットの JSON データに変換します。内部的には 1 チャンクごとに自然言語で検索させたいデータとメタデータに分けて出力します。 Lambda レイヤーは不要です。モジュールは Lambda 標準でサポートしているものだけで実装可能でした。  インプットとなる S3 バケット内の CSV データのメタデータは、Amazon Bedrock Knowledge Bases がこの Lambda 関数を呼び出すときに渡してくれるので、こちらが特に気にすることはありません。受け取ったバケット名、キーから CSV データを取得しに行きます。出力先となる S3 バケットやキー名も Amazon Bedrock Knowledge Bases から渡されますのでこの関数内でベタ書きすることはありません。 データフォーマットの変換処理の内容的には、そんなに難しいことはしていません。大事なのは出力フォーマットです。 import json import csv import boto3 from io import StringIO s3 = boto3.client('s3') def lambda_handler(event, context): try: bucket_name = event.get('bucketName') input_files = event.get('inputFiles', []) output_files = [] for file_info in input_files: original_file_location = file_info.get('originalFileLocation', {}) s3_location = original_file_location.get('s3Location', {}) original_uri = s3_location.get('uri', '') content_batches = file_info.get('contentBatches', []) output_batches = [] for batch in content_batches: input_key = batch.get('key') # Read input file from S3 response = s3.get_object(Bucket=bucket_name, Key=input_key) input_content = json.loads(response['Body'].read().decode('utf-8')) # Extract CSV content csv_content = input_content['fileContents'][0]['contentBody'] # Remove BOM if present (input may have BOM) if csv_content.startswith('\ufeff'): csv_content = csv_content[1:] csv_reader = csv.DictReader(StringIO(csv_content)) # Process each row as a chunk file_contents = [] for row in csv_reader: content_body = f"問合せ番号: {row.get('問合せ番号', '')}\n商品番号: {row.get('商品番号', '')}\n\n問合せ内容:\n{row.get('問合せ内容', '')}\n\n回答内容:\n{row.get('回答内容', '')}" content_metadata = { "問合せ番号": row.get('問合せ番号', ''), "販売形態": row.get('販売形態', ''), "受付日時": row.get('受付日時', ''), "完了日時": row.get('完了日時', ''), "商品番号": row.get('商品番号', ''), "カテゴリ": row.get('カテゴリ', ''), "ステータス": row.get('ステータス', '') } file_contents.append({ "contentBody": content_body, "contentType": "TEXT", "contentMetadata": content_metadata }) # Write output file to S3 output_key = input_key.replace('.json', '_transformed.json') output_data = {"fileContents": file_contents} s3.put_object( Bucket=bucket_name, Key=output_key, Body=json.dumps(output_data, ensure_ascii=False), ContentType='application/json' ) output_batches.append({"key": output_key}) output_files.append({ "originalFileLocation": original_file_location, "fileMetadata": file_info.get('fileMetadata', {}), "contentBatches": output_batches }) return {"outputFiles": output_files} except Exception as e: print(f"Error: {str(e)}") import traceback traceback.print_exc() raise チャンク分割された後のデータ構造 (再掲) Lambda 関数がチャンク分割した後のデータ構造 (上のアーキテクチャ図では 5番の処理によって作成されるもの) は、以下のようになります。 { "fileContents": [ { "contentBody": "問合せ番号: AB01234569\n商品番号: SH001-01BL\n\n問合せ内容:\n[問合せ内容の文章]\n\n回答内容:\n[回答内容の文章]", "contentType": "TEXT", "contentMetadata": { "問合せ番号": "AB01234569", "販売形態": "代理店", "受付日時": "2026/2/23 12:59", "完了日時": "2026/2/23 13:39", "商品番号": "SH001-01BL", "カテゴリ": "家庭用収納棚", "ステータス": "完了" } }, { "contentBody": "問合せ番号: AB01234573\n商品番号: TB19541\n\n問合せ内容:\n[問合せ内容の文章]\n\n回答内容:\n[回答内容の文章]", "contentType": "TEXT", "contentMetadata": { "問合せ番号": "AB01234573", "販売形態": "直販", "受付日時": "2026/2/24 9:15", "完了日時": "2026/2/24 14:30", "商品番号": "TB19541", "カテゴリ": "家庭用テーブル", "ステータス": "完了" } } ] } fileContents 配列の各要素が 1 チャンク(CSV の 1 行に相当) contentBody がベクトル化・検索対象にできるテキスト contentMetadata が引用表示やフィルタリングに使用されるメタデータ ※contentBody ももちろん引用可能 ここまで実装できると、Amazon Bedrock Knowledge Bases に対して contentBody に書かれた内容に対して自然言語で検索できたり、検索時にメタデータの項目単位でフィルタリングできるようになります。 メタデータフィルタリングについて Amazon Bedrock Knowledge Bases ができてしまえば、自然言語による問い合わせは RetrieveAndGenerate API を使用して極論プロンプトさえ送ればいいので、難しいことはありません。しかし、メタデータフィルタリング機能を追加すると、設計次第ではコードが複雑になります。 ここで、メタデータフィルタリングについて仕様を説明します。 メタデータ条件にマッチするチャンクのみにベクトル検索を行うため、不要な結果を排除することができ、検索精度の向上が期待できる。 メタデータ項目に対してかけられる文字列検索条件は、完全一致や、指定した文字列を含む、などいろいろできる。Amazon S3 Vectors でサポートしている条件は以下公式ドキュメントを参照。 メタデータ項目は、複数項目を And や Or で組み合わせることが可能。 メタデータフィルタリング - Amazon Simple Storage Service メタデータフィルタリングを使用して、ベクトルにアタッチされた特定の属性に基づいてクエリ結果を絞り込む方法について説明します。 docs.aws.amazon.com つまり、かなり細かいフィルタリングができるということです。 以下にメタデータフィルタリングを設定するときの Lambda 関数コードの一部を紹介します。 単一のメタデータ条件 「販売形態が代理店で完全一致」でフィルタリングしたいとき retrievalConfiguration={ "vectorSearchConfiguration": { "filter": { "equals": { "key": "販売形態", "value": "代理店" } } } } 複数のメタデータ条件 「販売形態が代理店で完全一致」かつ「カテゴリが家庭用コタツで完全一致」でフィルタリングしたいとき retrievalConfiguration={ "vectorSearchConfiguration": { "filter": { "andAll": [ { "equals": { "key": "販売形態", "value": "代理店" } }, { "equals": { "key": "カテゴリ", "value": "家庭用コタツ" } } ] } } } 見てもらえるとわかると思いますが、複数のメタデータ条件では 2 つの equals 条件を andAll で囲んでいると思います。上記はまだシンプルですが、複数の条件が重なれば重なるほど、このような階層構造をさらにコーディングしなければなりません。Or 条件も可能とすると、さらに複雑になりそうです。 今回のブログ記事では、簡略化のため上記のように「販売形態」と「カテゴリ」の 2 つのメタデータのみフィルタリング可能なように設計します。 フィルタ対象項目 販売形態(2種類: 直販, 代理店) カテゴリ(10種類: 家庭用コタツ, 家庭用テーブル, 家庭用収納棚, 家庭用チェア, 家庭用デスク, 業務用ラック, 業務用キャビネット, 業務用会議テーブル, 業務用チェア, 業務用デスク) フィルタ条件選択時の動作 両方未選択 → フィルタリングなし(全件検索) 片方のみ選択 → 選択したキーワードに完全一致で単一条件検索 両方選択 → AND 条件検索、それぞれ選択したキーワードに完全一致とする AWS Lambda 関数 (ナレッジベースへの問い合わせ) 前述のメタデータフィルタリング機能を実装した、Amazon Bedrock Knowledge Bases の RetrieveAndGenerate API をコールする AWS Lambda 関数コードは以下のようになります。 Amazon API Gateway REST API から呼び出され、AWS AppSync Events にストリームレスポンスを返す構成です。コメントで メタデータフィルタリングの組み立て と書いてある部分が先ほど説明した部分の実装です。 インプットとして "filters": [ {"販売形態": "代理店"}, {"カテゴリ": "家庭用コタツ"} ] のようなメタデータフィルタリングパラメータを受け取る想定です。条件が2つあれば andAll で囲う処理を実装しています。 import os import json import boto3 import urllib.request from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest # common objects and valiables session = boto3.session.Session() bedrock_agent = boto3.client('bedrock-agent-runtime') endpoint = os.environ['APPSYNC_API_ENDPOINT'] model_arn = os.environ['MODEL_ARN'] knowledge_base_id = os.environ['KNOWLEDGE_BASE_ID'] region = os.environ['REGION'] service = 'appsync' headers = {'Content-Type': 'application/json'} # AppSync publish message function def publish_appsync_message(sub, appsync_session_id, payload, credentials): body = json.dumps({ "channel": f"rag-stream-response/{sub}/{appsync_session_id}", "events": [ json.dumps(payload) ] }).encode("utf-8") aws_request = AWSRequest( method='POST', url=endpoint, data=body, headers=headers ) SigV4Auth(credentials, service, region).add_auth(aws_request) req = urllib.request.Request( url=endpoint, data=aws_request.body, method='POST' ) for k, v in aws_request.headers.items(): req.add_header(k, v) with urllib.request.urlopen(req) as res: return res.read().decode('utf-8') # handler def lambda_handler(event, context): try: credentials = session.get_credentials().get_frozen_credentials() # API Gateway からのインプットを取得 prompt = event['body']['prompt'] appsync_session_id = event['body']['appsyncSessionId'] bedrock_session_id = event['body'].get('bedrockSessionId') sub = event['sub'] # Amazon Bedrock Knowledge Bases への問い合わせパラメータ作成 request = { "input": { "text": prompt }, "retrieveAndGenerateConfiguration": { "type": "KNOWLEDGE_BASE", "knowledgeBaseConfiguration": { "knowledgeBaseId": knowledge_base_id, "modelArn": model_arn, "generationConfiguration": { "inferenceConfig": { "textInferenceConfig": { "maxTokens": 10000, "temperature": 0.5, "topP": 0.9 } }, "performanceConfig": { "latency": "standard" }, "promptTemplate": { "textPromptTemplate": ( "あなたは優秀なヘルプデスクアシスタントです。ヘルプデスク担当者からの質問に対して、必ず日本語で回答してください。" "適切な回答が見つからない場合は、正直に「分かりません」と回答してください。\n\n" "検索結果:\n$search_results$\n\n" "回答指示: $output_format_instructions$" ) } } } } } # メタデータフィルタ条件の組み立て filters = event['body'].get('filters', []) if filters: conditions = [{"equals": {"key": k, "value": v}} for f in filters for k, v in f.items()] if len(conditions) == 1: retrieval_filter = conditions[0] else: retrieval_filter = {"andAll": conditions} request["retrieveAndGenerateConfiguration"]["knowledgeBaseConfiguration"]["retrievalConfiguration"] = { "vectorSearchConfiguration": { "filter": retrieval_filter } } # Bedrock sessionId は存在するときのみ渡す (継続会話時のみ) if bedrock_session_id: request["sessionId"] = bedrock_session_id # Bedrock Knowledge Bases への問い合わせ response = bedrock_agent.retrieve_and_generate_stream(**request) # Bedrock sessionId if "sessionId" in response: publish_appsync_message( sub, appsync_session_id, { "type": "bedrock_session", "bedrock_session_id": response["sessionId"] }, credentials ) for chunk in response["stream"]: payload = None # Generated text if "output" in chunk and "text" in chunk["output"]: payload = { "type": "text", "message": chunk["output"]["text"] } print({"t": chunk["output"]["text"]}) # Citation elif "citation" in chunk: payload = { "type": "citation", "citation": chunk['citation']['retrievedReferences'] } print({"c": chunk['citation']['retrievedReferences']}) # Continue if not payload: continue # Publish AppSync publish_appsync_message(sub, appsync_session_id, payload, credentials) except Exception as e: print(str(e)) raise AWS CloudFormation テンプレート Amazon API Gateway REST API や AWS AppSync Events API など、関連するリソースを一式デプロイするテンプレートを掲載します。これ単体では動かないと思いますので、参考までに。ここまで実装できると、アプリ UI から API をコールすることでチャットボット UI を作れます。 AWSTemplateFormatVersion: 2010-09-09 Description: The CloudFormation template that creates a S3 vector bucket and index as a RAG Knowledge base. # ------------------------------------------------------------# # Input Parameters # ------------------------------------------------------------# Parameters: SystemName: Type: String Description: System name. use lower case only. (e.g. example) Default: example MaxLength: 10 MinLength: 1 SubName: Type: String Description: System sub name. use lower case only. (e.g. prod or dev) Default: dev MaxLength: 10 MinLength: 1 DomainName: Type: String Description: Domain name for URL. xxxxx.xxx (e.g. example.com) Default: example.com AllowedPattern: "[^\\s@]+\\.[^\\s@]+" SubDomainName: Type: String Description: Sub domain name for URL. (e.g. example-prod or example-dev) Default: example-dev MaxLength: 20 MinLength: 1 Dimension: Type: Number Description: The dimensions of the vectors to be inserted into the vector index. The value depends on the embedding model. Default: 1024 MaxValue: 4096 MinValue: 1 EmbeddingModelId: Type: String Description: The embedding model ID. Default: amazon.titan-embed-text-v2:0 MaxLength: 100 MinLength: 1 LlmModelId: Type: String Description: The LLM model ID for the Knowledge base. Default: global.amazon.nova-2-lite-v1:0 MaxLength: 100 MinLength: 1 Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: "General Configuration" Parameters: - SystemName - SubName - Label: default: "Domain Configuration" Parameters: - DomainName - SubDomainName - Label: default: "Embedding Configuration" Parameters: - Dimension - EmbeddingModelId - Label: default: "Knowledge Base Configuration" Parameters: - LlmModelId Resources: # ------------------------------------------------------------# # S3 # ------------------------------------------------------------# S3BucketKbDatasource: Type: AWS::S3::Bucket Properties: BucketName: !Sub ${SystemName}-${SubName}-kbdatasource PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true CorsConfiguration: CorsRules: - AllowedHeaders: - "*" AllowedMethods: - "GET" - "HEAD" - "PUT" - "POST" - "DELETE" AllowedOrigins: - !Sub https://${SubDomainName}.${DomainName} ExposedHeaders: - last-modified - content-type - content-length - etag - x-amz-version-id - x-amz-request-id - x-amz-id-2 - x-amz-cf-id - x-amz-storage-class - date - access-control-expose-headers MaxAge: 3000 Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} S3BucketPolicyKbDatasource: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref S3BucketKbDatasource PolicyDocument: Version: "2012-10-17" Statement: - Effect: Deny Principal: "*" Action: "s3:*" Resource: - !Sub "arn:aws:s3:::${S3BucketKbDatasource}" - !Sub "arn:aws:s3:::${S3BucketKbDatasource}/*" Condition: Bool: "aws:SecureTransport": "false" DependsOn: - S3BucketKbDatasource S3VectorBucket: Type: AWS::S3Vectors::VectorBucket Properties: VectorBucketName: !Sub ${SystemName}-${SubName}-vectordb S3BucketKbIntermediate: Type: AWS::S3::Bucket Properties: BucketName: !Sub ${SystemName}-${SubName}-kbintermediate PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} S3BucketPolicyKbIntermediate: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref S3BucketKbIntermediate PolicyDocument: Version: "2012-10-17" Statement: - Effect: Deny Principal: "*" Action: "s3:*" Resource: - !Sub "arn:aws:s3:::${S3BucketKbIntermediate}" - !Sub "arn:aws:s3:::${S3BucketKbIntermediate}/*" Condition: Bool: "aws:SecureTransport": "false" DependsOn: - S3BucketKbIntermediate S3VectorBucketIndex: Type: AWS::S3Vectors::Index Properties: IndexName: !Sub ${SystemName}-${SubName}-vectordb-index DataType: float32 Dimension: !Ref Dimension DistanceMetric: cosine VectorBucketArn: !GetAtt S3VectorBucket.VectorBucketArn MetadataConfiguration: NonFilterableMetadataKeys: - AMAZON_BEDROCK_TEXT - AMAZON_BEDROCK_METADATA DependsOn: - S3VectorBucket # ------------------------------------------------------------# # Bedrock Knowledge Base # ------------------------------------------------------------# BedrockKnowledgeBase: Type: AWS::Bedrock::KnowledgeBase Properties: Name: !Sub ${SystemName}-${SubName}-kb Description: !Sub RAG Knowledge Base for ${SystemName}-${SubName} KnowledgeBaseConfiguration: Type: VECTOR VectorKnowledgeBaseConfiguration: EmbeddingModelArn: !Sub arn:aws:bedrock:${AWS::Region}::foundation-model/${EmbeddingModelId} RoleArn: !GetAtt IAMRoleBedrockKb.Arn StorageConfiguration: Type: S3_VECTORS S3VectorsConfiguration: IndexArn: !GetAtt S3VectorBucketIndex.IndexArn VectorBucketArn: !GetAtt S3VectorBucket.VectorBucketArn Tags: Cost: !Sub ${SystemName}-${SubName} DependsOn: - IAMRoleBedrockKb BedrockKnowledgeBaseDataSource: Type: AWS::Bedrock::DataSource Properties: Name: !Sub ${SystemName}-${SubName}-kb-datasource Description: !Sub RAG Knowledge Base Data Source for ${SystemName}-${SubName} KnowledgeBaseId: !Ref BedrockKnowledgeBase DataDeletionPolicy: DELETE DataSourceConfiguration: Type: S3 S3Configuration: BucketArn: !GetAtt S3BucketKbDatasource.Arn VectorIngestionConfiguration: ChunkingConfiguration: ChunkingStrategy: NONE CustomTransformationConfiguration: Transformations: - TransformationFunction: TransformationLambdaConfiguration: LambdaArn: !GetAtt LambdaCsvChunker.Arn StepToApply: POST_CHUNKING IntermediateStorage: S3Location: URI: !Sub s3://${S3BucketKbIntermediate}/ DependsOn: - S3BucketKbDatasource - BedrockKnowledgeBase - S3BucketKbIntermediate # ------------------------------------------------------------# # AppSync Events # ------------------------------------------------------------# AppSyncChannelNamespaceRagSR: Type: AWS::AppSync::ChannelNamespace Properties: Name: rag-stream-response ApiId: Fn::ImportValue: !Sub AppSyncApiId-${SystemName}-${SubName} CodeHandlers: | import { util } from '@aws-appsync/utils'; export function onSubscribe(ctx) { const requested = ctx.info.channel.path; if (!requested.startsWith(`/rag-stream-response/${ctx.identity.sub}`)) { util.unauthorized(); } } Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} # ------------------------------------------------------------# # API Gateway REST API # ------------------------------------------------------------# RestApiRagSR: Type: AWS::ApiGateway::RestApi Properties: Name: !Sub rag-sr-${SystemName}-${SubName} Description: !Sub REST API to call Lambda rag-stream-response-${SystemName}-${SubName} EndpointConfiguration: Types: - REGIONAL IpAddressType: dualstack Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} RestApiDeploymentRagSR: Type: AWS::ApiGateway::Deployment Properties: RestApiId: !Ref RestApiRagSR DependsOn: - RestApiMethodRagSRPost - RestApiMethodRagSROptions RestApiStageRagSR: Type: AWS::ApiGateway::Stage Properties: StageName: prod Description: production stage RestApiId: !Ref RestApiRagSR DeploymentId: !Ref RestApiDeploymentRagSR MethodSettings: - ResourcePath: "/*" HttpMethod: "*" LoggingLevel: INFO DataTraceEnabled : true TracingEnabled: false AccessLogSetting: DestinationArn: !GetAtt LogGroupRestApiRagSR.Arn Format: '{"requestId":"$context.requestId","status":"$context.status","sub":"$context.authorizer.claims.sub","email":"$context.authorizer.claims.email","resourcePath":"$context.resourcePath","requestTime":"$context.requestTime","sourceIp":"$context.identity.sourceIp","userAgent":"$context.identity.userAgent","apigatewayError":"$context.error.message","authorizerError":"$context.authorizer.error","integrationError":"$context.integration.error"}' Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} RestApiAuthorizerRagSR: Type: AWS::ApiGateway::Authorizer Properties: Name: !Sub restapi-authorizer-ragsr-${SystemName}-${SubName} RestApiId: !Ref RestApiRagSR Type: COGNITO_USER_POOLS ProviderARNs: - Fn::ImportValue: !Sub CognitoArn-${SystemName}-${SubName} AuthorizerResultTtlInSeconds: 300 IdentitySource: method.request.header.Authorization RestApiResourceRagSR: Type: AWS::ApiGateway::Resource Properties: RestApiId: !Ref RestApiRagSR ParentId: !GetAtt RestApiRagSR.RootResourceId PathPart: ragsr RestApiMethodRagSRPost: Type: AWS::ApiGateway::Method Properties: RestApiId: !Ref RestApiRagSR ResourceId: !Ref RestApiResourceRagSR HttpMethod: POST AuthorizationType: COGNITO_USER_POOLS AuthorizerId: !Ref RestApiAuthorizerRagSR Integration: Type: AWS IntegrationHttpMethod: POST Credentials: Fn::ImportValue: !Sub ApigLambdaInvocationRoleArn-${SystemName}-${SubName} Uri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaRagSR.Arn}/invocations" PassthroughBehavior: NEVER RequestTemplates: application/json: | { "body": $input.json('$'), "sub": "$context.authorizer.claims.sub" } RequestParameters: integration.request.header.X-Amz-Invocation-Type: "'Event'" IntegrationResponses: - ResponseParameters: method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,Cache-Control'" method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'" method.response.header.Access-Control-Allow-Origin: !Sub "'https://${SubDomainName}.${DomainName}'" ResponseTemplates: application/json: '' StatusCode: '202' MethodResponses: - StatusCode: '202' ResponseModels: application/json: Empty ResponseParameters: method.response.header.Access-Control-Allow-Origin: true method.response.header.Access-Control-Allow-Headers: true method.response.header.Access-Control-Allow-Methods: true RestApiMethodRagSROptions: Type: AWS::ApiGateway::Method Properties: RestApiId: !Ref RestApiRagSR ResourceId: !Ref RestApiResourceRagSR HttpMethod: OPTIONS AuthorizationType: NONE Integration: Type: MOCK Credentials: Fn::ImportValue: !Sub ApigLambdaInvocationRoleArn-${SystemName}-${SubName} IntegrationResponses: - ResponseParameters: method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,Cache-Control'" method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'" method.response.header.Access-Control-Allow-Origin: !Sub "'https://${SubDomainName}.${DomainName}'" ResponseTemplates: application/json: '' StatusCode: '200' PassthroughBehavior: WHEN_NO_MATCH RequestTemplates: application/json: '{"statusCode": 200}' MethodResponses: - ResponseModels: application/json: Empty ResponseParameters: method.response.header.Access-Control-Allow-Headers: true method.response.header.Access-Control-Allow-Methods: true method.response.header.Access-Control-Allow-Origin: true StatusCode: '200' # ------------------------------------------------------------# # API Gateway LogGroup (CloudWatch Logs) # ------------------------------------------------------------# LogGroupRestApiRagSR: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/apigateway/${RestApiRagSR} RetentionInDays: 365 Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} # ------------------------------------------------------------# # Lambda # ------------------------------------------------------------# LambdaRagSR: Type: AWS::Lambda::Function Properties: FunctionName: !Sub rag-sr-${SystemName}-${SubName} Description: !Sub Lambda Function to invoke Bedrock Knowledge Bases for ${SystemName}-${SubName} Architectures: - x86_64 Runtime: python3.14 Timeout: 300 MemorySize: 128 Environment: Variables: APPSYNC_API_ENDPOINT: Fn::ImportValue: !Sub AppSyncEventsEndpointHttp-${SystemName}-${SubName} MODEL_ARN: !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:inference-profile/${LlmModelId}" KNOWLEDGE_BASE_ID: !Ref BedrockKnowledgeBase REGION: !Ref AWS::Region Role: !GetAtt LambdaBedrockKbRole.Arn Handler: index.lambda_handler Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} Code: ZipFile: | import os import json import boto3 import urllib.request from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest # common objects and valiables session = boto3.session.Session() bedrock_agent = boto3.client('bedrock-agent-runtime') endpoint = os.environ['APPSYNC_API_ENDPOINT'] model_arn = os.environ['MODEL_ARN'] knowledge_base_id = os.environ['KNOWLEDGE_BASE_ID'] region = os.environ['REGION'] service = 'appsync' headers = {'Content-Type': 'application/json'} # AppSync publish message function def publish_appsync_message(sub, appsync_session_id, payload, credentials): body = json.dumps({ "channel": f"rag-stream-response/{sub}/{appsync_session_id}", "events": [ json.dumps(payload) ] }).encode("utf-8") aws_request = AWSRequest( method='POST', url=endpoint, data=body, headers=headers ) SigV4Auth(credentials, service, region).add_auth(aws_request) req = urllib.request.Request( url=endpoint, data=aws_request.body, method='POST' ) for k, v in aws_request.headers.items(): req.add_header(k, v) with urllib.request.urlopen(req) as res: return res.read().decode('utf-8') # handler def lambda_handler(event, context): try: credentials = session.get_credentials().get_frozen_credentials() # API Gateway からのインプットを取得 prompt = event['body']['prompt'] appsync_session_id = event['body']['appsyncSessionId'] bedrock_session_id = event['body'].get('bedrockSessionId') sub = event['sub'] # Amazon Bedrock Knowledge Bases への問い合わせパラメータ作成 request = { "input": { "text": prompt }, "retrieveAndGenerateConfiguration": { "type": "KNOWLEDGE_BASE", "knowledgeBaseConfiguration": { "knowledgeBaseId": knowledge_base_id, "modelArn": model_arn, "generationConfiguration": { "inferenceConfig": { "textInferenceConfig": { "maxTokens": 10000, "temperature": 0.5, "topP": 0.9 } }, "performanceConfig": { "latency": "standard" }, "promptTemplate": { "textPromptTemplate": ( "あなたは優秀なヘルプデスクアシスタントです。ヘルプデスク担当者からの質問に対して、必ず日本語で回答してください。" "適切な回答が見つからない場合は、正直に「分かりません」と回答してください。\n\n" "検索結果:\n$search_results$\n\n" "回答指示: $output_format_instructions$" ) } } } } } # メタデータフィルタ条件の組み立て filters = event['body'].get('filters', []) if filters: conditions = [{"equals": {"key": k, "value": v}} for f in filters for k, v in f.items()] if len(conditions) == 1: retrieval_filter = conditions[0] else: retrieval_filter = {"andAll": conditions} request["retrieveAndGenerateConfiguration"]["knowledgeBaseConfiguration"]["retrievalConfiguration"] = { "vectorSearchConfiguration": { "filter": retrieval_filter } } # Bedrock sessionId は存在するときのみ渡す (継続会話時のみ) if bedrock_session_id: request["sessionId"] = bedrock_session_id # Bedrock Knowledge Bases への問い合わせ response = bedrock_agent.retrieve_and_generate_stream(**request) # Bedrock sessionId if "sessionId" in response: publish_appsync_message( sub, appsync_session_id, { "type": "bedrock_session", "bedrock_session_id": response["sessionId"] }, credentials ) for chunk in response["stream"]: payload = None # Generated text if "output" in chunk and "text" in chunk["output"]: payload = { "type": "text", "message": chunk["output"]["text"] } print({"t": chunk["output"]["text"]}) # Citation elif "citation" in chunk: payload = { "type": "citation", "citation": chunk['citation']['retrievedReferences'] } print({"c": chunk['citation']['retrievedReferences']}) # Continue if not payload: continue # Publish AppSync publish_appsync_message(sub, appsync_session_id, payload, credentials) except Exception as e: print(str(e)) raise DependsOn: - LambdaBedrockKbRole - BedrockKnowledgeBase LambdaRagSREventInvokeConfig: Type: AWS::Lambda::EventInvokeConfig Properties: FunctionName: !GetAtt LambdaRagSR.Arn Qualifier: $LATEST MaximumRetryAttempts: 0 MaximumEventAgeInSeconds: 300 LambdaCsvChunker: Type: AWS::Lambda::Function Properties: FunctionName: !Sub csv-chunker-${SystemName}-${SubName} Description: !Sub Lambda Function to embed with custom chunk for ${SystemName}-${SubName} Architectures: - x86_64 Runtime: python3.14 Handler: index.lambda_handler Timeout: 900 MemorySize: 512 Role: !GetAtt LambdaCsvChunkerRole.Arn Tags: - Key: Cost Value: !Sub ${SystemName}-${SubName} Code: ZipFile: | import json import csv import boto3 from io import StringIO s3 = boto3.client('s3') def lambda_handler(event, context): try: bucket_name = event.get('bucketName') input_files = event.get('inputFiles', []) output_files = [] for file_info in input_files: original_file_location = file_info.get('originalFileLocation', {}) s3_location = original_file_location.get('s3Location', {}) original_uri = s3_location.get('uri', '') content_batches = file_info.get('contentBatches', []) output_batches = [] for batch in content_batches: input_key = batch.get('key') # Read input file from S3 response = s3.get_object(Bucket=bucket_name, Key=input_key) input_content = json.loads(response['Body'].read().decode('utf-8')) # Extract CSV content csv_content = input_content['fileContents'][0]['contentBody'] # Remove BOM if present (input may have BOM) if csv_content.startswith('\ufeff'): csv_content = csv_content[1:] csv_reader = csv.DictReader(StringIO(csv_content)) # Process each row as a chunk file_contents = [] for row in csv_reader: content_body = f"問合せ番号: {row.get('問合せ番号', '')}\n商品番号: {row.get('商品番号', '')}\n\n問合せ内容:\n{row.get('問合せ内容', '')}\n\n回答内容:\n{row.get('回答内容', '')}" content_metadata = { "問合せ番号": row.get('問合せ番号', ''), "販売形態": row.get('販売形態', ''), "受付日時": row.get('受付日時', ''), "完了日時": row.get('完了日時', ''), "商品番号": row.get('商品番号', ''), "カテゴリ": row.get('カテゴリ', ''), "ステータス": row.get('ステータス', '') } file_contents.append({ "contentBody": content_body, "contentType": "TEXT", "contentMetadata": content_metadata }) # Write output file to S3 output_key = input_key.replace('.json', '_transformed.json') output_data = {"fileContents": file_contents} s3.put_object( Bucket=bucket_name, Key=output_key, Body=json.dumps(output_data, ensure_ascii=False), ContentType='application/json' ) output_batches.append({"key": output_key}) output_files.append({ "originalFileLocation": original_file_location, "fileMetadata": file_info.get('fileMetadata', {}), "contentBatches": output_batches }) return {"outputFiles": output_files} except Exception as e: print(f"Error: {str(e)}") import traceback traceback.print_exc() raise LambdaInvokePermissionCsvChunker: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref LambdaCsvChunker Action: lambda:InvokeFunction Principal: bedrock.amazonaws.com SourceAccount: !Ref AWS::AccountId # SourceArn: !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/${BedrockKnowledgeBase}" DependsOn: - LambdaCsvChunker # - BedrockKnowledgeBase # ------------------------------------------------------------# # Lambda Role (IAM) # ------------------------------------------------------------# LambdaBedrockKbRole: Type: AWS::IAM::Role Properties: RoleName: !Sub LambdaBedrockKbRole-${SystemName}-${SubName} Description: This role allows Lambda functions to invoke Bedrock Knowledge Bases and AppSync Events API. AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess Policies: - PolicyName: !Sub LambdaBedrockKbPolicy-${SystemName}-${SubName} PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - "bedrock:InvokeModel" - "bedrock:InvokeModelWithResponseStream" - "bedrock:GetInferenceProfile" - "bedrock:ListInferenceProfiles" Resource: - !Sub "arn:aws:bedrock:*::foundation-model/*" - !Sub "arn:aws:bedrock:*:${AWS::AccountId}:inference-profile/*" - Effect: Allow Action: - "bedrock:RetrieveAndGenerate" - "bedrock:Retrieve" Resource: - !GetAtt BedrockKnowledgeBase.KnowledgeBaseArn - Effect: Allow Action: - "appsync:connect" Resource: - Fn::ImportValue: !Sub AppSyncApiArn-${SystemName}-${SubName} - Effect: Allow Action: - "appsync:publish" - "appsync:EventPublish" Resource: - Fn::Join: - "" - - Fn::ImportValue: !Sub AppSyncApiArn-${SystemName}-${SubName} - /channelNamespace/rag-stream-response LambdaCsvChunkerRole: Type: AWS::IAM::Role Properties: RoleName: !Sub LambdaCsvChunkerRole-${SystemName}-${SubName} AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyName: !Sub LambdaCsvChunkerPolicy-${SystemName}-${SubName} PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - "s3:GetObject" - "s3:PutObject" - "s3:ListObject" Resource: - !GetAtt S3BucketKbDatasource.Arn - !Sub ${S3BucketKbDatasource.Arn}/* - !GetAtt S3BucketKbIntermediate.Arn - !Sub ${S3BucketKbIntermediate.Arn}/* # ------------------------------------------------------------# # IAM Role for Bedrock Knowledge Base # ------------------------------------------------------------# IAMRoleBedrockKb: Type: AWS::IAM::Role Properties: RoleName: !Sub BedrockKbRole-${SystemName}-${SubName} AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - sts:AssumeRole Principal: Service: - bedrock.amazonaws.com Condition: StringEquals: "aws:SourceAccount": !Ref AWS::AccountId # ArnLike: # "aws:SourceArn": !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/*" Policies: - PolicyName: !Sub BedrockKbPolicy-${SystemName}-${SubName} PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - "s3:GetObject" - "s3:ListBucket" - "s3:PutObject" Resource: - !GetAtt S3BucketKbDatasource.Arn - !Sub ${S3BucketKbDatasource.Arn}/* - !GetAtt S3BucketKbIntermediate.Arn - !Sub ${S3BucketKbIntermediate.Arn}/* - Effect: Allow Action: - "s3vectors:GetIndex" - "s3vectors:QueryVectors" - "s3vectors:PutVectors" - "s3vectors:GetVectors" - "s3vectors:DeleteVectors" Resource: - !GetAtt S3VectorBucketIndex.IndexArn - Effect: Allow Action: - "bedrock:InvokeModel" Resource: - !Sub arn:aws:bedrock:${AWS::Region}::foundation-model/${EmbeddingModelId} - Effect: Allow Action: - "lambda:InvokeFunction" Resource: - !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:csv-chunker-${SystemName}-${SubName}*" DependsOn: - S3BucketKbDatasource - S3VectorBucketIndex - LambdaCsvChunker - S3BucketKbIntermediate # ------------------------------------------------------------# # Output Parameters # ------------------------------------------------------------# Outputs: # S3 S3BucketKbDatasourceName: Value: !Ref S3BucketKbDatasource # API Gateway APIGatewayEndpointRagSR: Value: !Sub https://${RestApiRagSR}.execute-api.${AWS::Region}.${AWS::URLSuffix}/${RestApiStageRagSR}/ragsr Export: Name: !Sub RestApiEndpointRagSR-${SystemName}-${SubName} 続編記事 Amazon Bedrock Knowledge Bases で構造化データ(CSV)を使用した RAG をつくる -UI編- Amazon Bedrock Knowledge Bases と Amazon S3 Vectors で構築した RAG 環境で、構造化データをデータソースにしたときの検索精度向上を目指しました。本記事は UI 編です。 blog.usize-tech.com 2026.03.23 まとめ いかがでしたでしょうか。 メタデータフィルタリングは設計次第でかなり細かい検索ができそうですが、その分コーディングが大変です。むやみに汎用的なフィルタ設定を実装しようとすると開発負担増やバグの温床になりそうなので、フィルタ対象項目はなるべく厳選した方がよいと思います。 本記事が皆様のお役に立てれば幸いです。
アバター
こんにちは、SCSK林です! 今回は、AWS、Snowflakeで実現したニアリアルタイムデータ連携について解説します。 本記事では、実際に構築した例をベースにアーキテクチャ選定の背景と、構成や技術的に気をつけるポイントについて共有していきたいと思います。   構成の背景(いわゆる要件) 今回の主要件は、オンプレミスのシステムから出力される業務データを、AWSを経由してSnowflakeへ連携し、数分以内(ニアリアルタイム)に分析可能にすることでした。 主な要件と制約は以下の通りです。 セキュリティ: 秘匿性の高いデータを扱うため、インターネット経由の転送は不可。閉域網のみを通すこと。 データ特性: 1リクエストあたり約10MB(圧縮前)。頻度は1日100件程度だが、データの欠損は許されない。 クライアント制約: 送信元システムはHTTPリクエストの送出のみ対応。 既存資産: 組織内で実績のあるSnowflake連携用スクリプト(Shell)を流用したい。   アーキテクチャ概要 データ取り込み処理 : Client (On-prem) → Direct Connect → VPC Endpoint (Interface) → Amazon API Gateway (HTTP API) → AWS Lambda 一時保存 : Amazon S3 (Staging Bucket) ロード処理 : S3 Event Notification → AWS Lambda → Snowflake (COPY INTO) このアーキテクチャのポイントは、「データ受信」と「データ処理」をS3を介して完全に切り離した点にあります。 クライアントからのデータ受信を行うLambda(データ取り込み処理)は、データの検証とS3への保存のみを行い、即座にレスポンスを返します。一方、Snowflakeへのロードを行うLambda(ロード処理)は、S3イベントをトリガーに非同期で実行されます。 これにより、仮にSnowflake側の処理に時間がかかったとしても、クライアント側のHTTP通信がタイムアウトすることはありません。   アーキテクチャのポイント セキュリティ要件(閉域網)の実現 オンプレミスからのHTTPリクエストを安全に受け取るため、API Gatewayの前段にInterface VPC Endpoint (PrivateLink) を配置しました。 Private API Gatewayを使用する選択肢もありましたが、今回はネットワーク経路を厳密に制御するため、VPC Endpointのリソースポリシーを活用しました。これにより、特定のDirect Connect経由のトラフィックのみを許可し、それ以外のアクセスをネットワークレベルで遮断することを実現しています。 Lambdaの6MB制約とその回避 今回、技術的なハードルとなったのが、AWS Lambdaのペイロード制限です。 API Gatewayの制限: 最大10MB Lambda(同期呼び出し)の制限: 最大6MB クライアントから上記制限を越えるデータが送られてくると、そのままではLambdaに引き渡す段階で 413 Request Entity Too Large エラーが発生してしまいます。 これを回避するために、S3署名付きURLを発行してクライアントから直接S3へアップロードさせる方式も検討しましたが、クライアント側の実装負荷が複雑になるため、アーキテクチャは変更せず「データ圧縮」で解決する方針を決定しました。 今回は、クライアント側でデータをGZIP圧縮することで、ペイロードサイズを数MBまで削減し、これによりLambdaの6MB制限をクリアしました。ただ、そういうわけにも毎回いかないと思いますので同様の構成を検討する際はぜひご注意ください。 「取り込み」と「ロード処理」の責務の分離による耐障害性の確保 今回は、API Gatewayから直接Snowflakeへデータを流し込むのではなく、S3を境界として「Ingest(取り込み)」と「Process(ロード処理)」の責務を明確に分離しました。 データ取り込み処理層 (同期): 役割: クライアントからのリクエストを高速に受け付け、S3へ永続化することだけに集中する。 効果: Snowflake側の状態(一時的なパフォーマンス低下など)の影響をクライアントに与えない。クライアントへは即座に 200 OK を返却し、接続タイムアウトのリスクを排除。 データロード層 (非同期): 役割: S3へのオブジェクト作成イベントをトリガーに、非同期でSnowflakeへの COPY INTO を実行する。 効果: 重い処理(DB接続・ロード)をここへ集約。もしロード処理が失敗しても、データはS3上に「ファイル」として安全に残っているため、データロード層(クライアント)に影響を与えることなくリトライやリカバリが可能。 この「S3をバッファとした疎結合アーキテクチャ」を採用したことで、クライアントに対するレスポンス性能(レイテンシ)を一定に保ちつつ、バックエンド処理の安定性を高めることを実現しました。 運用を見据えた設計 データ連携基盤においてもっとも考慮が必要なことは「データのロスト」です。 今回は、万が一Snowflakeへのロードが失敗した場合(データフォーマット不正やウェアハウスの一時的な問題など)に備え、以下の仕組みを導入しました。 エラーハンドリング : Lambda内で例外をキャッチした場合、対象のオブジェクトをS3上の「Error」フォルダへ移動(Move)。 監視 : エラーフォルダへの配置をトリガーに、管理者へ即時通知。 これにより、失敗したデータが「どこにあるか分からない」状態を防ぎ、リカバリが必要なデータを明確に分離する運用を設計しました。   まとめ 今回の構成では、マネージドサービスベースのデータ連携基盤(ニアリアルタイム)を実現しました。 データ連携は頻度をあげることでより難易度が増していきます。 今回の構成、事例がどなたかのお役に立つと幸いです。
アバター
こんにちは、SCSK林です! 今回は、AWSで完全にプライベート環境で実現するサーバレスAPI、S3静的ホスティングについて解説します。 エンタープライズ領域だと、クラウド導入の大きな壁となるのが『セキュリティ要件』だと感じています。 データは絶対にインターネットに出してはならない・・・ アクセスは専用線やVPN経由の閉域網に限る・・・ などなど、会社ごとに厳格なポリシーをお持ちだと思います。 本来、パブリックなアクセスを前提とするサーバーレスサービスを、いかにして閉域網の中に封じ込め、かつ安全に運用するか。 本記事では、実際に構築した例をベースにアーキテクチャ選定の背景と、構成や技術的に気をつけるポイントについて共有していきたいと思います。   構成の背景(いわゆる要件) 今回想定する割とありがちな(と個人的には思っている)要件は以下のとおりです。 アクセス経路: ユーザーはオンプレミス環境からのみアクセス可能。インターネットからのアクセスは一切遮断する。 運用負荷の軽減: 極力EC2などのサーバ管理を廃止し、マネージドサービスを活用したい。 モダンなUX: SPA(Single Page Application)によるリッチなUIを提供する。 通常、SPAの配信にはAmazon S3の静的ウェブサイトホスティングやAmazon CloudFrontが定石ですが、これらはパブリックアクセスが前提となります。閉域網要件を満たすために、「サーバーレスの利便性」と「ネットワークの閉塞性」をどう両立させるかが、アーキテクチャ設計の肝となります。 アーキテクチャ概要 最終的に採用したアーキテクチャは、ALB (Application Load Balancer) をシステムの唯一の入り口とし、バックエンドのリソースを全てプライベートネットワーク内に配置する構成です。 【構成のポイント】 アクセス元: オンプレミス環境(Direct Connect / VPN経由) 入口: VPC内に配置した Internal ALB フロントエンド: ALB → VPC Endpoint (Interface型) → S3 バケット バックエンド: ALB → Lambda この構成により、トラフィックが一切インターネットに出ることなく、AWSのネットワーク内だけで完結するセキュアな通信経路を確立しました。 アーキテクチャのポイント ALBによる入口の集約 当初、S3やAPI GatewayのVPCエンドポイントを直接クライアントに公開する案もありました。しかし、前段にALBを配置する構成を採用しました。S3やAPI Gatewayのエンドポイントが個別に分散すると、クライアント(オンプレ側)でのDNS設定やファイアウォール設定が複雑化します。ALBを挟むことで以下のメリットを享受できました。 インターフェースの集約: フロントエンドもAPIも、単一のドメインでアクセス可能にする(パスベースルーティング)。 セキュリティの一元化: SSL/TLS終端をALBに集約し、証明書管理を一本化。将来的なWAF導入などの拡張性も確保。 Route 53 Resolver によるハイブリッドDNS設計 オンプレミス環境からAWS内のプライベートリソースへアクセスさせる際、大きな技術的課題は「名前解決」です。オンプレミスのDNSサーバーは、AWS内のプライベートIPアドレスをもちろん知りません。  hostsファイル等での個別対応は運用破綻のリスクがあるため、Amazon Route 53 Resolver (Inbound Endpoint) の導入をしています。 オンプレミスのDNSサーバーから、特定ドメインへのクエリをAWS側のResolverにフォワードするハイブリッド構成とすることで、ユーザーはネットワークの境界を意識することなく、シームレスにシステムを利用できるようになりました。   まとめ 今回のプロジェクトを通じて、「閉域網」と「サーバーレス」は決して相容れないものではないことを実証できました。 セキュリティ: 完全プライベートな環境で、企業の厳しいコンプライアンス要件を遵守。 運用効率: サーバー(EC2)レスにより、OSパッチ適用などの運用コストを大幅に削減。 単に流行りの技術を使うだけでなく、ビジネス要件という制約の中で、技術をどう組み合わせ、最適な解を導き出すかというアーキテクト視点の重要性を再認識しました。 今後も、技術の理想とビジネスの現実のバランスを取りながら、顧客にとって価値あるクラウド活用を推進していきたいと思います。
アバター