TECH PLAY

アプトポッド

アプトポッド の技術ブログ

252

aptpod Advent Calendar 2024 12月18日の記事を担当します、Roboticsグループの久保田です。 今回は、AWS EC2インスタンス間でのROS2通信を可能にする解決策として、FastDDS Discovery Serverを取り上げます。Discovery Serverは、ROS2のデフォルトのディスカバリープロトコルが抱える課題を克服し、マルチキャストパケットが利用できない環境や、LiDARデータや動画データといった大容量データを扱う高負荷なネットワークでも、効率的かつ信頼性の高い通信を可能にします。 概要 Discovery Server AWS EC2インスタンスで使用する 構成 AWS EC2インスタンス設定 リソースベースのホスト名へ変更 セキュリティグループでのUDPインバウンドを許可 FastDDS設定 Server Client 実行手順 Discovery Server起動 ROSトピックをPublish ROSトピックをSubscribe まとめ 概要 ROS2では通信相手を検出するためにDiscoveryプロトコルを使用します。このDiscoveryプロトコルはマルチキャストパケットを使用しており、ROS1のような単一障害点を持たないという利点があります。 一方で、いくつかの課題があります。 新しいノードが追加されるたびに発生するパケット数が増加し、ネットワーク負荷が高まるため、効率的なスケーリングが困難となる。 WiFiネットワーク環境やLiDARデータや動画データなどを通信する高負荷なネットワークでは、通信のパフォーマンスが低下する可能性がある。 クラウド環境や一部のネットワーク構成では、マルチキャスト通信がサポートされていない場合がある。 WiFiなどの環境では、マルチキャスト通信が安定して動作しないケースが多い。 AWS EC2インスタンス間では、マルチキャストおよびブロードキャスト通信がデフォルトではサポートされていません。これにより、ROS2のデフォルトDiscoveryプロトコルが使用するマルチキャスト通信による相手検出が機能しなくなります。この制限はAWSの仮想プライベートクラウド(VPC)の設計に起因しており、VPC内ではマルチキャストトラフィックが意図的にブロックされています。一方、AWS Transit Gatewayを利用することで、限定的にマルチキャスト通信を有効化することが可能です。ただし、この方法は高度なネットワーク設定を伴い、構成の複雑さが課題となります。これらの理由から、AWS環境でROS2のマルチキャスト通信を実現する際には、設定の複雑さと技術的制約を十分に考慮する必要があります。 そこで、FastDDSがDiscoverプロトコルの代替として用意しているDiscovery Serverを使用します。Discovery Serverは、ROS2ノード間の通信を管理するための集中型サーバーとして機能します。このサーバーは、各ノードのメタデータ(場所、アドレス、ポート情報など)を保存し、他のノードが必要な情報を効率的に取得できるようにします。 Discovery Server ROS 2におけるDiscovery Serverは、ネットワーク上のノード間の発見と通信を効率化するための集中型メカニズムです。Fast DDSが提供するDiscovery Serverは、従来の分散型Discoveryプロトコルに代わり、クライアント-サーバーアーキテクチャを採用しています。 docs.ros.org Discovery Serverを使用する以下の利点があります ネットワークトラフィックの削減 : デフォルトのDiscoveryプロトコルが生成する大量のマルチキャストトラフィックを削減し、ネットワークの負荷を軽減します。 マルチキャスト非対応環境での利用 : AWS VPCやその他のマルチキャスト非対応の環境でも動作可能です。 一方で、デメリットも留意する必要はあります 単一障害点のリスク : Discovery Serverは単一障害点となる可能性があり、冗長構成を考慮する必要があります。 負荷の集中 : Discovery Serverに負荷が集中するため、適切なリソース(CPU、メモリ、ネットワーク帯域など)を割り当てることが推奨されます。 管理の手間 : サーバーの監視や障害対応のために、追加の管理作業が必要となります。 AWS EC2インスタンスで使用する 構成 AWS EC2インスタンス設定 リソースベースのホスト名へ変更 Discovery ServerのIPアドレスをLocal DNSから取得するためにリソースベースのホスト名へ変更しておくと便利です。これにより、DNSリゾルバを利用してIPアドレスを動的に管理できるため、固定IPアドレスの手動設定が不要となります。 「アクション」=> 「インスタンスの設定」=> 「リソースベースの命名オプションを変更」 リソースベースの命名オプションを変更 セキュリティグループでのUDPインバウンドを許可 Discovery Serverとの通信のために、UDPプロトコルのインバウンド通信を許可しておきます。これは、ROS2ノード間でのDiscovery Serverとのメタデータ交換を可能にするために必要です。 「インスタンス」=> 「セキュリティ」タブ=>「セキュリティグループ」=>「インバウンドのルールを編集」 インバウンドのルールを編集 FastDDS設定 Discovery Serverを使用するために、Server/ClientそれぞれにFastDDSの設定が必要です。Fast DDSの設定は、設定ファイルfastrtps-profile.xmlを作成し、環境変数 (FASTRTPS_DEFAULT_PROFILES_FILE) で指定します。 通信相手を検出するため、Discovery ServerのIPアドレス(ホスト名)および使用するポート番号を設定ファイルに記載してください。 Server fastrtps-profile.xml <? xml version = "1.0" encoding = "UTF-8" ?> <profiles xmlns = "http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles" > <transport_descriptors> <transport_descriptor> <transport_id> CustomUdpTransport </transport_id> <type> UDPv4 </type> </transport_descriptor> </transport_descriptors> <participant profile_name = "participant_profile" is_default_profile = "true" > <rtps> <userTransports> <transport_id> CustomUdpTransport </transport_id> </userTransports> <useBuiltinTransports> false </useBuiltinTransports> <prefix> 44.53.00.5f.45.50.52.4f.53.49.4d.41 </prefix> <builtin> <discovery_config> <discoveryProtocol> SERVER </discoveryProtocol> </discovery_config> <metatrafficUnicastLocatorList> <locator> <udpv4> <address> i-05a0c3018209bc88b.ap-northeast-1.compute.internal </address> <port> 11811 </port> </udpv4> </locator> </metatrafficUnicastLocatorList> </builtin> <sendSocketBufferSize> 31457280 </sendSocketBufferSize> <listenSocketBufferSize> 31457280 </listenSocketBufferSize> </rtps> </participant> </profiles> Client fastrtps-profile.xml <? xml version = "1.0" encoding = "UTF-8" ?> <profiles xmlns = "http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles" > <transport_descriptors> <transport_descriptor> <transport_id> CustomUdpTransport </transport_id> <type> UDPv4 </type> </transport_descriptor> </transport_descriptors> <participant profile_name = "participant_profile" is_default_profile = "true" > <rtps> <userTransports> <transport_id> CustomUdpTransport </transport_id> </userTransports> <useBuiltinTransports> false </useBuiltinTransports> <builtin> <discovery_config> <discoveryProtocol> SUPER_CLIENT </discoveryProtocol> <discoveryServersList> <RemoteServer prefix = "44.53.00.5f.45.50.52.4f.53.49.4d.41" > <metatrafficUnicastLocatorList> <locator> <udpv4> <address> i-05a0c3018209bc88b.ap-northeast-1.compute.internal </address> <port> 11811 </port> </udpv4> </locator> </metatrafficUnicastLocatorList> </RemoteServer> </discoveryServersList> </discovery_config> </builtin> <sendSocketBufferSize> 31457280 </sendSocketBufferSize> <listenSocketBufferSize> 31457280 </listenSocketBufferSize> </rtps> </participant> </profiles> XMLタグの説明 タグ名 説明 <transport_descriptors> 通信に使用するトランスポートの設定を指定します。 <transport_descriptor> トランスポートの詳細(IDやタイプなど)を設定します。 ここではUDPを指定しています <participant> ROS2ノードのプロファイル設定を指定します。この設定には、トランスポート、バッファサイズ、Discoveryプロトコルなどが含まれます。 <prefix> Discovery ServerのGUIDをしていします。 <discovery_config> Discovery Serverのプロトコルやサーバー情報を設定します。このタグを使用して通信方式(SERVER/SUPER_CLIENTなど)やリモートサーバーリストを定義します。 <discoveryProtocol> Discovery Serverの動作モードを定義します(例: SERVERはサーバーとして、SUPER_CLIENTはクライアントとして機能)。 <discoveryServersList> Discovery Serverのリストを定義します。これにより、クライアントノードがどのサーバーに接続するかを指定できます。 <metatrafficUnicastLocatorList> メタトラフィック通信に使用するロケータリストを指定します。このロケータリストは、Discovery Serverがクライアントノードと通信するためのIPアドレスやポート番号を定義します。これにより、特定のネットワーク設定に基づいて通信経路を明示的に管理できます。 実行手順 AWS EC2インスタンスを構築後、動作を確認していきます。 Discovery Server起動 Discovery Serverでの通信相手の検出のために、まずDiscovery Serverを起動します。起動時には、ROS DOMAIN IDや通信相手検出の通信のためのポート番号などが設定できます。 $ docker run -it --rm --name ros-server \ --net=host \ -e FASTRTPS_DEFAULT_PROFILES_FILE=/tmp/fastrtps-profile.xml \ -v ./fastrtps-profile.xml:/tmp/fastrtps-profile.xml \ ros:humble \ fastdds discovery --server-id 0 ### Server is running ### Participant Type: SERVER Server ID: 0 Server GUID prefix: 44.53.00.5f.45.50.52.4f.53.49.4d.41 Server Addresses: UDPv4:[0.0.0.0]:11811 ROSトピックをPublish トピックをPublishするために、FastDDS設定が必要です。環境変数 (FASTRTPS_DEFAULT_PROFILES_FILE) で指定してください。 $ docker run -it --rm --name rospub \ --net=host \ -e FASTRTPS_DEFAULT_PROFILES_FILE=/tmp/fastrtps-profile.xml \ -v ./fastrtps-profile.xml:/tmp/fastrtps-profile.xml \ ros:humble \ ros2 topic pub /my_topic std_msgs/msg/String '{data: "hello"}' publisher: beginning loop publishing #1: std_msgs.msg.String(data='hello') publishing #2: std_msgs.msg.String(data='hello') publishing #3: std_msgs.msg.String(data='hello') ROSトピックをSubscribe トピックをSubscribeするために、FastDDS設定が必要です。環境変数 (FASTRTPS_DEFAULT_PROFILES_FILE) で指定してください。 $ docker run -it --rm --name rossub \ --net=host \ -e FASTRTPS_DEFAULT_PROFILES_FILE=/tmp/fastrtps-profile.xml \ -v ./fastrtps-profile.xml:/tmp/fastrtps-profile.xml \ ros:humble \ ros2 topic echo /my_topic data: hello --- data: hello --- data: hello --- SubscribeしたROSトピックが表示され、ROS2通信が問題なく動作しました。Discovery Serverを使うことで、AWS EC2インスタンス間でもROS2通信が実現できることが確認できました。 まとめ 今回は、AWS EC2インスタンス間でのROS2通信を実現する方法として、FastDDS Discovery Serverを活用する手法を紹介しました。このソリューションは、マルチキャストパケットが使用できない環境において有効であるだけでなく、LiDARデータや動画データの通信時にネットワーク負荷を低減する効果も期待できます。ROS2の通信性能や動作環境の最適化を目指す際に、ぜひ参考にしてみてください。
aptpod Advent Calendar 2024 12月17日の記事です。 RoboticsグループでROSやロボット関連の開発を担当している影山です。 今日は、REST APIを利用して、リソースの限られた組み込みデバイスから、弊社のintdashへリアルタイムで映像をアップロードしてそれをブラウザで確認する方法について紹介したいと思います。 概要 利用するエッジデバイスについて ESP32を利用する理由 Timer Camera X の開発環境 Timer Camera Xのサンプルコードのビルド REST APIによるアップストリーム ESP32上での実装 実装時の注意点 動かしてたみた様子 まとめ 参考:JPEG送信部分のC++コード抜粋 概要 intdashを利用するために、弊社はPythonやTypeScript、Rustなど色々な言語に対応した SDK を準備してます。 SDKを利用するには、基本的にそれぞれの言語やビルドしたバイナリが実行できる環境が必要になります。 そのため基本的にはPCやJetsonのようなLinuxが動作するデバイス上での利用が前提となります。 弊社では、プログラミングによるSDKの他にもREST APIを利用してintdashサーバとのデータの送受信を行う方法も提供しています。 REST APIであれば、よりリソースの限られたデバイスでも利用できる可能が広がります。今回は、その一例として、ESP32を搭載した小さなカメラを使って、撮影した映像をリアルタイムでintdashに送信して、ブラウザで映像を確認してみたいと思います。 利用するエッジデバイスについて TimerCamera X 今回は、ESP32を搭載したTimer Camera X を利用して、映像のアップストリームを確認してきたいと思います。 Timer Camera Xは、 Amazon や Switch Science などで販売しています。 ESP32を利用する理由 ESP32は、WiFi機能を搭載しており、HTTPS通信を始めとした、各種APIに対応したSDK( ESP-IDF )を提供しています。 intdashとREST API通信するには、HTTPSを用いるので、ESP32のSDKを使えばintdashとの接続が実現できそうなので、選択しました。 今回はESP32とカメラが一体化したTimer Camera Xを利用しましたが、ESP32は、I2CやSPIに対応しており、カメラ以外の色々なセンサやデバイスと組み合わせ使うことも容易です。 Timer Camera X の開発環境 今回はESP-IDFを用いて開発していきます。 Timer Camera Xについての詳細は、 公式のドキュメント が参考になります。 ビルドやテスト環境にはDockerを利用しています。開発環境の話については筆者の ブログ で触れていますので、興味があればご参照ください。 Timer Camera Xのサンプルコードのビルド まずはサンプルコードで動作を確認します。 検索するといくつかTimer Camera X向けのサンプルが見つかりますが、古いものもあるので、現在は、以下のサンプルを利用するのが良いようです。 github.com ESP-IDF環境でhttp-streamというサンプルをビルドします。 最新のESP-IDFではビルド時にエラーが起きたので、 v4.4.8 を利用しています。 ビルドとフラッシュが上手くいくと、Timer Camera Xが起動したHTTPサーバのアドレスをブラウザで開くことで、リアルタイムでカメラの映像を確認できます。 今回はこのサンプルをベースにREST API over HTTPSで、撮影した画像をリアルタイムでintdashに送信していきます。 REST APIによるアップストリーム 基本的なREST APIの使い方については、別の記事で ATOM Lite 上でMicroPythonを用いた例を紹介しているので、そちらをご参照ください。 tech.aptpod.co.jp まずはcurlコマンドを使ったシェルスクリプトでJPEGを送信する動作を確認してから、それをESP32に移植していきます。 画像はBASE64エンコードする必要ある点に注意必要です。以下は2枚のJPEG画像を交互に送信する処理を抜粋したサンプルです。 function send_chunk_jpg() { local base_time=$1 local stream_id=$2 local jpg_file="./1.jpg" for i in {1..10}; do if (( i % 2 == 1 )); then jpg_file="./1.jpg" else jpg_file="./2.jpg" fi echo "$jpg_file" local encoded_jpg=$(base64 -w 0 "$jpg_file") local body_file=$(mktemp) local send_time send_time=$(date +%s%N) cat <<EOF > "$body_file" { "data_point_groups": [ { "data_id": { "type": "jpeg", "name": "10/image" }, "data_points": [ { "elapsed_time": $((send_time - base_time)), "payload": "$encoded_jpg" } ] } ] } EOF curl \ -H "Content-Type: application/json" \ -H "Authorization: Bearer $access_token" \ -X POST \ --data @"$body_file" "${base_url}/api/iscp/projects/${project_uuid}/upstreams/${stream_id}/chunks" | jq . sleep 0.5 done } ESP32上での実装 先ほどのcurlを使った手順を参考に、ESP-IDFのマニュアルを参照しつつ、ESP-IDFのAPIを使った実装にしていきます。 今回、使用している主なESP32のHTTPSに関係するAPIは以下の通りです。使い方は マニュアル に記載があります。 esp_http_client_init() esp_http_client_set_header() esp_http_client_set_post_field() esp_http_client_open() esp_http_client_write() esp_http_client_fetch_headers() esp_http_client_read_response() esp_http_client_cleanup() 基本的には、HTTPヘッダの設定、HTTPS接続、Bodyの書き込み、レスポンスの解析、終了処理、という順番で処理していきます。 コードの全体は掲載するには長いので、データ送信部分を抜粋したサンプルコードを記事の末尾に記載してます。 実装時の注意点 ESP32での実装時、以下のようなポイントに注意が必要です。 カメラの映像はサイズが大きいので、menuconfigで、ESP32のPSRAM機能を有効にすることが必要です。 Timer Camera XにはPSRAM 8Mが搭載されており、ESP32の内蔵SRAM 512KBよりも大幅に多くのメモリを利用できますが、アクセス速度が遅い点には留意が必要です 画像の送信時には、タイムスタンプを設定する必要があります。そのため今回はESP-IDFが提供する esp_sntp.h を利用しています。 動かしてたみた様子 実際にリアルタイムで撮影してみた様子です。 パソコンの前でTimer Camera Xを構えて、現在時刻が表示されたData Visualizerの画面を撮影しています。 アップストリーム動作確認 動画内の画面右枠に表示されているのが、Timer Camera Xからアップストリームされた映像です。 Data Visualizer上でのTimer Camera Xで撮影した映像表示 実際に試してみると1枚のJPEGの送信に時間がかかるため、滑らかな表示とまではいきませんでしたが、定期的な映像のアップストリームが確認できました。 まとめ ESP32を搭載したカメラデバイスを使って、intdashにリアルタイムで画像を送るサンプルを実装しました。 滑らかな動画を送ることは難しかったですが、一定間隔で撮影した画像をパラパラアニメのような感じで送信することには使えそうでした。 高性能なエッジデバイスを準備しなくても、intdashとの連携ができるので、REST APIを利用することで、ユースケースの幅を広げることができると思います。 この記事を見て、エッジデバイスとクラウドの連携技術について、ご興味を持たれた方は窓口からご相談頂ければ幸いです。 www.aptpod.co.jp 参考:JPEG送信部分のC++コード抜粋 void send_chunk(const char *base_url, const char *project_uuid, const char *stream_id, int64_t base_time) { char url[256]; snprintf(url, sizeof(url), "%s/api/iscp/projects/%s/upstreams/%s/chunks", base_url, project_uuid, stream_id); camera_fb_t *fb = NULL; size_t _jpg_buf_len; uint8_t *_jpg_buf; int err_count = 0; char *base64_buf = NULL; size_t base64_len; for (int i = 1; i <= 100; i++) { //100枚送信する int64_t send_time = esp_timer_get_time() * 1000; // マイクロ秒をナノ秒に変換 // カメラのフレームバッファ取得 fb = esp_camera_fb_get(); if (!fb) { ESP_LOGE(TAG, "Camera capture failed"); err_count++; if(err_count>10){ return; }else{ vTaskDelay(500 / portTICK_PERIOD_MS); continue; } } if (fb->format != PIXFORMAT_JPEG) { bool jpeg_converted = frame2jpg(fb, 80, &_jpg_buf, &_jpg_buf_len); if (!jpeg_converted) { ESP_LOGE(TAG, "JPEG compression failed"); esp_camera_fb_return(fb); return; } } else { _jpg_buf_len = fb->len; _jpg_buf = fb->buf; } // Base64エンコード用のバッファを確保 size_t encoded_len = 4 * ((_jpg_buf_len + 2) / 3) + 1; base64_buf = (char *)malloc(encoded_len); if (!base64_buf) { ESP_LOGE(TAG, "Failed to allocate memory for Base64 buffer"); if (fb->format != PIXFORMAT_JPEG) { free(_jpg_buf); } esp_camera_fb_return(fb); return; } // Base64エンコード int ret = mbedtls_base64_encode((unsigned char *)base64_buf, encoded_len, &base64_len, _jpg_buf, _jpg_buf_len); if (ret != 0) { ESP_LOGE(TAG, "Base64 encoding failed with error code: %d", ret); free(base64_buf); if (fb->format != PIXFORMAT_JPEG) { free(_jpg_buf); } esp_camera_fb_return(fb); return; } // 送信 // JSON全体のサイズを計算してメモリを確保 size_t json_size = snprintf(NULL, 0, JSON_FORMAT, send_time - base_time, base64_buf) + 1; char *body = (char *)malloc(json_size); if (!body) { ESP_LOGE(TAG, "Failed to allocate memory for JSON body"); free(base64_buf); if (fb->format != PIXFORMAT_JPEG) { free(_jpg_buf); } esp_camera_fb_return(fb); return; } snprintf(body, json_size, JSON_FORMAT, send_time - base_time, base64_buf); esp_http_client_config_t config = { .url = url, .crt_bundle_attach = esp_crt_bundle_attach, .buffer_size = 2048, .buffer_size_tx = 2048, .method = HTTP_METHOD_POST, }; esp_http_client_handle_t client = esp_http_client_init(&config); esp_http_client_set_header(client, "Content-Type", "application/json"); esp_http_client_set_header(client, "Accept-Encoding", "identity"); esp_http_client_set_header(client, "Authorization", access_token); esp_http_client_set_post_field(client, body, strlen(body)); esp_err_t err = esp_http_client_open(client,strlen(body)); if( err != ESP_OK) { ESP_LOGE(TAG, "Failed to open HTTP connection: %s", esp_err_to_name(err)); esp_http_client_cleanup(client); free(base64_buf); free(body); if (fb->format != PIXFORMAT_JPEG) { free(_jpg_buf); } esp_camera_fb_return(fb); continue; } int wlen = esp_http_client_write(client, body, strlen(body)); if (wlen >= 0) { ESP_LOGI(TAG, "Chunk %d sent successfully, elapsed:%lld", i, send_time - base_time); } else { ESP_LOGE(TAG, "HTTP request failed: %s", esp_err_to_name(err)); } // メモリ開放 esp_http_client_cleanup(client); free(base64_buf); free(body); if (fb->format != PIXFORMAT_JPEG) { free(_jpg_buf); } esp_camera_fb_return(fb); } }
aptpod Advent Calendar 2024 12月16日を担当するintdashグループの呉羽です。 弊社が提供する intdash は動画データをアップロードすることで、HLSやMP4への変換や、Webブラウザ上での閲覧機能を提供しています。 そこで今回は動画を使ったネタとして『 Pythonを使って動画ファイルから顔を検出し、人物ごとにグループ分けするプログラム 』をご紹介します。 動画内の顔を認識して分類することで、誰がどのシーンに現れるかを把握することが可能です。 Pythonライブラリ face_recognition について プログラム全体の流れ 動画ファイルからフレームを順に読み取る。 フレームから顔の検出を行う。 過去に検出した顔と比較し、同一人物かどうかを判定する。 検出した顔の画像を保存する。 まとめ Pythonライブラリ face_recognition について 今回用いる face_recognition は、Pythonで顔認識を行うためのライブラリです。このライブラリは、 dlib という機械学習ライブラリの顔認識モデルをベースにしており、以下の機能を提供します。 顔の検出(位置の特定) 顔のエンコード(特徴量の抽出) 顔同士の類似度計算(同一人物の判定) インストール方法は face_recognitionのGitHubリポジトリ を参考にします。基本的にはPythonのパッケージ管理ツール pip でインストールするだけです。今回は v1.2.2 を用います。 プログラム全体の流れ 以下に、プログラムの流れを説明します。 動画ファイルからフレームを順に読み取る。 フレームから顔の検出を行う。 過去に検出した顔と比較し、同一人物かどうかを判定する。 検出した顔の画像を保存する。 実装したコードはGitHubで公開しています。 github.com 今回はサンプル動画として、フリー素材として公開されているこちらの動画を利用します。動画内でいくつかの人物がディスカッションしている様子が伺えます。 pixabay.com 動画ファイルからフレームを順に読み取る。 Pythonで動画ファイルのフレームを読み取るために opencv-python を使います。以下のようにフレームを順に読み取れます。 import cv2 path_to_video = "./xxx.mp4" video_capture = cv2.VideoCapture(path_to_video) while video_capture.isOpened(): ret, raw_frame = video_capture.read() if not ret: print ( "Video file ended" ) break 全てのフレームを取得すると、例えば30FPSかつ5分間の動画のフレーム数はおよそ9,000になります。ここでは処理の軽量化のため、以下のように約2秒間隔のフレームごとに処理します。 fps = video_capture.get(cv2.CAP_PROP_FPS) frame_count = 0 while video_capture.isOpened(): frame_count += 1 # take a frame every 2 seconds if frame_count % int (fps* 2 ) != 0 : continue フレームから顔の検出を行う。 face_recognition.face_locations() にフレームを渡すことで、そのフレーム内の全ての顔の位置を検出できます。以下はフレーム内の顔の全ての位置情報を出力するコードです。 import face_recognition face_locations = face_recognition.face_locations(frame) print ( "Found {} face(s) in frame #{}." .format( len (face_locations), frame_count)) for i in range ( len (face_locations)): face_location = face_locations[i] top, right, bottom, left = face_location face_size = (right - left) * (bottom - top) print ( " - A face is located at pixel location Top: {}, Left: {}, Bottom: {}, Right: {}, Size: {}" .format(top, left, bottom, right, face_size)) 過去に検出した顔と比較し、同一人物かどうかを判定する。 次に face_recognition.face_encodings() を使い、顔を比較するための特徴量を計算します。そして face_recognition.face_distance() を使うことで、2つ顔の特徴量の距離 [0,1] (小さいほど類似している)を計算できます。 これを用いて、ある閾値(ここでは 0.6 )以下の場合に同一人物と判定し、グループ分けします。 face_groups = [] # list of face encodings face_encodings = face_recognition.face_encodings(frame, face_locations) for i in range ( len (face_locations)): face_location = face_locations[i] face_encoding = face_encodings[i] group_id = - 1 if len (face_groups) > 0 : distances = face_recognition.face_distance(face_groups, face_encoding) if np.min(distances) < 0.6 : # threshold to consider the face as the same person group_id = np.argmin(distances) if group_id == - 1 : # create a new group face_groups.append(face_encoding) group_id = len (face_groups) - 1 検出した顔の画像を保存する。 検出した顔の画像を、人物(グループ)ごとにディレクトリに保存します。分かりやすいように、フレームの経過時間をファイル名として保存しています。 name = "./output/{}/{}/{}.jpg" .format(output_dir, group_id, time.strftime( "%H_%M_%S" , frame_time)) os.makedirs(os.path.dirname(name), exist_ok= True ) face = raw_frame[top:bottom, left:right] cv2.imwrite(name, face, [ int (cv2.IMWRITE_JPEG_QUALITY), 95 ]) サンプル動画では5人の人物が検知されました。例えば、ある一つのディレクトリに保存された画像は以下のようになりました。 結果の例 他のディレクトリを見てみると、同一人物の誤判定があるようでした。これを解消するためには、手動で閾値を調整する必要があります。 他の人物と混同している例 まとめ 本記事では、Pythonを使った動画内の顔検出・分類プログラムを解説しました。このプログラムをさらに改善・発展させるには、以下のような方法が考えられます。 GPUを使った高速な処理 dlib をインストールする際にNVIDIA CUDAを有効化すればより高速に処理ができます。 高度なグループ分け k-means法を使えば、人物数(グループ数)を指定しつつ精度良く分類できます。 リアルタイム処理への対応 Webカメラなどのライブ映像から直接顔を検出するように変更することで、リアルタイムでの分析が可能になります。 また弊社が提供する intdash を活用することで、intdashからの動画ストリーミングをリアルタイム処理したり、得られたデータを別途保存したりすることが可能です。ぜひこのプログラムを参考にオリジナルのアプリケーション開発に挑戦してみてください。
aptpod Advent Calendar 2024 12月13日の記事を担当します、ソリューションアーキテクトの門脇です。 今回はintdash を活用し、取得した時系列データの分析にフォーカスします。簡易的な例として計測データの傾向を可視化するための分析ボード 構築手法を共有します。 そして計測から分析可視化までのプロセスを自動化します。 今回の要件 概要構成 intdashとは? intdash Motionとは? 分析ボードの構築プロセス intdash APIを使ったデータ取得 データ分析 データアップロード 分析ボード作成 結果とメリット おわりに 今回の要件 1週間の歩行イベントにおける、5名の歩行距離を可視化したい。 計測>分析>可視化のプロセスを、出来る限りシンプルな構成で自動化したい。 概要構成 概要構成を示します。 計測はiPhoneにインストールした indash Motion V2 で行います。 分析したデータの可視化ツール(BIツール)には LookerStudio を使用します。 概要構成 intdashとは? intdash は、様々なIoTデータを低遅延に伝送するハイパフォーマンスなデータ伝送ミドルウェアです。産業用バスなどの秒間数百〜数千点にもなる大量データにも対応。独自開発のプロトコルにより、帯域の細いモバイル回線でも確実かつリアルタイムにデータ伝送が可能です。 intdash Motionとは? intdash Motion は、本格的なIoTデバイス導入前のPoCやトライアル用途でご利用いただける、簡易データ収集用スマートフォンアプリです。スマートフォンに搭載された加速度センサーやジャイロスコープ、GPS、カメラやマイクといった各種センサーを利用して、簡易的な実験を行うことができます。 分析ボードの構築プロセス 今回のプロジェクトは以下の流れで進めます。 intdash APIを使ったデータ取得 データ分析 データアップロード 分析ボード作成 intdash APIを使ったデータ取得 分析には、intdash Motion V2アプリから送信されるGNSSデータを利用します。 GNSSデータ intdash APIで計測データを取得します。 # 計測の取得 def get_measurements (edge_uuid: str , start: pd.Timestamp, end: pd.Timestamp) -> list [ dict ]: measurements = [] page = 1 while True : resp = get( path=f "/v1/projects/{PROJECT_UUID}/measurements" , params={ "edge_uuid" : edge_uuid, "start" : start.tz_convert( 'utc' ).strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ), "end" : end.tz_convert( 'utc' ).strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ), "page" : page, }, ) for meas in resp[ "items" ]: measurements.append(meas) if resp[ "page" ][ "next" ]: page += 1 else : break return measurements # 時系列データの取得 def get_data_points (meas_uuid: str , idq: list [ str ]) -> list [ dict ]: resp = requests.get( url=INTDASH_URL + f "/api/v1/projects/{PROJECT_UUID}/data" , headers=HEADERS, params={ "name" : meas_uuid, "idq" : idq, "time_format" : "ns" , }, stream= True , ) resp.raise_for_status() data_points = [] received = "" for chunk in resp.iter_content(chunk_size= None , decode_unicode= True ): received += chunk for line in received.split( " \n " ): try : decoded = json.loads(line) data_points.append(decoded) except ValueError : received = line break return data_points # DataFrameに変換 def convert (data_point: dict ) -> list [ dict ]: match data_point[ "data_name" ]: case "0/gnss_speed" : bin = base64.b64decode(data_point[ "data" ][ "d" ]) value = struct.unpack( ">d" , bin )[ 0 ] return [{ "time" : data_point[ "time" ], "name" : "GNSS_Speed" , "value" : value, }] case _: raise ValueError (f "Unsupported data_type: {data_point['data_type']}" ) データ分析 次に、各計測ごとにデータ分析を行います。 速度の分析値と、速度を基にした移動距離を計算しています。 def analyze_gnss_speed (df: pd.DataFrame) -> dict : # タイムスタンプをdatetime型に変換 df.index = pd.to_datetime(df.index) # 開始時刻と終了時刻 start_time = df.index.min() end_time = df.index.max() # 時間間隔を計算(秒単位から時間単位に変換) time_deltas = df.index.to_series().diff().dt.total_seconds() time_deltas_in_hours = time_deltas / 3600 # 時間単位に変換 # 移動距離の計算 distances = df[ "GNSS_Speed" ] * time_deltas_in_hours # km/h × 時間 = km total_distance = distances.sum() # スピードの分析値 speed_stats = { "mean_speed" : df[ "GNSS_Speed" ].mean(), "median_speed" : df[ "GNSS_Speed" ].median(), "max_speed" : df[ "GNSS_Speed" ].max(), "min_speed" : df[ "GNSS_Speed" ].min(), "range_speed" : df[ "GNSS_Speed" ].max() - df[ "GNSS_Speed" ].min(), "std_speed" : df[ "GNSS_Speed" ].std(), "time_of_peak_speed" : df[ "GNSS_Speed" ].idxmax(), } # 分析結果のまとめ analysis = { "start_time" : start_time, "end_time" : end_time, "total_distance" : total_distance, **speed_stats, } return analysis データアップロード 今回はGoogle スプレッドシートをデータソースにしてLookerStudioで可視化を行います。 スプレッドシートへのデータアップロードは GoogleAPI を使います。 プロセス図 1行1計測で集約した分析データをデータソースにします。 分析データ 分析ボード作成 Looker Studioで歩行傾向を可視化します。 作成した分析ボード1 作成した分析ボード2 結果とメリット 完成した分析ボードでは、以下の成果が得られました。 データの即時可視化: 最新の計測データがダッシュボードで即座に確認可能。 コラボレーションの向上: Looker Studioで共有機能を利用し、チーム全体でデータを分析。 自動化の実現: 定期的なデータ更新が自動化され、作業時間を大幅に削減。参加者が何人増えても集計分析の手間は増えません。 おわりに 今回のプロジェクトを通じて、計測したデータを自動で可視化する仕組みを構築できました。 intdash を使う事で、IoTデータ活用の可能性がさらに広がります。ぜひ、貴社の課題解決にもご活用ください。 ご興味のある方は、 intdash 無償トライアル からお申込みいただき、実際にその効果を体験してみてください。
aptpod Advent Calendar 2024 12月12日の記事です。 こんにちは、開発本部Visual M2Mグループの遠藤です。フロントエンドエンジニアとして働いています。 Playwright 1.49.0から Aria snapshots という機能が新たに追加されました。この記事では、PlaywrightのAria snapshotsを実際に試しながら機能の紹介を行いたいと思います。 Aria snapshots とは アクセシビリティツリーについて Aria snapshots を使ってみる インストール 基本的な使い方 テストの修正 部分一致と正規表現を使用したマッチング スナップショットの生成と更新 ariaSnapshot()メソッド おわりに 参考文献 Aria snapshots とは PlaywrightのAria snapshotsでは、ページのアクセシビリティツリーをYAML形式の表現として提供します。提供されたツリーのスナップショットを保存することで、ページの変更時に差分を比較し、ページ構造が期待通りであるかをチェックすることができます。 たとえば次のようなHTML構造があるとします。 < h1 > Title </ h1 > < h2 > SubTitle </ h2 > こちらは次のYAML形式で表現することができます。 - heading "Title" [ level=1 ] - heading "SubTitle" [ level=2 ] アクセシビリティツリーについて アクセシビリティツリー はUI構造を表すアクセス可能なオブジェクトのツリーです。DOMやCSSOMに基づいて生成され、スクリーンリーダーや他の支援技術が情報をユーザーに提供するための基盤となります。 Chrome DevTools を利用することでどのようなアクセシビリティツリーが生成されているかを確認することもできます。たとえば前述したHTML構造は、Chrome DevToolsで次のようなアクセシビリティツリーになっていることを確認できます。 Aria snapshots を使ってみる ここからは実際にAria snapshotsを試しながら機能を確認していきたいと思います。 インストール 次のコマンドでPlaywrightのインストールを行います。 $ npm init playwright@latest もしくは VS CodeのPlaywright拡張機能 を使用して作業を行うこともできます。 基本的な使い方 まずはじめに次のHTML構造を想定してください。 < header id = "header" > < h1 > Title </ h1 > </ header > 次に、Playwrightのテストコードを書いてみます。 localhost:5500 は上記のHTMLをホストしているローカルサーバーを想定しています。 Aria snapshotsでは、 toMatchAriaSnapshot() メソッドを使用することで、 locator の範囲内のアクセシビリティツリーと、引数のアクシビリティツリーを比較することができます。 tests/example.spec.ts を次のように書き換えてください。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "Title" [level=1] ` ); } ); 次のコマンドを実行してみましょう。上記のテストは成功します。 $ npx playwright test Running 1 test using 1 worker 1 passed ( 525ms ) 次にHTMLファイルに次のような変更を加えてみましょう。 <h1> タグを <h2> タグに変更 テキストを"Title"から"SubTitle"に変更 < header id = "header" > < h2 > SubTitle </ h2 > </ header > 再度テストを実行してみましょう。テストが失敗すれば期待通りです。 $ npx playwright test Running 1 test using 1 worker 1 ) [ chromium ] › example.spec.ts:3:5 › test example ─────────────────────────────────────────────── Error: Timed out 5000ms waiting for expect ( locator ) .toMatchAriaSnapshot ( expected ) Locator: locator ( ' #header ' ) - Expected - 1 + Received + 1 - banner: - - heading " Title " [ level = 1 ] + - heading " SubTitle " [ level = 2 ] テストの修正 与えられたエラーメッセージを元に、テストを修正して、再度実行してみましょう。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "SubTitle" [level=2] ` ); } ); 今度は成功しました。 $ npx playwright test Running 1 test using 1 worker 1 passed ( 475ms ) 部分一致と正規表現を使用したマッチング 部分一致や正規表現を利用することで、より柔軟なテストが可能となります。 たとえば、以下のようにアクセブルな名前や属性を省略することで、部分一致を適用することができます。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading ` ); } ); 上記は、 header > heading という構造を持つHTML要素を期待します。この場合、 heading 要素のレベルやテキストは無視されます。つまり要素が h1 でも h2 でも、またテキストが Title でも SubTitle でも、テストは成功します。 HTML構造を変更せずに再度テストを実行してみましょう。テストが成功することを確認できます。 $ npx playwright test Running 1 test using 1 worker 1 passed ( 475ms ) 次のように正規表現を使用することで、動的なコンテンツに対してテストを行うこともできます。 test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading /.*Title$/ ` ); } ); スナップショットの生成と更新 スナップショットファイルの生成や更新は --update-snapshots オプションを使用して行うことができます。 一度、HTML構造とテストファイルを次の状態に戻します。 < header id = "header" > < h2 > SubTitle </ h2 > </ header > test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "Title" [level=1] ` ); } ); この状態で npx playwright test を行なうと、テストは失敗します。したがって先ほどは、HTML構造を自分で書き換えることで、テストを成功させました。今度は --update-snapshots オプションを付与して実行してみます。 $ npx playwright test --update-snapshots Running 1 test using 1 worker New baselines created for: tests/example.spec.ts git apply test-results/rebaselines.patch 1 passed ( 5 .5s ) 今回はテストが成功し、 test-results/rebaselines.patch にパッチファイルが生成されました。 diff --git a/tests/example.spec.ts b/tests/example.spec.ts --- a/tests/example.spec.ts +++ b/tests/example.spec.ts @@ - 3 , 7 + 3 , 7 @@ test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - - banner: - - heading "Title" [level=1] + - banner: + - heading "SubTitle" [level=2] ` ); } ); git apply を使うことでパッチファイルの内容を tests/example.spec.ts に適用することができます。 $ git apply test-results/rebaselines.patch tests/example.spec.ts を確認してみると、下記のように差分が適用されていることを確認できます。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "SubTitle" [level=2] ` ); } ); また、 toMatchAriaSnapshot() に空文字列を渡すことで、その場でスナップショットを生成することもできます。この方法を利用して初回テスト実行時に、簡単にスナップショットを生成することができます。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( "" ); } ); 上記の状態で --update-snapshots を付与したテストを実行してみましょう。 $ npx playwright test --update-snapshots test-results/rebaselines.patch に次のパッチファイルが生成されます。 diff --git a/tests/example.spec.ts b/tests/example.spec.ts --- a/tests/example.spec.ts +++ b/tests/example.spec.ts @@ -2 , 5 + 2 , 8 @@ test ( " test example " , async ( { page } ) => { await page.goto ( " http://localhost:5500/ " ) ; - await expect ( page.locator ( " #header " )) .toMatchAriaSnapshot ( "" ) ; + await expect ( page.locator ( " #header " )) .toMatchAriaSnapshot ( ` + - banner: + - heading " SubTitle " [ level = 2 ] + ` ) ; } ) ; ariaSnapshot() メソッド ariaSnapshot() メソッドは、指定された locator の範囲内のスナップショットをYAML形式で出力します。これは、テスト実行中にスナップショットを動的に生成したい場合などで役に立ちます。 const snapshot = await page.locator( "#header" ).ariaSnapshot(); console . log (snapshot); // - banner: // - heading "SubTitle" [level=2] おわりに UIテストはその性質上、非常に繊細で難しい作業です。画面の構造や見た目の変化はアプリケーションの成長とともに頻繁に発生します。一方でテストコードのメンテナンスや、意図しない変更の検出にはなるべく労力をかけたくないものです。 そんな中、PlaywrightのAria snapshotsは、アクセシビリティツリーというアプローチでUIの構造を簡潔かつ柔軟にテストできる点で有用に感じました。YAML形式で視覚的にわかりやすくページの構造を記述できることもメリットの一つです。 今後も、実際のプロジェクトに適した方法を模索しながら、製品の品質向上につなげていければと思います。 参考文献 https://playwright.dev/docs/aria-Snapshots https://github.com/microsoft/playwright/releases/tag/v1.49.0
aptpod Advent Calendar 2024 12月11日の記事です。 ネイティブアプリケーション開発を担当している上野です。近年、デジタルツインの注目度が高まりつつある中、実際にデジタルツインアプリケーションを作る際の開発プラットフォームの選択肢として、UnityやROSが挙げられると思います。 aptpod,Inc News 上記ニュースで開発しているアプリケーションもUnityとROSが活用されています。 今回は実際に私がUnityとROSを利用してデジタルツインアプリケーションを開発している際に利用しているツールや取り扱い方法などをご紹介します。 UnityとROSの接続方法 ROSサーバーのセットアップ Docker Composeファイルの作成 Dockerコンテナの起動 Unityアプリのセットアップ プロジェクトの作成 ROS-TCP-Connectorのセットアップ ROSバージョンの選択 ROSコネクションの配置 ROSメッセージのサブスクライブ/パブリッシュ検証 ROSメッセージをサブスクライブしてみる ROSbagファイルに格納されたROSメッセージの確認方法 ROSメッセージをパブリッシュしてみる 独自に設計されたROSメッセージの生成方法 ROS用に設計されたロボットモデルのインポート その他、開発する上で注意ポイント Windowsの場合、Dockerで作成したROSサーバーは同一LAN内のROSノード(サーバー)とは接続できない UnityとROSでは座標系が違う 最後に UnityとROSの接続方法 UnityとROSとの接続は以下の2つのモジュールを利用しています。 ROS-TCP-Connector はUnity内にインストールするパッケージです。こちらを利用する事でTCP経由でROSメッセージのサブスクライブやパブリッシュが可能です。 ROS-TCP-Endpoint はROS用のパッケージでROSネットワーク内のメッセージをTCPでUnityへ橋渡してくれるエンドポイントです。 よくある例でROSBridgeを利用してWebSocket経由でROSメッセージのやり取りをする例がありますが、それらでやり取りするよりも圧倒的にパフォーマンスが良いようです。実際にPC内にDockerで作成したROSサーバーで発行した数百Mbpsクラスの点群データもスムーズにUnityに届けられているのでかなり高速と言えます。 ROSサーバーのセットアップ 今回はUnityがインストールされたWindowsPC内にDockerを利用してROS2サーバー(ノード)をセットアップし、ROSメッセージのやり取りを行えるようにします。 今回の構成 Docker Composeファイルの作成 Dockerのセットアップ方法は Unity-Robotics-Hub というROSとUnityのチュートリアルがあり、同一リポジトリ内の ros_unity_integration にROS-TCP-Endpointのパッケージを自動追加しつつROSサーバーをセットアップするDockerファイルがあるので利用すると簡単です。 私はros_unity_integrationフォルダと同階層にDocker Composeファイルを作成して利用しています。 ROSサーバーをDockerで建てる際のフォルダ例 docker-compose.yml #version: "3" services : ros2-endpoint : image : foxy container_name : ros2 build : context : ./ros_unity_integration dockerfile : ./ros2_docker/Dockerfile ports : - 10000:10000 volumes : - ./bagfiles:/home/dev_ws/bagfiles networks : - ros2_external restart : always command : > bash -c ". /home/dev_ws/install/setup.bash && export FASTRTPS_DEFAULT_PROFILES_FILE=/home/dev_ws/src/fastrtps-profile.xml && ros2 run ros_tcp_endpoint default_server_endpoint --ros-args -p ROS_IP:=0.0.0.0 -p ROS_TCP_PORT:=10000" ros2-endpoint2 : image : foxy container_name : ros2_for_controller build : context : ./ros_unity_integration dockerfile : ./ros2_docker/Dockerfile ports : - 10001:10001 volumes : - ./bagfiles:/home/dev_ws/bagfiles networks : - ros2_external restart : always command : > bash -c ". /home/dev_ws/install/setup.bash && export FASTRTPS_DEFAULT_PROFILES_FILE=/home/dev_ws/src/fastrtps-profile.xml && ros2 run ros_tcp_endpoint default_server_endpoint --ros-args -p ROS_IP:=0.0.0.0 -p ROS_TCP_PORT:=10001" networks : ros2_external : 上記Docker Composeファイルでは、 チュートリアル で行っているROS-TCP-Endpointのセットアップコマンドなどを一括実行させています。 また、Docker Composeファイルと同階層に作ったbagfilesフォルダをマウントする事で検証で利用するROSbagファイルを簡単に参照できるようにしています。 コンテナが複数ありますが、こちらはUnityアプリとROS-TCP-Endpointの疎通は1対1でないと接続不調になるケースがあった為です。 ROSは同一ネットワーク内のメッセージを自動で送受信する事が出来ます。Docker Composeの設定でコンテナ同士の通信を許可しておけば複数のコンテナに分かれていても問題はありません。 各コンテナの違いはコンテナ名とポートのみです。 私はコンテナが複数と1つだけのDocker Composeファイルを作成して使い分けています。 Dockerコンテナの起動 Dockerコンテナの起動コマンドは以下です。 # for docker-compose.yml docker compose up -d # for docker-compose2.yml docker compose -f docker-compose2.yml up -d 対象のDocker Composeファイルのファイル名が docker-compose.yml 以外の場合は -f オプションでファイル名を指定します。 コンテナの停止例は以下です。 # for docker-compose.yml docker compose down --rmi all --volumes --remove-orphans or docker compose -f docker-compose2.yml down --rmi all --volumes --remove-orphans Docker自体のセットアップ方法は今回は解説しませんがWindowsの場合はWSL(Windows Subsystem for Linux)内、Macの場合は Colima を利用、Linuxの場合はOSへ直接セットアップして利用しています。 Unityアプリのセットアップ プロジェクトの作成 執筆日(2024/12) ではUnityのプロジェクトを作成する際のプロジェクトテンプレートは ビルトインレンダーパイプライン を利用しています。 Unityプロジェクト作成時のビルトインレンダーパイプラインのテンプレート ユニバーサルレンダーパイプラインでも開発できるかと思いますがiPadなどタブレット対応も考慮している点と、より処理を軽量化したいケースで従来のビルトインレンダーパイプラインの方が勝っているかなと現時点では感じています。 ROS-TCP-Connectorのセットアップ ROS-TCP-Connectorのセットアップ方法は README に書かれている通りでPackage Manager内でGitのURLからインポートする方法があるのでそちらからインストールしてください。 Package Managerで選択する項目 ただ、私がROS-TCP-Connectorを利用する場合は、送受信しているROSメッセージの伝送帯域を確認したいケースが多くあり、標準では伝送帯域を収集する機能を要していない為、フォークして少し調整した物を利用しています。 github.com ROS-TCP-Connector インストール後の画面 帯域の測定方法に興味がある場合は テスト用のスクリプト がありますので参照ください。 ROSバージョンの選択 ROS-TCP-ConnectorはROS1とROS2の両方に対応しています。必要に応じて Robotics/ROS Settings よりバージョンの切り替えを行ってください。 ROS Settingsの表示方法 ROSバージョンは変更可能 ROSコネクションの配置 Unity内でGameObjectにROS Connectionコンポーネントを配置し、必要に応じてIPアドレスやポートを変更すればROSサーバーとの疎通に関しては準備完了です。 ROS Connection のInspector うまくROSサーバーと接続できていればアプリ実行時にシーン画面の左上に表示される矢印が青く点灯します。 ROSとの接続が成功している際の表示 接続できていない場合は、矢印が赤色になります。 ROSとの接続に失敗している際の表示 ROSメッセージのサブスクライブ/パブリッシュ検証 ここからはデータの疎通確認の例を記載します。 ROSメッセージをサブスクライブしてみる ROSメッセージをサブスクライブするには対象のメッセージのトピック名と型をROS Connectionに渡すだけで可能です。 ROS_StringSubscriber.cs using RosMessageTypes.Std; using System; using Unity.Robotics.ROSTCPConnector; using UnityEngine; using UnityEngine.Events; public class ROS_StringSubscriber : MonoBehaviour { [SerializeField] private string topicName = "/string" ; public string ReceivedTime; public string Data; public bool ShowReceiveLog = true ; public UnityEvent< string > OnReceiveData = new UnityEvent< string >(); // Start is called before the first frame update void Start() { ROSConnection.GetOrCreateInstance().Subscribe<StringMsg>( this .topicName, (message) => { if ( !this .enabled) return ; if (ShowReceiveLog) Debug.Log( $"OnReceiveStringMessage(topicName: { this.topicName } , data: { message.data } )" ); var dateTime = DateTime.UtcNow; ReceivedTime = dateTime.ToLocalTime().ToString( "HH:mm:ss.ffffff" ); Data = message.data; OnReceiveData ? .Invoke(message.data); }); } } 作成したコンポーネントをGameObjectに追加し、対象のトピック名を設定するだけで利用できます。 作成した ROS_StringSubscriber の Inspector Unityのプロジェクトを実行したらROSサーバーからメッセージをパブリッシュしてみます。 # Launching Docker containers. docker compose up -d # Access to Docker containers. docker exec -it ros2 /bin/bash # Sending ROS messages. ros2 topic pub --once /string std_msgs/msg/String " data: Hello World! " or ros2 topic pub --once /string std_msgs/String " data: Hello World! " ROSメッセージを受信した際の ROS_StringSubscriber の Inspector JSONなども送信可能です。 ros2 topic pub --once /string std_msgs/String " data: {'key': 'value'} " JSON形式を受信した際の ROS_StringSubscriber の Inspector ROSbagファイルに格納されたROSメッセージの確認方法 ROSbagに格納されたメッセージの確認を行う場合はまずは ros2 bag info で格納されているROSメッセージのTopic名や型を調べます。 ros2 bag info の実行例 上記で判明したTopic名と型を用いてUnityでサブスクライブ設定を行います。 ROSbag内のデータは時系列に格納されており、 ros2 bag play を行う事で実際に記録した際と同じ時間間隔でメッセージの再生が可能です。 ros2 bag play { ROSbagファイルパス } 上記を実行する事で格納されたROSメッセージがROSネットワークに発信され、Unityにも同様にROSメッセージを伝達する事が可能になります。 ROSメッセージをパブリッシュしてみる 続いて逆にUnityからROSサーバーへデータを送信してみます。以下の2つのファイルを用意してみました。 ROS_StringPublisher.cs using RosMessageTypes.Std; using System.Collections; using System.Collections.Generic; using Unity.Robotics.ROSTCPConnector; using UnityEngine; public class ROS_StringPublisher : MonoBehaviour { [SerializeField] private string topicName = "/string2" ; public bool ShowSendLog = true ; private ROSConnection ros; private StringMsg msg; private bool enable = false ; // Start is called before the first frame update void Start() { // Setup ROS this .ros = ROSConnection.GetOrCreateInstance(); this .ros.RegisterPublisher<StringMsg>( this .topicName); // Setup ROS Message this .msg = new StringMsg(); } public void Publish( string data) { if ( ! enable) return ; if (ShowSendLog) Debug.Log( $"PublishStringMessage(topicName: { this.topicName } , data: { data } )" ); this .msg.data = data; this .ros.Publish( this .topicName, this .msg); } private void OnEnable() { enable = true ; } private void OnDisable() { enable = false ; } } PublishSample.cs using UnityEngine; using UnityEngine.Events; public class PublishSample : MonoBehaviour { public string Message = "Unity to ROS test" ; public UnityEvent< string > PublishEvent = new UnityEvent< string >(); // Update is called once per frame void Update() { if (Input.GetKeyDown(KeyCode.Space)) { PublishEvent ? .Invoke(Message); } } } ROS_StringPublisherではROSConnectionに対し、送信するトピック名と型を登録し、関数Publish()に送られてきた文字列を送信する例になります。 PublishSampleはスペースキーが押されたら、変数Messageに格納されている文字列をPublisherに伝達できるようにイベントを発火しています。 新しくGameObjectを作成し、それぞれのコンポートを追加したら、PublishSampleのInspectorでPublishEventにROS_StringPublisherのPublish関数を登録します。 PublishSampleにROS_StringPublisherのROSメッセージ送信処理を登録する例 PublishSampleにROSメッセージ送信のイベントが登録できた際の例 Unityのプロジェクトを実行後、スペースキーを押すとログが出力されればOKです。 ROSメッセージ送信のイベントが発生した際に表示されるログ 実際にROSサーバー側で送信されたROSメッセージを確認するには ros2 topic echo を利用します。 # Access to Docker containers. docker exec -it ros2 /bin/bash # Checking ROS messages. ros2 topic echo /string2 std_msgs/String ROSサーバーでros topic echoを実行した際の例 正しくメッセージが受信できれば、型に定義されたデータ名とその中身が表示されます。 独自に設計されたROSメッセージの生成方法 標準でサポートされているROSメッセージだけでなく、状況に応じてカスタマイズされたROSメッセージを利用したいケースがあると思います。 ROS-TCP-ConnectorではROSメッセージの生成もサポートしているので対応可能です。 ROS Message Browserの開き方 Robotics/Generate ROS Messages... で ROS Message Browserを開き、パスの設定を行います。 ROS Messageの生成例 基本はUnityプロジェクト内のAssetsフォルダ配下に ROSMessages フォルダを作成し、ROSエンジニアから共有頂いたROSMessage定義ファイル( 拡張子 .msg )を格納すれば自動生成されます。 今回は NMEAメッセージをUnity用ROSMessageに変換してみました。 github.com ROS用に設計されたロボットモデルのインポート ROS用に設計されたロボットモデル多くは xacro や urdf といったxml形式のモデルが多いです。これらのモデルはUnityでは標準サポートしていない為、Unityへインポートする例も紹介します。 TurtleBot3 Waffle Piモデル 今回はURDFで定義されたTurtleBot3モデルを利用します。 利用するリポジトリは以下です。 github.com URDFのインポートには URDF-Importer というUnityパッケージを利用します。 github.com ROS-TCP-Connectorの時と同じようにGithubのリンクからインポートを行います。 URDF-Importer のインストール後の画面 Assets配下にURDFフォルダを作成しその配下に TurtleBot3の プロジェクト をクローンしてきます。 URDFファイルのインポート方法 クローンしてきたプロジェクトのトップディレクトリにurdfファイルありますのでそちらを右クリックなどでオプションを表示し、 Import Robot from Selected URDF file 選択し、 URDF Import Settings を表示します。 URDF Import Settings 表示された URDF Import Settings で Import URDF を選択することでモデルが生成されると思います。 URDFから生成された3Dモデル例 今回用意したurdfのリポジトリは元のリポジトリからフォークして若干調整しています。 turtlebot3_waffle_pi.urdf ... <link name = "base_link" > <visual> <origin xyz = "-0.064 0 0.0" rpy = "0 0 0" /> <geometry> <mesh filename = "package://turtlebot3_description/meshes/bases/waffle_pi_base.stl" scale = "0.001 0.001 0.001" /> </geometry> <material name = "light_black" /> </visual> ... urdfファイルの中身を見るとmeshタグにpackageの定義がされていますが、執筆時点では URDF-Importer はpackageの解決が出来ない為、そのまま利用するとエラーになっていました。 URDFの変換エラー packageのトップディレクトリはurdfファイルがある階層と同じ位置から始まる為、修正した内容としてはurdfファイルをリポジトリのトップへ移動して対応しました。 一旦これらで可視化まで出来ましたが、実はこのままだと生成されたモデルに重力が適用されていてアプリケーションを実行するとモデルが自由落下してしまいます。 自由落下中のモデル デジタルツインの様なアプリ開発のケースではROS空間で動作しているROSモデルをそのままの座標で可視化することが目的な為、重力といったシミュレーション要素は不要のため、デフォルトで生成された不要なコンポーネントは削除して利用しています。 削除するスクリプト例は以下です。 Editor/RemoveURDFImporterComponents.cs using UnityEngine; using UnityEditor; #if UNITY_EDITOR using Unity.Robotics.UrdfImporter; using Unity.Robotics.UrdfImporter.Control; public class RemoveURDFImporterComponents : MonoBehaviour { [MenuItem( "GameObject/Remove URDF-Importer Components" , false , 0 )] static void Execute(MenuCommand command) { foreach (GameObject obj in Selection.gameObjects) { RemoveChildComponents(obj.transform); } } private static void RemoveChildComponents(Transform t) { for ( int i = 0 ; i < t.childCount; i ++ ) { var child = t.GetChild(i); RemoveComponents(child); RemoveChildComponents(child); } RemoveComponents(t); } private static void RemoveComponents(Transform t) { // RemoveComponent<>(t); RemoveComponent<UrdfRobot>(t); RemoveComponent<Controller>(t); RemoveComponent<UrdfPlugins>(t); RemoveComponent<UrdfPlugin>(t); RemoveComponent<UrdfLink>(t); RemoveComponent<UrdfVisuals>(t); RemoveComponent<UrdfVisual>(t); RemoveComponent<UrdfCollisions>(t); RemoveComponent<UrdfCollision>(t); RemoveComponent<UrdfLink>(t); RemoveComponent<UrdfInertial>(t); RemoveComponent<UrdfJointFixed>(t); RemoveComponent<UrdfJointRevolute>(t); RemoveComponent<UrdfJointPrismatic>(t); RemoveComponent<UrdfJointContinuous>(t); // Must be last... RemoveComponent<ArticulationBody>(t); } private static void RemoveComponent< T >(Transform t) where T : Object { var s = t.GetComponent< T >(); while (s != null ) { DestroyImmediate(s); s = t.GetComponent< T >(); } } } #endif 上記はAssetsフォルダ配下に Editor フォルダを作成してその中に作成します。 URDF-Importer の不要な項目を削除する あとは Hierachy内のURDF-Importerから生成されたモデルを右クリック等でオプションを表示し、新たに追加された Remove URDF-Importer Components を実行する事で不要な項目が削除されます。 Remove URDF-Importer Components 実行後 あとはROSメッセージのTFなどからロボットの位置や関節情報をUnityのモデルにバインドしてあげると同期が成立すると思います。 TFのサブスクライブ例は以下です。 ROS_TFSubscriber using RosMessageTypes.Geometry; using RosMessageTypes.Tf2; using System; using System.Collections.Generic; using Unity.Robotics.ROSTCPConnector; using Unity.Robotics.ROSTCPConnector.ROSGeometry; using UnityEngine; using UnityEngine.Events; public class ROS_TFSubscriber : MonoBehaviour { [SerializeField] private string topicName = "/tf" ; [Serializable] public class TFFrame : IEquatable<TFFrame> { public string ChildFrameId; public string FrameId; public string ReceivedTime; public bool DisablePosition = false ; public Vector3 Position; public bool DisableRotation = false ; public Quaternion Rotation; public bool ShowReceiveLog = true ; public UnityEvent<Vector3, Quaternion> OnReceiveTransform = new UnityEvent<Vector3, Quaternion>(); public UnityEvent<Vector3> OnUpdatePosition = new UnityEvent<Vector3>(); public UnityEvent<Quaternion> OnUpdateRotation = new UnityEvent<Quaternion>(); public void OnReceiveMessage(TransformMsg message) { if ( !this .Script.enabled) return ; if (ShowReceiveLog) Debug.Log( $"OnReceiveTFMessage(childFrameId: { ChildFrameId } , frameId: { FrameId } , data: { message } )" ); var dateTime = DateTime.UtcNow; ReceivedTime = dateTime.ToLocalTime().ToString( "HH:mm:ss.ffffff" ); if ( ! DisablePosition) Position = message.translation.From<FLU>(); if ( ! DisableRotation) Rotation = message.rotation.From<FLU>(); OnUpdatePosition ? .Invoke(Position); OnUpdateRotation ? .Invoke(Rotation); OnReceiveTransform ? .Invoke(Position, Rotation); } internal MonoBehaviour Script; public bool Setup(MonoBehaviour Script) { // Load settings. if ( string .IsNullOrEmpty( this .ChildFrameId) && string .IsNullOrEmpty( this .FrameId)) return false ; this .Script = Script; return true ; } public static bool operator == (TFFrame l, TFFrame r) => l.Equals(r); public static bool operator != (TFFrame l, TFFrame r) => ! (l == r); public bool Equals(TFFrame other) { if (ReferenceEquals(other, null )) { return false ; } if ( this .ChildFrameId != other.ChildFrameId) { return false ; } if ( this .FrameId != other.FrameId) { return false ; } return true ; } public override int GetHashCode() { int hash = 1 ; if ( ! string .IsNullOrEmpty(ChildFrameId)) hash ^= ChildFrameId.GetHashCode(); if ( ! string .IsNullOrEmpty(FrameId)) hash ^= FrameId.GetHashCode(); return hash; } public override bool Equals( object obj) { if (obj is TFFrame v) { return Equals(v); } return false ; } } [SerializeField] private List<TFFrame> dataList = new List<TFFrame>(); // Start is called before the first frame update void Start() { var deleteList = new List<TFFrame>(); foreach ( var data in this .dataList) { if ( ! data.Setup( this )) { deleteList.Add(data); } } foreach ( var data in deleteList) { this .dataList.Remove(data); } if ( this .dataList.Count == 0 ) { Debug.Log( $" { topicName } list is empty, so the subcrite is skipped." ); this .gameObject.SetActive( false ); return ; } ROSConnection.GetOrCreateInstance().Subscribe<TFMessageMsg>( this .topicName, (message) => { foreach ( var t in message.transforms) { foreach ( var frame in dataList) { if (t.child_frame_id.Equals(frame.ChildFrameId)) { if ( ! string .IsNullOrEmpty(frame.FrameId) && ! t.header.frame_id.Equals(frame.FrameId)) { continue ; } frame.OnReceiveMessage(t.transform); } } } }); } } ROS_TFSubscriberとモデルとのバインド例 その他、開発する上で注意ポイント Windowsの場合、Dockerで作成したROSサーバーは同一LAN内のROSノード(サーバー)とは接続できない 上記の通り開発や検証端末でWindowsを選択する場合は注意が必要です。設定によってはローカルホスト外と通信できる可能性もありますが恐らく簡単ではないと思います。 LinuxでUnityアプリを動かし、DockerでROSサーバーを建てた場合はこの課題は発生しないので場合によっては動作端末にLinuxを選ばなければならないケースもあるかもしれません。 UnityとROSでは座標系が違う ROSメッセージのTFなどでロボットの座標をUnity内のモデルにそのまま適用すると関節があらぬ方向へいったりします。 ROS: 右手系 Unity: 左手系 それぞれの座標系は上記の通りでして、一応ROS-TCP-Connector内でそれらの変換もサポートしているので利用すると良いと思います。 // Convert from ROS to Unity var unity = message.translation.From<FLU>(); // Convert from Unity to ROS var ros = FLU.ConvertFromRUF(unity); 最後に いかがでしたでしょうか? Unity Technologiesで用意されているモジュールがとても優秀な為、ROSとの通信自体はかなり簡単に行えます。 今回はROSとUnityのみの解説となりましたが、ROSConnectionで行っているお作法は私が開発を担当している intdash-unity に簡単に置き換える事が可能です。 tech.aptpod.co.jp intdash では現存するROSサーバーと併用して、簡単にインターネットを超えてROSメッセージを伝送する事も可能なのでROSプラットフォームとしての可能性が広がると思います。 IoTデータを利用したプラットフォーム、IoTサービスの開発や、リアルタイムデジタルツイン、製造現場のDX化など、IoTシステムの開発でお困りのことがあれば、ぜひ弊社までお声がけください。 弊社の問い合わせフォームはこちらです。 www.aptpod.co.jp
こちらは aptpod Advent Calendar 2024 12月10日の記事になります。 本日はVPoPの岩田が担当します。 今回は、弊社製品のちょっとへんな使い方ということで、リモートデスクトップ用ソフトウェアのひとつである VNC がやり取りする通信を、VM2M Data Visualizer で可視化してみようと思います。 そもそもVNC(Virtual Network Computing)とは? なぜこんなことをしようと思ったのか? 本記事でつくるもののアーキテクチャー 先にできたものを動画で紹介 RFBプロトコルの仕様 ハンドシェイクプロセス プロトコルバージョン のハンドシェイク セキュリティ方式 のハンドシェイク 初期化プロセス クライアントからの初期化メッセージ(ClientInit) サーバーからの初期化メッセージ(ServerInit) 初期化後の通信 クライアントからサーバーへ送るメッセージ サーバーからクライアントへ送るメッセージ RFBプロトコルからの画面キャプチャー画像の構築 Raw Encoding における画像のエンコーディング方式 RFBプロトコルにおける画面データの送信方式 Pythonによる実際のコードの紹介 intdash へのデータの転送 今後の展望とあとがき そもそもVNC(Virtual Network Computing)とは? 弊社のテックブログを読んでくださるような方々であればもちろんご存知だと思いますが、 VNC( V irtual N etwork C omputing) とは、ネットワーク上の離れた場所にあるコンピュータを遠隔操作するための、いわゆるリモートデスクトップ用のソフトウェアです。詳細は Wikipedia に譲りますが、最初に Olivetti & Oracle Research Lab によって開発され、これがGPLライセンスで公開されたために、現在ではいろいろな派生ソフトウェアが存在する、という状況のようです。 私自身もあまり正しく理解できていなかったのですが、VNCというのはあくまでソフトウェアの名称であって、VNCが通信に使用しているプロトコルは RFBプロトコル( R emote F rame b uffer Protocol) という名称で、別途定義されているそうです。このRFBプロトコルは、RFB 3.8 というバージョンが2011年に RFC 6143 として標準化されており、仕様は誰でも閲覧することができます。 本記事のテーマは、このVNCの通信で送受信される以下のようなデータをうまく中継して、弊社製品である VM2M Data Visualizer のダッシュボード上で可視化してやろう、というものになります。 ディスプレイ画面の映像 マウス・キーボード操作 クリップボード情報 なぜこんなことをしようと思ったのか? 若干ふざけたタイトルなのでお遊び記事かと思いきや、一応このテーマにはきちんとした狙いがあり、実は技術調査のアウトプットだったりします。 弊社は、 VM2M Data Visualizer というリアルタイム描画が得意な可視化ダッシュボードツールを開発・提供しています。この VM2M Data Visualizer は、ノーコードで自由にダッシュボードを組み上げられる可視化ツールで、映像からセンサーデータまで様々なデータをひとつのダッシュボードにまとめて可視化することができます。 www.aptpod.co.jp VM2M Data Visualizer がありとあらゆるデータを一つのダッシュボードにまとめられるおかげで、ご利用いただいているお客様から 「あれもこれも、なんでもかんでも、 VM2M Data Visualizer のダッシュボードにまとめて扱いたい」 というご要望をいただくことがあります。 よくあるセンサーデータや映像・音声などであれば既存機能のみで十分対応が可能ですが、使い込んでいけばいくほどまとめたい対象も広がっていき、最終的には 「遠隔地に配置したPC上で動いているレガシーソフトウェアのUIも、まとめてダッシュボードで確認できないか」 という相談をいただいたりする機会も出てくるようになりました。 PCの画面をダッシュボードに表示するだけであれば、画面をキャプチャーして中継することもできますが、PCで稼働することを前提としたソフトウェアは、多くの場合マウスやキーボードによる操作を想定した作りになっており、画面を転送して表示しただけでは使い物になりません(可視化だけでなく、やはり操作を行いたくなります)。 もちろん、画面キャプチャーだけでなくマウスやキーボードの操作情報を中継する機能を自前実装することもできなくはありませんが、いろいろと実現方法を考えていくうちに、 「もはやこれはリモートデスクトップ用のプロトコルそのものなのではないか?」 という考えに思い至るようになりました。 リモートデスクトップ用のプロトコルであれば、Wiindows 向けの超有名どころである RDP や今回対象とした VNC など、すでに広く使われていて実績のあるものが存在します。そういった既存の技術を上手く活用すれば、わざわざ自前実装して車輪の再発明をする必要はありません。 これらをどうにか活用できないかと深堀って調べていくなかで、前述したとおりVNCで使用されているRFBプロトコルが標準化されていることを知り、さらにそのプロトコル仕様が極めてシンプルで簡単に扱えそうなことが分かってきたので、これを調査して製品化に向けた検討をしてみよう、と思い本記事の企画が生まれました。 本記事でつくるもののアーキテクチャー 今回の目的は、最終製品の実装ではなく、RFBプロトコルのフォーマットやシーケンスを理解することに限定し、なるべくライトに対応します。 検証作業を単純化するために、VNCサーバーの前段にTCPプロキシーを挟み込み、そのプロキシーでRFBプロトコルのやり取りをスニッフィングして intdash に引っ張り出すことで、VM2M Data Visualizer までデータを送り届けることにしました。 ※ intdash は VM2M Data Visualizer がリアルタイムデータの通信のために使用する、バックエンドの通信ミドルウェアです。 www.aptpod.co.jp ただしこれは最終的な構成ではなく、実際のプロダクトに機能を取り込むときには、もしかするとRFBプロトコルを intdash で中継できるようにするかもしれませんし、 VM2M Data Visualizer がVNCクライアントとして振る舞えるようにするかもしれません。 機能 案1)RFBプロトコルを intdash で中継し Data Visualizer が受信するパターン 機能 案2)Data Visualizer がそもそもVNCクライアントになるパターン 先にできたものを動画で紹介 Docker Compose でVNCサーバーにする Ubuntu Desktop 環境と、今回実装したプロキシーを立ち上げます。VNCサーバーには 以下の Docker イメージを使わせてもらいました。 github.com VNCサーバーとプロキシーが起動したら、プロキシーが待ち受けているポートに向かって、VNCクライアントでアクセスしてみます。今回の動画では VNC Viewer を使用しました。 プロキシーは、VNCクライアントからのデータをそのままVNCサーバーに中継するとともに、 VM2M Data Visualizer で可視化したい情報については intdash に分岐送信します。 プロキシーが送信しているデータを可視化するように VM2M Data Visualizer のダッシュボードを構成して再生をすると、みごとにVNCサーバーとVNCクライアントが送り合っている情報を VM2M Data Visualizer のダッシュボード上に表示することができました! youtu.be RFBプロトコルの仕様 前述の通り、RFBプロトコルは RFC 6143 として標準化されており、誰でも仕様を確認することができます。 datatracker.ietf.org ざっとプロトコルの流れを確認すると 7.1 Handshake Messages に記載されたハンドシェイクプロセスによって、サーバーとクライアントの間でコネクションが確立されます。その後、 7.3 Initialization Messages に記載された初期化メッセージによって、サーバークライアント間で相互に通知が必要な情報をやり取りします。その後は、基本的には一方通行の通信のみが行われ、その際に使用されるメッセージが 7.5 Client-to-Server Messages および 7.6 Server-to-Client Messages です。 唯一複雑なところといえば、 7.7 Encodings で規定されている、画面イメージを送信する際のエンコーディングの仕様ですが、後述するように最も単純な Raw Encoding に限定して使用すれば、 TRLE や ZRLE などの高度なエンコーディングの仕様は一旦理解を先送りすることができます。 ハンドシェイクプロセス ハンドシェイクは、主に プロトコルバージョン と セキュリティ方式 をサーバーとクライアントで確認しあいます。 プロトコルバージョン のハンドシェイク プロトコルバージョンのハンドシェイクは極めて単純で、 RFB xxx.yyy\n という12文字をサーバーから1回、クライアントから1回の1往復分送り合うだけです。今回はプロキシーを作るだけなので、12バイト分パススルーするだけでやり過ごします。 セキュリティ方式 のハンドシェイク プロトコルバージョンのハンドシェイクが完了したら、次はセキュリティ方式のハンドシェイクを行います。ここではまずサーバーがサポート可能なセキュリティタイプのリストを送信し、クライアントが実際に使用するセキュリティタイプを選択して返信します。 VNCがもともと暗号化を想定しない単純なプロトコルであったこともあり、RFCで規定されているセキュリティ方式は、「 None 」または「 VNC Authentication 」の2方式のみようです。今回の検証においては、「 None 」のみ使用すれば十分であったため、「 VNC Authentication 」の説明は割愛します。なお、弊社製品に取り込む際には、VNCのデータはセキュアな intdash のプロトコルに格納されて伝送されることになるため、VNCやRFBプロトコルのレベルで高度な認証や暗号化に対応していなくても全く問題ありません。 クライアントが送付したセキュリティタイプをサーバーが受け入れ可能な場合は、結果を応答してハンドシェイクは完了です。受け入れられない場合は、結果や理由を応答した後にサーバーからコネクションを切断します。 初期化プロセス 初期化プロセスでは、サーバーとクライアントが相互に通知すべき情報を、それぞれ1メッセージの送信で通知しあいます。 クライアントから送信されるメッセージを ClientInit 、サーバーから送信されるメッセージを ServerInit といいます。 クライアントからの初期化メッセージ( ClientInit ) ClientInit メッセージによって送信される情報は、 shared-flag のみです。本記事においてはあまり重要な情報ではないため、説明は割愛します。 ClientInit メッセージ サーバーからの初期化メッセージ( ServerInit ) ServerInit メッセージによって送信される情報は、以下のとおりです。 画面サイズ(framebuffer-width, framebuffer-height) 各ピクセルの表現方法(PIXEL_FORMAT) 画面の名称(デスクトップに関連付けられた名前) ServerInit メッセージ ここで通知された画面サイズが画面全体のサイズになります。 また、PIXEL_FORMATでは1ピクセルを何ビットで表すか、RGBは何ビットずつどの順で配置されているかなどの情報を含みます。細かな仕様はRFCに譲りますが、この情報により、サーバーから送られる画面データをどの様にパースしてイメージに再構成すればよいかを知ることができます。 PIXEL_FORMAT のフォーマット 初期化後の通信 クライアントからサーバーへ送るメッセージ クライアントからサーバーに送付するメッセージには、以下のものがあります。 SetEncodings SetPixelFormat KeyEvent PointerEvent ClientCutText FramebufferUpdateRequest SetEncodings を用いて、クライアントがサポート可能なエンコーディングを指定することで、サーバーに対して使用するエンコーディングを制限させることができます。 ちなみに今回の実装では、最も単純な Raw Encoding のみにエンコーディングを制限させるため、クライアントからの SetEncodings メッセージを中継する際に、クライアントに内緒でこっそり Raw Encoding 以外のエンコーディングをオミットする処理を入れています。プロトコルのシーケンスとしては SetEncodings に対してサーバーからの応答確認などもなくクライアントからメッセージを送りつけっぱなしにするだけなので、このような内緒の通信内容の書き換えもできててしまいます。 SetPixelFormat は、途中でピクセルの表現方法を変更するときに使用します。 KeyEvent 、 PointerEvent はその名の通り、キーボードの押下やマウスの移動を通知するメッセージです。こちらのメッセージをパースすることで、どのキーが押されたか、マウスがどこに動いたかをトラッキングできます。 KeyEvent メッセージ PointerEvent メッセージ ClientCutText はクライアント側のクリップボードの状態をサーバーに通知します。クライアント側がクリップボードに何かを載せたとき、それがサーバーに通知されてサーバー側で利用可能になります。 ClientCutText メッセージ FramebufferUpdateRequest は、画面キャプチャー画像を送信するよう要求するメッセージです。VNCでは基本的に、クライアントからの要求に応じて画面キャプチャー画像が返信されるプロトコルとなっています。ただし、サーバーは要求された範囲に対して画面の更新があるまで応答を先延ばしにすることができ、また、複数の要求に対して一つの応答を返せばよい、という決まりになっており、必要に応じて必要なだけ変更情報を送信できる効率のよい仕組みになっています。 サーバーからクライアントへ送るメッセージ サーバーからクライアントに送付するメッセージには、以下のものがあります。 SetColorMapEntries Bell FramebufferUpdate ServerCutText SetColorMapEntries および Bell については、本記事においては重要なメッセージではないため、説明を割愛します。 ServerCutText は、クライアント側の ClientCutText と同様に、サーバー側のクリップボードの状態をクライアントに通知します。 FramebufferUpdate は、サーバー側で画面に更新のあった特定範囲の画面キャプチャー画像を送信します。メッセージには複数の範囲のキャプチャー画像が含まれ rectangle と呼ばれます。rectangle は位置(x, y)、サイズ(w, h)とエンコーディングのタイプ、rectangle 内のキャプチャー画像データで構成されます。 画像データはエンコーディングタイプそれぞれ応じた形式で格納されますが、前述の通り今回はクライアントからの SetEncodings メッセージに細工をして Raw Encoding でしかデータが返ってこないようにしているため、 Raw Encoding として読み出せば事足ります。 FramebufferUpdate メッセージのヘッダー情報 各 rectangle のフォーマット RFBプロトコルからの画面キャプチャー画像の構築 Raw Encoding における画像のエンコーディング方式 Raw Encoding においては、1ピクセル分のデータサイズは ServerInit メッセージや SetPixelFormat メッセージによって送られる PIXEL_FORMAT によって決められます。 PIXEL_FORMAT のフォーマット(再掲) 今回使用したVNCサーバーでは、32ビット = 4バイトを使用することになっていました( bits-per-pixel = 32 )。ただし、実際にRGBデータが格納されるのは24ビット = 3バイトのみで、1バイト分はパディングのようです( depth = 24 )。また、RGBのアサインについても若干注意が必要で、RGBの順ではなくBGRの順番で並んでいるようでした( red-shift=16, green-shift=8, blue-shift=0 )。 このように1ピクセルあたり4バイトとして、これを画面の左から右へ横1行分並べて、さらにその各行分のバイト列を上から下の順番になるように結合したものが、イメージデータとなります。 RFBプロトコルにおける画面データの送信方式 RFBプロトコルにおける画面キャプチャー画像のデータは、前述の通り FramebufferUpdate メッセージに含まれる複数の rectangle によって送信されます。各 rectangle はその位置(x, y)、サイズ(w, h)とエンコーディングのタイプ、rectangle 内のキャプチャー画像データを持ちます。 このように、RFBプロトコルでは画面全体のキャプチャー画像を毎回送信するのではなく、画面全体のうちの更新のあった特定の範囲の画像のみをオンデマンドで送信するという仕様を採用することで、効率良く画面の更新を伝えるプロトコルになっているようです。 今回のように、画面全体のキャプチャー画像を構成しようとする場合には、各 rectangle 毎に画像を生成するだけでなく、それまでに受け取った rectangle を保持して、画面全体を表す画像に適宜パッチをあてていく処理を実装しなければなりません。 ちなみに、画面全体の大きさは、 ServerInit メッセージに格納された画面サイズで知ることが出来ます。 Pythonによる実際のコードの紹介 以上を踏まえると、受け取った rectangle を Python で画像化する処理は次のようになります。なお、以降のコードは PIL ライブラリの Image オブジェクトへの変換までの処理になります。 ServerInit メッセージを受け取ったときの処理 ServerInit メッセージによって画面全体のサイズ (framebuffer_width, framebuffer_height) が分かったら、画面キャプチャー全体を格納するための Image オブジェクト display_img を初期化します。 display_img = Image.new( "RGB" , (framebuffer_width, framebuffer_height)) FramebufferUpdate メッセージを受け取ったときの処理 rectangle から画像データ rect_data 、位置 (x, y) 、サイズ (w, h) を取得し、rectangle に相当する Image オブジェクト rect_img を生成したのちに、画面キャプチャー全体を格納している Image オブジェクト display_img の該当箇所に貼り付けます。 # イメージデータをNumPy配列化し、(h, w, 4) の3次元配列に整形する arr = np.frombuffer(rect_data, dtype=np.uint8).reshape(h, w, 4 ) # RGBの順序に気をつけて、Imageオブジェクトに変換する rect_img = Image.fromarray(arr[:, :, [ 2 , 1 , 0 ]]) # 全体画面に対して、更新箇所を貼り付ける display_img.paste(rect_img, (x, y)) あとは、 Image オブジェクトを JPEG にするなり PNG にするなり好きな方法で画像ファイル化すれば、画面キャプチャー画像をファイルとして取得出来ます。 intdash へのデータの転送 RFBプロトコルをパースして画面キャプチャーデータを取得できるようになったら、あとはそれを intdash に送信すれば VM2M Data Visualizer で受信してダッシュボードに表示できるようになります。今回はプロキシーを Python で実装したので、Python 用の intdash のクライアントライブラリ iscp-py を使用しました。iscp-py には、intdash とのリアルタイム接続のためのインターフェイスが一通り実装されており、intdashへのデータ送信プログラムを簡単に書くことができます。 docs.intdash.jp 今回は、この iscp-py を使って、プロキシー内の以下の処理ポイントに intdash へのデータ送信を加えてみました。 FramebufferUpdate の受信時(画面キャプチャー) KeyEvent の受信時(キーボード操作) PointerEvent の受信時(マウス操作) ClientCutText, ServerCutText の受信時(クリップボード) KeyEvent 、 PointerEvent メッセージはJSON形式のテキストで、 ClientCutText 、 ServerCutText メッセージはそのままの文字列で、 FramebufferUpdate メッセージは JPEG 形式の画像として送信することで、 VM2M Data Visualizer のダッシュボード上で、標準パーツを使用して可視化できるようになります。 今後の展望とあとがき 今回のお試し実装により、RFBプロトコルの概要が掴めました、RFCには高度なエンコーディングも記載されているものの、クライアントからの要求によりサーバーに単純なエンコーディングを強制することが可能であることも分かりました。また、一部のエンコーディングを除けば、全体として極めて単純なプロトコルであることが改めて確認できました。 ここまで単純明快なプロトコルであれば、 例えば VM2M Data Visualizer のパーツに、VNCクライアントとして振る舞うものを追加することもさほど難しくはなさそうです (もちろん、実際にプロダクト化を進めるにあたっては、 TRLE や ZRLE などのエンコーディングも必要に応じて調査をし、対応を検討します)。 様々なお客様とのプロジェクトを進めていくなかで、実はDX化を進める現場にも、まだまだレガシーなシステムがたくさん稼働していることは把握しています。弊社の VM2M Data Visualizer は、そういったレガシーシステムとも連携ができる柔軟なプロダクトになれれば良いなと、今回の調査以外にも日々色々と検討を進めています。 今後も、皆さまのご意見を取り入れつつ、より良いプロダクトを作っていきたいと考えておりますので、すでに利用してくださっている皆さま、これから使ってみたいとご興味を持っていただけた皆さま、ぜひ気軽にお声がけいただき、弊社のサービスやプロダクトに対するご意見をいただければ幸いです。 本記事で取り上げたレガシーシステムとのVNCによる連携につきましても、もし活用イメージをお持ちいただけましたら、ぜひご意見をお寄せください。お問い合わせはこちらです。 www.aptpod.co.jp 最後までお読みいただきありがとうございました。
aptpod Advent Calendar 2024 の12月9日の記事です。 本日は、テクニカルライターの篠崎が担当します。 当社では今年、ドキュメントサイトをリニューアルし、複数のプロダクトのドキュメントを1つのウェブサイトに統合しました。 これにより、読み手は全体を俯瞰しながら各製品の詳細に読み進めることができるようになりました。 また、私のような制作側にとっては、ページ間のリンクを張りやすくなりました。内容の重複も減らすことができ、更新作業もしやすくなりました。 統合されたドキュメントサイト 製品ドキュメントの作成には Sphinx を使っています。これまでプロダクトごとに別々だったSphinxデータは、これからは1つのプロジェクトとして管理するようにしました。 ただし、いままで別のプロジェクトだったものを1つにまとめるには、Sphinxデータのフォルダーをマージするだけでなく、いろいろな調整が必要でした。 例えば、複数のプロダクトを1つのドキュメントで扱うことになったため、ドキュメント内には複数のプロダクトのバージョン番号が登場するようになりました。 通常、Sphinxでは説明対象プロダクトのバージョンを設定ファイル内に書くことで、それを文中に展開して使用することができますが、複数のプロダクトを扱う場合には一工夫が必要でした。 ここではそれについて説明します。 ドキュメント内で展開できる変数(substitution) 外部へのリンクを作成するsphinx.ext.extlinks 説明対象プロダクトの情報をまとめて定義する まとめ 参考文献 ドキュメント内で展開できる変数(substitution) Sphinxでは、設定ファイル conf.py 内で version という変数を定義しておくことにより、本文のreStructuredText原稿で |version| と書いた部分にその値を展開することができます。 後日バージョンが変更になったときは、設定ファイル内の version 定義を変更するだけで文中に反映することができます。 conf.py version= '1.0.0' reStructuredText原稿 本サイトでは、MyApp v\ |version| について説明します。 (マークアップとしては |version| の前後にスペースが必要です。ただし MyApp v の後にバックスラッシュを入れ、その直後のスペースを無視するようにしています。) HTMLビルド結果 展開された |version| この |version| のような |...| を使った一種の変数展開はSphinxでは substitution と呼ばれています。 conf.py 内で使用される変数のうち、いくつかの変数はデフォルトで substitutionとして使用可能 になっています。 最初に書いたとおり、これからは複数のプロダクトを1つのドキュメントで扱うことになったので、「各プロダクトのバージョンを全部substitutionとして扱えるようにしたい」と思いました。そのために、この version とは別のsubstitutionを独自に設定することにしました。 substitutionは、reStructuredText上で以下のような書式で定義することができます。 reStructuredText原稿 .. |my-app1-version| replace:: 1.2.3 .. |my-app2-version| replace:: 4.5.6 これにより |my-app1-version| が 1.2.3 という文字列に、 |my-app2-version| が 4.5.6 という文字列に置換されます。 文中で以下のように書くと展開されます。 reStructuredText原稿 本サイトは以下のバージョンについて説明しています。 * MyApp1 v\ |my-app1-version| * MyApp2 v\ |my-app2-version| HTMLビルド結果 2つのバージョン番号を展開 ですが、 .. |my-app1-version| replace:: 1.2.3 のような定義はreStructuredTextファイルごとに書かなければなりません。複数のページからなるドキュメントでは大変です。 これを解決するのが、 conf.py の rst_prolog (または rst_epilog )です。 rst_prolog 変数の内容は、ビルド時にすべてのreStructuredTextファイルの先頭に付加されます。 そのため、 conf.py を以下のようにすればよいわけです。 conf.py rst_prolog= ''' .. |my-app1-version| replace:: 1.2.3 .. |my-app2-version| replace:: 4.5.6 ''' これで、ひとつひとつのreStructuredTextファイルで定義したのと同じ効果を得られます。 どのファイルでもこれらのsubstitutionを使えるようになりました。 外部へのリンクを作成するsphinx.ext.extlinks さらに弊社製品では、ドキュメントにはAPIリファレンスへのリンクが含まれます。 どうしても長いURLを原稿内に書くことになります(例えばこのような形式のURL: https://www.example.com/app1-reference/1.2.3/get-users )。 長いURLを繰り返し記載したいときに便利なのが、 sphinx.ext.extlinks です。 sphinx.ext.extlinksは機能拡張ですが、Sphinxに組み込まれているため新たにインストールする必要はありません。使用するには、 conf.py で extensions リストに入れるだけでOKです。 conf.py # すでにextensionsの定義がある場合は適宜修正してください extensions = [ 'sphinx.ext.extlinks' ] そのうえで、以下のように、リンクのテンプレートを定義します。 conf.py extlinks = { 'app1-api' : ( 'https://www.example.com/app1-reference/1.2.3/%s' , None ), } このようにすると、原稿内で以下のような表記ができるようになります。 reStructuredText原稿 :app1-api: `ユーザー一覧取得APIリファレンス <get-users>` HTMLをビルドすると以下のように展開されます。 HTMLビルド結果(一部省略) < a href = "https://www.example.com/app1-reference/1.2.3/get-users" > ユーザー一覧取得APIリファレンス </ a > sphinx.ext.extlinksによるリンク例 上のreStructuredText原稿で <> の中に入っていたURL断片 get-users が https://www.example.com/app1-reference/1.2.3/ の後ろ(extlinks定義の %s のところ)に展開されています。これにより、長いURLを毎回書く必要がなくなり、原稿がシンプルになります。 当社の場合、APIリファレンスはバージョンごとに用意しているので、リンクには特定のバージョン番号を含める必要があります。 説明対象プロダクトの情報をまとめて定義する バージョン文字列をsubstitutionsやextlinksなどの複数の形式で使うのであれば、重複のないように1か所で定義したくなります。 ここで「設定ファイル conf.py は実行されるPythonコードでもある」ということが活きてきます。以下のようなことができます。 conf.py # 2つの説明対象プロダクトの情報をまとめてdictionaryとして定義 app_versions = { 'my-app1' : { 'name' : 'MyApp1' , 'version' : '1.2.3' , }, 'my-app2' : { 'name' : 'MyApp2' , 'version' : '4.5.6' , }, } # substitutionを定義 # app_versionsをループして、 # `.. |my-app1| replace:: MyApp1 - v1.2.3` の形式でrst_prologを作成 rst_prolog= '' for app_id, app_info in app_versions.items(): rst_prolog += f '.. |{app_id}| replace:: {app_info["name"]} - v{app_info["version"]} \n ' # extlinksを定義 # app_versionsをループして、特定バージョン番号の入った外部リンクの短縮表記を定義する extensions =[ 'sphinx.ext.extlinks' ] extlinks = {} for app_id, app_info in app_versions.items(): extlinks[f '{app_id}-api' ] = (f 'https://www.example.com/{app_id}-reference/{app_info["version"]}/%s' , None ) ループの中が少し読みづらいですが、結果は以下のようになるはずです。 rst_prolog = ''' .. |my-app1| replace:: MyApp1 - v1.2.3 .. |my-app2| replace:: MyApp2 - v4.5.6 ''' extlinks = { 'my-app1-api' : ( 'https://www.example.com/my-app1-reference/1.2.3/%s' , None ), 'my-app2-api' : ( 'https://www.example.com/my-app2-reference/4.5.6/%s' , None ), } これにより、以下のような原稿を書くことができるようになります。 reStructuredText原稿 本サイトは以下のバージョンについて説明しています。 * |my-app1| * |my-app2| 以下のAPIを使用できます: :my-app1-api: `ユーザー一覧取得APIリファレンス <get-users>` :my-app2-api: `プロジェクト一覧取得APIリファレンス <get-projects>` HTMLビルド結果 2つのプロダクトのバージョンと、リファレンスへのリンク ループを使ってsubstitutionと外部リンクをいっぺんに定義することができました。 これで、説明対象のプロダクトが増えても conf.py の app_versions に追加していくことで管理できます。 まとめ Sphinxの設定ファイル conf.py は、これ自体がPythonで書かれたプログラムなので、設定ファイル内でさまざまな操作をプログラム的に行うことができます。この柔軟性は大変便利です。 当社では日ごろ多くのドキュメントをSphinxで作成していますが、この柔軟性にいつも助けられています。 参考文献 用語を定義したい — Python製ドキュメンテーションビルダー、Sphinxの日本ユーザ会 sphinx.ext.extlinks -- 外部リンクを短縮表記させるマークアップ — Sphinx documentation
aptpod Advent Calendar 2024 12月6日の記事を担当します、人事の照井です。 アプトポッドでは昨年春頃から、従業員の心身の健康維持/促進・生産性向上を目指し健康経営の取り組みを始めました。 今回はその取り組みの内容をご紹介したいと思います。 健康経営とは 健康経営が注目される社会的背景 健康経営を始めたきっかけと健康課題 健康経営の取り組み内容 健康経営に取り組んだ結果 まとめ 健康経営とは 健康経営とは、従業員の健康を企業の重要な資産と捉え健康管理や職場環境の改善に積極的に取り組むことで、生産性向上や企業価値の向上を目指す経営戦略です。 健康経営が注目される社会的背景 <1>少子高齢化と労働力人口の減少 日本の労働力人口は2030年までに約10%減少すると予測されています(総務省/労働力調査)。 労働力が減少する中、社員の健康を維持し生産性を向上させることが企業存続の鍵となっています。 <2>健康問題の拡大 ・メンタル不調やストレスの増加:  厚生労働省の「令和2年労働安全衛生調査」では、労働者の58%がストレスを感じていると回答。 ・プレゼンティーイズム(※)による損失:  日本生産性本部の調査によれば、プレゼンティーイズムによる損失額は欠勤の約2~3倍で、従業員1人あたり年間約50万円と試算されています。  ※出勤しているが生産性が低い状態 <3>政府の健康経営推進 経済産業省は、企業の健康経営の取り組みを評価する「健康経営優良法人認定制度」を導入しました。 健康経営優良法人に選定された企業は、財務面での成果や株価の上昇が見られ、社会的評価が高まるとともに投資家からの関心も集めやすくなっています。 健康経営を始めたきっかけと健康課題 アプトポッドはコロナ禍以降もフルリモートを続けているため、「運動不足」「体重増加」で悩む声が多々ありました。 一部、スタンディングデスクを取り入れる社員、ウォーキングマシーンに乗りながら ミーティングに参加する社員、それぞれ工夫して健康を維持している社員もいますが、普段から運動習慣がないとなかなか腰が上がらない社員が大多数です。 全社に情報を発信し続けることで小さなことから意識するきっかけになり、社員がより健やかに日々を過ごせたらと思い、取り組みを始めました。 健康経営の取り組み内容 健康経営に取り組むにあたり、まずは健康保険組合が主体で行っている 健康優良企業認定を受けることを目標 に開始しました。 これは、企業が従業員の健康づくりに積極的に取り組んでいることを評価・認定する制度です。 <1>健康経営を検討する場 全社員の健康管理に取り組むには人事だけが啓発するのではなく、現場の社員にも協力してもらう必要があると考えました。 そこで、月に1度行われる「衛生委員会」の機会を利用し、各本部から選出されたメンバーにも協力を仰ぎました。 <2>健康についての情報を展開 事前に人事で各月のテーマを決め、年始の衛生委員会にてメンバー内で担当振りをしました。 担当がテーマに沿った資料を作成し、衛生委員会で発表してもらいます。 その内容についてメンバーで少し話したり産業医からコメントをもらい、委員会終了後にその資料を全社展開しています。 下記、2024年に実施した衛生委員会でのテーマです。 テーマは季節に合わせて花粉や熱中症対策等、タイムリーな情報を送ることでより身近に、より関心を持てるようにしています。 展開資料一部抜粋 <3>その他安全衛生に関わる事項の展開 衛生委員会でのテーマ発表の内容展開の他に、人事からも毎月全社に情報展開しています。 この内容も基本的には季節に合わせた内容にしていますが、リモートワークでの就業のためどうしてもオフラインでのコミュニケーションより雑談の機会が少なくなることも考慮し「メンタル・ハラスメント相談窓口」の案内を半期に1度展開しています。 <4>社内のウォーキングイベントを年2回実施 健康保険組合のウォーキングイベントに便乗し、社内でも全社員を対象に年2回行っています。 社員の参加率は毎回9割以上で、このウォーキングイベントをきっかけに体質改善に挑んだ社員もいます。 <5>人事との距離を縮める全社員面談の実施 今年から人事と社員が半期に1度、1時間程度面談する機会をつくりました。 開始当初の目的は健康経営ではなかったものの、リモートワークでのコミュニケーション課題もあり、人事と直接対話することで人事が問題をキャッチアップし、直接上層部へ働きかけ、課題解決に向けて迅速に動くことができています。 普段の上長との1on1とはまた違うメンタル管理等、多角的なケアの取り組みです。 健康経営に取り組んだ結果 これらの活動を健康保険組合に提出し、見事「銀の認定」をいただくことができました。 ※令和5年認定、令和6年認定更新済 まとめ 今後も銀の認定をいただけるよう、継続的に健康経営には取り組んで行こうと思っています。 心身の健康は、個人がどれだけ「気づき」「実行」するかで大きな差が出てきます。 社員が心身共に健やかに、気持ちよく業務を行うことができるよう、今後も社員に寄り添った取り組みを目指していきます。
製品開発グループの大久保です。 aptpod Advent Calendar 2024 の12月5日の記事を担当します。 社内では、エッジ側でintdashに接続可能なデバイスを簡単に開発するためデバイス開発キットの基盤製品を開発しており、それを Device Connector と呼んでいます。これは主にRustによって開発しています。 Rustは高速なコンパイル型言語であり、デバイスとの通信、制御に適しています。しかしながら、データのフィルタリングなどの細かい挙動を、再ビルドを必要とせず利用者が多くて学習コストの小さいPythonで記述したいというニーズもあります。そのような、アプリケーションの基本部分はRustで実装しながら、状況に応じて制御を変えたい部分はPythonで記述したいという場合、RustからPythonを呼び出すことになります。それを今回は PyO3 で試してみたいと思います。 PyO3とは RustからPythonを呼び出す 実行結果 マルチスレッドの場合 まとめ PyO3とは PyO3 は、PythonをRustを利用するためのバインディングを提供します。RustからPythonを呼び出すだけでなく、Pythonから利用されるモジュールをRustで記述することができます。 大体の使い方は以下のユーザーガイドを見れば分かると思います。 https://pyo3.rs/main/getting-started.html RustからPythonを呼び出す 実際にRustからPythonを呼び出してみましょう。実行時には libpython3.x.so の機能を呼び出すので、例えばUbuntuなら以下のように依存ライブラリをインストールする必要があります。 sudo apt install python3-dev Cargo.tomlに以下の依存関係を記述します。 [dependencies] pyo3 = { version = "0.22.6", features = ["gil-refs"] } main.rs は以下のように記述します。 #[pyclass(name = "Greeting" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyGreeting { text: String , } #[pymethods] impl PyGreeting { #[getter] fn get_text ( & self ) -> &str { & self .text } } #[pyclass(name = "Response" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyResponse { text: String , } #[pymethods] impl PyResponse { #[new] fn new (text: String ) -> Self { Self { text } } } #[pymodule] fn mymodule (module: & Bound < '_ , PyModule > ) -> PyResult < () > { module. add_class :: < PyGreeting > () ? ; module. add_class :: < PyResponse > () ? ; Ok (()) } fn main () { pyo3 :: append_to_inittab! (mymodule); pyo3 :: prepare_freethreaded_python (); let script = r#" import mymodule def greeting(g): if g.text == "hello": res = "こんにちは" else: res = "なんですか?" return mymodule.Response(res) "# ; println! ( "{:?}" , run_script ( script, PyGreeting { text: "hello" . into () } ) ); println! ( "{:?}" , run_script ( script, PyGreeting { text: "hoge" . into () } ) ); } fn run_script (script: &str , greeting: PyGreeting) -> PyResponse { let result: PyResult < PyResponse > = Python :: with_gil ( | py | { let pyfunc: Py < PyAny > = PyModule :: from_code_bound (py, & script, "" , "" ) ? . getattr ( "greeting" ) ? . into (); let result = pyfunc. call1 (py, (greeting,)); let response = match result { Ok (response) => response, Err (e) => { eprintln! ( "{}" , e); if let Some (e) = e. traceback_bound (py) { if let Ok (e) = e. into_gil_ref (). format () { eprintln! ( "{}" , e); } } panic! (); } }; let response: PyResponse = response. extract (py) ? ; Ok (response) }); result. unwrap () } 以下細かく解説します。 #[pyclass(name = "Greeting" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyGreeting { text: String , } #[pymethods] impl PyGreeting { #[getter] // getterとして定義 fn get_text ( & self ) -> &str { & self .text } } #[pyclass(name = "Response" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyResponse { text: String , } #[pymethods] impl PyResponse { #[new] // コンストラクタ fn new (text: String ) -> Self { Self { text } } } #[pymodule] fn mymodule (module: & Bound < '_ , PyModule > ) -> PyResult < () > { module. add_class :: < PyGreeting > () ? ; module. add_class :: < PyResponse > () ? ; Ok (()) } このあたりはPythonスクリプトとRust両方から使用できる型を定義し、モジュールにまとめている箇所です。 #[pyclass(name = "...")] 属性で型を定義しつつ、Pythonから違う名前で見えるようリネームしています。 #[pymethods] でコンストラクタ等のメソッドも定義しており、これらもPythonから呼び出せるようになっています。 pyo3 :: append_to_inittab! (mymodule); pyo3 :: prepare_freethreaded_python (); Pythonの初期化を行います。 append_to_inittab!() で自作モジュールが読み込まれるように登録し、 prepare_freethreaded_python() で初期化します。なお、pyo3 に auto-initialize という feature を指定していれば prepare_freethreaded_python() は不要になります。 let script = r#" import mymodule def greeting(g): if g.text == "hello": res = "こんにちは" else: res = "なんですか?" return mymodule.Response(res) "# ; 実行するPythonスクリプトを定義します。 greeting 関数は Greeting 型の引数を受け取り、 Response 型を返します。この例では上記で用意したカスタムの型を受け渡ししていますが、文字列のようなプリミティブな型もやり取りできます。 println! ( "{:?}" , run_script ( script, PyGreeting { text: "hello" . into () } ) ); println! ( "{:?}" , run_script ( script, PyGreeting { text: "hoge" . into () } ) ); スクリプトを実行し、その結果を表示します。 fn run_script (script: &str , greeting: PyGreeting) -> PyResponse { let result: PyResult < PyResponse > = Python :: with_gil ( | py | { let pyfunc: Py < PyAny > = PyModule :: from_code_bound (py, & script, "" , "" ) ? . getattr ( "greeting" ) ? . into (); スクリプトを実際に実行します。Pythonスクリプトの実行は、 Python::with_gil に渡した関数の中で行います。 PyModule::from_code_bound でPythonのソースコードを読み込み、その中の greeting 関数を取得します。 let result = pyfunc. call1 (py, (greeting,)); let response = match result { Ok (response) => response, Err (e) => { eprintln! ( "{}" , e); if let Some (e) = e. traceback_bound (py) { if let Ok (e) = e. into_gil_ref (). format () { eprintln! ( "{}" , e); // スタックトレースの表示 } } panic! (); } }; let response: PyResponse = response. extract (py) ? ; Ok (response) greeting 関数を実行し、その結果を引き出します。Python関数内でエラーが起きた場合スタックトレースの情報が含まれていれば、 e.traceback_bound(py) で取り出して表示します。最後に extract() で返り値を PyResponse に変換して結果を返します。 実行結果 実行してみると以下のようになります。 PyResponse { text: "こんにちは" } PyResponse { text: "なんですか?" } Pythonスクリプト内に記述した分岐の通りの返答を返してくれています。 マルチスレッドの場合 上記の例では関係ないのですが、実際にPyO3を利用して実装した時に苦労した点として、 with_gil に渡した関数を実行できるのは1スレッドという制限があげられます。言い換えればPythonスクリプトを実行できるのは常に1つのスレッドのみです。これはPythonのGILという仕組みによるもののようです。 allow_threads を使えば、一時的にGILを解放できるので、Rustのコード中でブロックするような処理があるところはこれを追加して解決しました。Pythonソースコード内にブロックするところがあると処理が進まなくなる可能性は出るので、完全な解決ではありませんが。 まとめ 今回はRust製アプリケーションにPythonを組み込む方法を紹介しました。ビルド不要で柔軟に挙動を変えたいところ、たとえばデータのフィルタリング処理やゲームのスクリプト等があり、メインの部分はRustで書いているアプリケーションがあるなら、今回の手法は役立つと思います。
aptpod Advent Calendar 2024 12月4日の記事です。 デザインチームの上野です。 近年、デジタルツインの普及に伴い、建設機械や製造業でROSを用いて3Dモデルを扱う事例が増加しています。 そこで今回はROSで用いられる3DモデルのURDFについて0からBlenderを使って作成してみた流れをまとめたいと思います。 URDFとは Phobosとは 3Dモデルを作成しエクスポートしてみる アドオンのインストール モデリング ジョイントの設定 コリジョンの設定 慣性の設定 モーターの追加 マテリアルについて エクスポート モデルの確認方法 URDFとは URDF (Unified Robot Description Format)とは、ロボットの構造を記述するためのXMLのフォーマットです。 ロボットの各パーツの座標や向き、質量などが全てを数値化して記述されています。 Phobosとは Phobosは、Blender向けのオープンソースアドオンで、ROS用ロボットモデルをエクスポートする機能を提供します。URDFに加えて、SDFやSMURFといった形式でのエクスポートも可能です。 3Dモデルを作成しエクスポートしてみる 今回は手順の概要を理解するため、以下のような簡易モデルを作成し、それに設定を加えていきます。 それでは早速進めていきます。 アドオンのインストール 1.ダウンロード 以下のリンクからZipファイルをダウンロードしてください。 github.com 2.アドオンのインポート Blenderのプリファレンスからアドオンをインストールします。 インストールを押し先ほどDLしたZIPファイルを選択。 Phobosのアドオンが表示されるのでチェックをいれる。 この後一度Blenderを再起動するとよいです。 モデリング アドオンがインポートできたのでここからモデリングを進めていきたいと思います。 モデリング時に各パーツの中心座標の位置にジョイントができるのであらかじめ向きを意識して作成しておくと良いと思います。 モデリングが完成したら各メッシュのPhobos Typeを「Visual」に設定します。 メッシュを選択→プロパティの[Phobos Type] →[Visual]に変更 ジョイントの設定 モデルが用意できたらそれぞれのパーツにジョイントを設置していきます。 メッシュ選択後「kinematics」>「Create Link(s)」を押します。 オプションから位置を「Selected Object」にすることでパーツの原点にジョイントが自動的に移動します。 また「Parent Objects」にチェックを入れることでオブジェクトが自動的にジョイントの子供になります。 各パーツジョイントを設置し終わったら階層を整理します。 対象2つ選択後、Ctrl+Pを押すことでペアレントメニューが表示されるので「ボーン相対」を選択しどんどん親子関係を整理していきましょう。 以下のように親子関係を整理しました。 今度は各ジョイントの設定をしていきます。 ジョイントを選択後「Define Joint(s)」を押すことでジョイントの形状が変化し、ロボットの細かい制御の値などが編集できるようになります。 例えばロボットの首の部分に旋回用のJoint Typeを設定し、Lower/Upperの値を変更することで左右首を振り切らないようにできます。 コリジョンの設定 衝突判定用のコリジョンを設定します。 メッシュを選択した後「Create Collision Object(s)」を押すことで衝突反転用のメッシュが追加されます。オプションから形状を変化したり、個別でスケールなどの微調整を行えます。 慣性の設定 メッシュを選択した後「Create Inertials」を押すことで慣性の値を設定することができます。 モーターの追加 稼働させるジョイントを選択して「Add Motor」を押すと設定するモーターの詳細が設定できるようになります。 マテリアルについて マテリアルはスペキュラーBSDFまたはプリンシパルBSDFの使用を推奨しています。 テクスチャがある場合出力をBSDFノードのベースカラーにイメージテクスチャノードを追加する必要があります(ノーマルマップも同様)。 エクスポートされた形式 (URDF、SDF、SMURF など) でサポートされているプロパティと、それをロードするプログラムによって異なる場合があるので結果は確認してください。 エクスポート 赤枠の部分で出力の設定をした後Export Modelを押すと指定したパスに自動的に出力されます。 モデルの確認方法 以下のURLにアクセスし、URDFフォルダをドラッグ&ドロップすると、Webブラウザ上でもモデルを確認できます。 gkjohnson.github.io 以上でBlenderを用いたURDFの作成してみたになります。 結構複雑そうな処理を簡単にモデルに実装できるので是非試してみてください。 まだまだ制御できることはたくさんありそうなので引き続き研究していきたいと思います。
aptpod Advent Calendar 2024 12月3日の記事です。 ソリューションアーキテクトの奥山です。痒いところに手が届くそんな弊社製品の機能をご紹介します。 はじめに 産業IoTの分野では、膨大な計測データを効率的に管理・活用することが重要です。特に、自動車業界のCAEエンジニアやAD/ADASエンジニアにとって、計測データの解析や設計評価に役立てることは、業務効率化や技術革新の鍵となります。しかし、以下のような課題がデータ活用の障壁となっています。 独自フォーマットのデータ形式 が解析や外部ツール連携を難しくしている。 REST APIを利用した独自開発 に多くの工数がかかる。 業界標準形式への対応不足 により、データの再利用や解析が限定されている。 これらの課題を解決するために、aptpodでは計測データを効率的に業界標準形式へ変換する新しいミドルウェア「Meas Converter」を開発しました。 Meas Converter は、特に自動車業界で求められるMDF(Measurement Data Format)形式へのデータ変換をスムーズに行うツールです。この機能により、専門的な知識や追加開発を必要とせず、データ変換をシンプルかつ効率的に実現できます。 本記事では、Meas Converterの概要、特徴、そして操作方法を詳しく解説します。 はじめに Meas Converterとは? 主な特長 なぜMeas Converterが必要なのか? 主な課題 Meas Converterによる解決策 Meas Converterの主な特徴 1. 直感的で簡単な操作 2. 高度な変換機能 3. 将来を見据えた対応範囲 実際の操作説明 1. データを選択する 2. 変換設定を行う 3. 変換を実行する 4. 変換結果をダウンロードする まとめ 注意事項 Meas Converterとは? Meas Converter は、産業IoT分野の計測データを効率的に変換するために設計されたツールです。特に、自動車業界で広く使用される計測データを、業界標準の MDF(Measurement Data Format) に変換することで、データ活用の可能性を大きく広げます。 主な特長 対応データタイプ : CANデータ (車両内通信プロトコルで収集した情報) NMEAデータ (GPSなどの位置情報) アナログ信号データ (センサーから取得した値) 映像データ(テスト走行時の記録、音声対応も予定) 出力形式 : MDF(Version 4) AVI (映像データ) 出力方法 : 複数のデータタイプを変換後、ZIP形式で一括ダウンロード。 「データの価値を最大化し、解析や設計プロセスをシンプルにする、それがMeas Converterの使命です。」 なぜMeas Converterが必要なのか? 膨大な計測データを活用する際、特に以下のような課題に直面することが多いです。 主な課題 独自形式の制約 intdash独自形式のデータは解析や他ツールへの連携が難しく、専用知識や追加開発が必要でした。 業界標準形式への対応不足 MDF形式への変換は外部ツールの導入や複雑な開発が求められるため、業務負担が増していました。 運用の複雑さ REST APIベースのデータ取得や変換は手間がかかり、使い勝手が良いとは言えませんでした。 Meas Converterによる解決策 学習コストの削減 GUIを活用し、専門知識がなくても簡単にデータ変換が可能。 MDF形式への迅速な対応 自動車業界標準のMDF形式へ即座に変換。CAEやAD/ADASのエンジニアが活用しやすい環境を提供。 操作の簡略化 GUIでのワンクリック操作でデータ変換を実現。REST APIに頼らず、直感的な操作が可能。 Meas Converterの主な特徴 Meas Converterの設計理念は、「簡単操作で高い成果」です。その特徴を以下にまとめます。 1. 直感的で簡単な操作 シンプルなデータ変換フロー : データ検索 → 設定 → 出力 → ダウンロードの4ステップで作業完了。 GUIでプログラミングなしでも簡単に操作可能。 2. 高度な変換機能 自動変換機能 : 計測データと変換定義(CANでいうDBCファイルのような設定)を自動で紐付け、手間を軽減。 インポート可能な変換定義 : 変換定義をファイルからインポートして設定可能。 MDF形式への対応 : CAN、NMEA、アナログデータを業界標準形式に変換。 3. 将来を見据えた対応範囲 現在のMDF形式やAVI形式に加え、将来的にJSONやCSV形式への対応を予定。 実際の操作説明 Meas Converter を利用することで、計測データの変換が簡単な操作で完了します。このセクションでは、操作手順をご紹介します。 1. データを選択する UUIDを使って選択または、 Meas Hub(intdashの計測データの管理画面) から計測データを選択することができます。 UUIDからデータを選択 MeasHubからデータを選択 2. 変換設定を行う 選択後、変換結果名や出力範囲を設定することができます。出力するデータは過去の変換結果や新しく設定することができます。 選択した後の画面 出力データを選択します。 出力データをリストから選択する 選択後の出力対象データ 3. 変換を実行する 変換ボタン をクリックするだけで、変換プロセスが開始されます。 進行状況や結果は変換結果一覧にリアルタイムで表示されます。 変換結果 4. 変換結果をダウンロードする 変換結果一覧 から、変換済みファイルを確認。 青い「ダウンロード」ボタンをクリックして、ZIP形式で一括ダウンロードできます。 ダウンロード結果 まとめ Meas Converterは、自動車業界を中心としたエンジニアが直面する課題を解決し、業界標準形式でのデータ活用を加速します。直感的な操作と高度な変換機能により、計測データの価値を最大限に引き出します。 「データ変換をもっとシンプルに。Meas Converterでデータ活用の新しい未来へ。」 注意事項 本記事でご紹介した「Meas Converter」は先行リリース版です。 現段階では、試供版の提供は行っておりませんが、以下の内容をご理解いただけると幸いです。 現在の状態: 本記事に記載された機能や特長は、先行リリース版の仕様に基づいています。正式版リリースに向けてさらなる改良を行う予定です。 ご意見・ご感想のお願い: 本記事をご覧いただき、機能へのご意見や改善のご提案がございましたら、ぜひお知らせください。いただいたフィードバックを元に、より良い製品開発を目指します。 今後の展開: 先行リリース版で得た知見を活かし、正式リリースではさらに幅広いニーズに応える機能を追加予定です。
aptpod Advent Calendar 2024 12月2日の記事です。 みなさまお久しぶりです。アプトポッドで営業企画をしている神前(こうさき)と申します。 またもや 前回の登場 から1年です。4年連続でAdvent Calendarのトップバッターをやっているので、もはや風物詩ですね。 さて、昨年はSalesforceの導入について書いたのですが、今年は運用を1年回してみての感想と、その中での気付きについて書いていきたいと思います。 Salesforceに限らずになにかのシステムを導入する際に、導入するまでも大変だったりしますが、本番はその運用です。いざ導入したけども現場の人は使ってくれなければ宝の持ち腐れです。 導入してみたけどどうも現場に定着しない、使ってもらえてはいるけどもどうも深く使ってくれてはいない、そんな悩みを持っている人に届くと嬉しいです(想定読者が狭い)。 今年やったこと 実運用にあわせた改善 他ツールと連携したSalesforce周辺の改善 閑話休題 SFAはどのように構築、運用していくべきなのか よくある光景 SFAはどう寄与できるか 自分たちにとってのキーはなにか チームで立ち向かう 3者のコミット まとめ 今年やったこと この1年間でやったことは、大きく2つにわけることができます。 実運用にあわせた改善 他ツールと連携したSalesforce周辺の改善 それぞれもう少し詳細をみていきます。 実運用にあわせた改善 昨年の10月から本番運用を開始し、しばらくは実際に現場で使ってもらったフィードバックを元に改善をすすめていました。といっても、劇的に使い勝手が変わるような改善をするのではなく、むしろ項目やフローの追加、整理といった、小さいものを中心にとにかく数多くこなしていきました。 運用開始当初は「操作方法がわからない」、「エラーがでてやりたいことができない」といった問い合わせも多く、その都度対応していましたが、現在ではそういった問い合わせも月に一度あるかないかといったレベルまで使い慣れてもらったと思います。 来た問い合わせに対応するのはもちろんのことですが、自分から課題を拾いに行くことも意識していました。打ち合わせの合間に雑談として操作感をヒアリングしたり、困り事がないか確認するのもそうですが、実際に入力された案件(デフォルトだと「商談」ですが弊社では「案件」と名称を変更しています)の内容を日々眺めることで、使われている項目、使われていない項目はわかりますし、操作に苦慮している部分もなんとなくわかってきます。そこから、自分で実際に操作をしてみて、手間がかかる部分や使いにくい部分、あるいは、あったら便利なフロー等を推察し、時にはアイデアを現場にあてながら改善をすすめました。 運用から半年ほどは主に、直接Salesforceを触る現場の営業メンバーを中心に、言い換えると、「対社内」向けの改善を重点的にすすめてきた形になります。 ちなみに、昨年のブログで紹介をしたフローもリファクタリングを実施し、コンパクトに、かつ、シンプルにすることができました。 他ツールと連携したSalesforce周辺の改善 それに比べ、直近の半年ほどは、「対社外」を意識した改善をすすめてきました。 実際の操作感をよくする、ちょっとしたことをフローを使って自動化して便利にする、というのがだいぶ落ち着いたので、自社で使ってる他のツールとの連携を、フローをうまく活用しながら構築したり、エンドユーザーに送るメールを自動化させたりすることをすすめてきました。 一つの例として、 Zendesk との連携を強化があります。 Zendeskとは、クラウドベースのカスタマーサポートツールなのですが、およそ1年半くらい前に導入をしています。当時はSalesforceの導入が決まり、カスタマーサポートもSalesforceを利用するか検討したのですが、シンプルなツールとしての使いやすさからZendeskになった、という経緯があります(なお、カスタマーサポートの担当が弊社では別にいたため、このあたりの経緯については私自身は直接関わっていたわけではありません)。 現在ではお問い合わせのほとんどがZendeskを経由してくるようになるまで利用されているのですが、改善前まではSalesforceとの連携面が弱く、だれが問い合わせをしてきているかはもちろんわかるものの、その問い合わせがどの案件についての問い合わせかがすぐにわからない等の課題がありました。 そこで、まずは案件に自動連番でユニークな「案件番号」というものを案件作成時に生成し、案件内の項目である「納品日」になると、案件に紐づいたエンドユーザーの担当者さんにメールでZendeskの案内と、お問い合わせの際には案件番号をフォームに入力してもらうように依頼するように自動化しました。 これにより、もれなくエンドユーザーさんに対して問い合わせの窓口であるZendeskの案内ができ、かつ、問い合わせがあった際にも、どの案件についての問い合わせかがすぐにわかるようにできました。 これはあくまで一例ですが、ZendeskとSalesforceの連携強化について他にいくつか実施しています。 Zendesk以外にも、一部のパートナーさん向けのポータルサイトの構築と公開や、エンドユーザーさん向けのポータルサイトの構築をすすめています。 前半(実運用にあわせた改善)に比べると、改善の影響範囲が社内から社外へ広がり、関わるツールもSalesforce以外のものまで含めてだいぶ広くなったと思います。 閑話休題 話は変わるのですが、昨年のAdvent Calendarで書いた記事がSalesforce社の人の目にとまったようで、今年の夏頃にCustomer Evangelistのバッジをいただけました。 袋にいれてとっておいてあるバッジ 自身としては、構築や運用を四苦八苦しながらやってきたものをブログで書いただけなので、特段エヴァンジェリストとしての何かを意識して日々活動をしていたわけではないのですが、なんと今年の10月にこのバッジをいただいたことがこ゚縁となりSalesforce社でのイベントに登壇させていただくこととなりました(貴重な場をいただきありがとうございました)。 Salesforce社が主催で、Salesforce社のオフィスで開催するにも関わらず公式のイベントではない(と当日に話されてました)という摩訶不思議な立ち位置のイベントではありましたが、自身にとってもよい機会となりました。 当日は導入から3年未満くらいの立ち上げ初期のユーザーさんを対象に、私自身が実際に苦しんだところや、導入をしてから現場に使ってもらうためにやってきたことをお話したのですが、その際にでてきた質問や、参加されていた方の状況を聞くことで、色々な気付きがありました。 SFAはどのように構築、運用していくべきなのか ※以下に記載することは神前個人の所感です。また、以下に記載することが弊社で実現できているというわけではないことをあらかじめ記載しておきます。 この1年半強の構築と運用、そして、先日のイベント登壇を経て、改めてSFA(=Salesforce)を導入する理由、目指すべきゴールはどこなのかを振り返ってみました。 導入をしたそもそもの経緯の詳細は 昨年のブログ記事 をご覧いただくとして、大きく言えば、ERPや勤怠システムといった他のツールとの連携による全体最適化が導入の目的だったと言えます。これまで連携をしておらず、バラバラだった各ツールをSalesforceを中心にリプレイスしていくことで、データの二重入力や二重管理を防ぐ、つまり、一つ一つは小さいけど、積み重なるととてつもない無駄になる作業を排除していくことが目的でした。 実際に、導入から運用を開始してから半年ぐらいはそのように動いていました。ですが、それもある程度整ってくると、もちろん日々状況は変化するので要望がつきることはないものの、だいぶ落ち着いてきます。 そこからは方向転換をして、これまで手が回ってなかったこと、他のツールとの連携を活用していくことになるわけですが、とはいえ、どういう方向で動いていくべきかについては様々な選択肢があります。マイナスをゼロにしていく作業はわかりやすいのですが、ゼロをどうプラスにしていくのか、どこをプラスにしていくのかは色々考えられるからです。 先に、どういう方向性で動いていくべきかについて書いておくと、 「SFAは、各ユーザー(経営層、マネジメント層、プレイヤー層)の意思決定の速度向上を最大化する方向性で構築、運用するべき」 と現段階では考えています。 よくある光景 なぜそのように考えるに至ったかをもう少し掘り下げていきます。 SFAを導入しているのであれば、案件(商談)やリードの管理、目標に対しての現在の進捗といったものはある程度可視化できていると思います。 弊社でもそうですが、おおよその営業組織は、週一などの頻度で定例を開き、Q単位での進捗や各案件の進捗等を共有することと思います。そうした中で、例えば「残り◯◯日で今Qの目標に対して◯◯円足りない」みたいな場面はよくあることと思います(あるいは、今Qはいいけど、翌Qのパイプラインが足りない、年間での目標達成があやしそうだ、等)。 そうした時に、以前も弊社ではよくあったのですが、「とりあえずいまの状況をレポートでまとめて現状を詳細に把握しよう。それを次回の定例で確認して対応策を考えよう」となりがちではないでしょうか。レポートをまとめるのは私のように事業部にいる非営業メンバーだったり、あるいは経営企画のような立ち位置の人が担当すると思いますが、翌週になってそのレポートをみんなで眺め、あれこれ対策を考えるものの、場合によっては「では、次回にこのレポートの情報をもとに、それぞれ対策を考えてきて発表しよう」、さらにその翌週に「では来週に今回でたアイデアの中からどれを実施するか決めよう」みたいなことになります。実際にはこんなに悠長に時間を使うことはないと思いますが、とはいえ、「目標達成があやしいぞ」となってから「どのように動くべきか」が決まるまでに1週間、2週間費やしてしまうこともままあることだと思います。このように時間を使ってしまえば、方針が決まってから動いてももうリカバリーができなかったり、当然時間が経過しているので、今Qだけでなく、翌Qにすら影響がでることも考えられます。 私見ではありますが、営業の成績は、その人、あるいはその組織がとったアクション量に比例すると考えています。上記のような例ですと、方針が定まらない限りメンバークラスはアクションを取りづらく、通常の営業活動にプラスアルファでなんらかのアクションをとらないといけないにも関わらず、それをできずに時間を消費してしまうわけです。 私自身他社さんの事例を多く知っているわけではないですが、しかし、こうした光景はわりとよくある光景なのではないかと考えています。 SFAはどう寄与できるか 案件(商談)は管理できている、現状の進捗も可視化できている、でも意思決定にまで寄与できていないのであれば片手落ちなのではないでしょうか。 例えば、各営業メンバーそれぞれの平均商談日数や、新規開拓と既存のお客様の深耕営業のどちらが得意なのか、それぞれの平均受注金額はいくらなのか、どういう業界が得意なのか、どういう規模の企業が得意なのか、etc.といったことが可視化されていればどうでしょうか。 ダッシュボードのサンプル(数字は隠してます) ここまで可視化されていて、リアルタイムにモニタリングできていれば、あとは逆算していけば各々がとるべきアクションは自然と決まってきます。 不足分が◯◯円だから、おおよそ◯案件分の受注が必要、残りの期間が◯◯日だから、平均商談日数をみると新規開拓よりも既存顧客へのアプローチのほうが可能性がある、深耕営業が得意なのはだれで、平均アプローチ数は◯回だから、これから一週間Aさん、Bさん、Cさんは既存顧客にたいしてこういう風にアプローチをしてください、それをデイリーで結果をみていきましょう、といった感じです。 ここまで落とし込めれば、あとは実行してその結果をみる、というPDCAをすぐに回せます。 これは必ずしも事業部単位だけの話ではなく、個々の営業メンバーの目標達成という観点でも活用できます。 極端な言い方をすれば、 「いかに考える時間を減らして、行動のための時間を増やすか」 ということです。 自分たちにとってのキーはなにか ただし、商談を受注へともっていくための重要なキー、指標はそれぞれの会社さんで異なると思います。訪問回数が大事、という企業さんもあれば、商談と商談の期間が大事であったり、あるいは、商談の中で製品のデモをやったかどうかが大事、といった企業さんもいるかもしれません。 さらに言えば、そうした指標とは別に、マネジメントレイヤーが各営業メンバーのどういった数値に着目してマネジメントをしているか、といったことも千差万別です。 何が言いたいかと言うと、「どの指標が自分たちにとって大事なのかは自分たちで見つけなければならない」ということです。 これさえやれば営業成績が向上する、といった銀の弾丸はありません。自分たちにとっての銀の弾丸を探るために、レポートで色々可視化をしていく、可視化をするために案件(商談)やリードに色々なデータをとってみる、という模索の期間が必要なのではないかと思います。 チームで立ち向かう 話を少し戻しまして、Salesforce社でのイベントで登壇をした際に、いくつかメッセージを発信したのですが、その中の一つに「とにかく現場に飛び込もう」というものがありました。 意図としては、管理者として現場と一線を引いて構築、運用をするのではなく、実際に使うユーザー(=営業メンバー)と同じ目線で改善ができるようになりましょう、という内容なのですが、いま考えると、実はこれがかなり大事なことなのではないかと思うようになりました。 自分たちにとっての重要なキーを探るためには、管理者が一人でがんばっていても限界があります。実際に現場に飛び込み、営業メンバーが何を指標としているのか、あるいは、指標をさぐるためにはどんなデータが必要なのか、そして、とったデータをどうすればシステムとして可視化できるかを共に探り、作り上げていく必要があります。同様に、マネジメントレイヤーが何をみているのか、どういうデータがあれば意思決定の時間を少しでも短く出来るのか、そしてそれをシステム上でどう表現するのかといったことも、一方通行ではなく双方向に協力をしていく必要があります。 つまり、管理者対ユーザーという関係性ではなく、管理者も現場も関係なく、同じ目的をもった一つのチームとして組織の壁を超えて協力をしていく必要があるのです。そうしたチームをつくるための第一歩が、現場に飛び込む、ということなのだと私は考えています。 3者のコミット 個人的には、Salesforceに限らず、SFAを導入するのであれば、管理者と営業の現場メンバー(マネジメントレイヤー含む)に加えて、経営層のコミットもSFAの十全な活用、運用には必要と考えています。 というのも、せっかく構築をして、運用を開始しても、経営層が使わないのであれば結局現場は使わなくなるからです。 Salesforceにはレポートの他にもダッシュボードがあり、そこでリアルタイムに営業活動で重要な情報はみることができます。それにもかかわらず、例えば経営会議ではSalesforceにはいっているデータをcsvで出力し、それをスプレッドシートで加工をしてパワーポイントに落とし込んでるようであれば意味がありません。もちろん、すべてがすべてダッシュボードを見れば済むということはないでしょうが、少なくともダッシュボードをみればわかるものはダッシュボードをみればいいわけです。率先垂範という言葉がありますが、トップから率先して使うのが、現場に定着させるためには大きな効果があるのは間違いありません。 経営層が「導入することにしたからあとはよろしく」、現場やマネジメントレイヤーが「上から言われたからとりあえず使います」、管理者が「営業のことはわからないので、とりあえずやりますけど、細かいことはなにかあったら言ってください」といった状態では、改善もすすみませんし、運用が定着するわけありません。 幸い、弊社は規模が小さいこともあり、3者のコミットはできていますが、もし自社はそうではないな、と感じるようであれば、機能を充実させるよりもまずは3者がコミットできるようにコミュニケーションを開始するところから手を付けたほうがよいかもしれません。 まとめ 長くなってきたのでそろそろ締めたいと思います。 昨年の記事から1年を通して、自分なりにはよい活動ができたのではないかと思っています。ですが、それは自分がたまたま元々営業の経験があり、現場とコミュニケーションをとりやすかったり、現場がとても協力的であったり、あるいは、こうしたシステムをいじるのが性に合っていたからという幸運に恵まれていたからだと思います。 いま現在管理者をやっている方の中には、そうではない方もいることでしょうし、何から手を付けていけばいいかもよくわからない、という方もいると思います。 そうした中で、そもそもなんで導入をするのか、という原点に立ち返って、一人で抱え込まずにまわりに仲間を増やし、チームで立ち向かっていくことが、時間はかかるものの、唯一のできることなのではないかと思います。 本記事が少しでも参考になっていると幸いです。 それではまた1年後にお会いしましょう!
intdashにリアルタイムでデータを登録したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 収集データをリアルタイムで可視化・確認したいニーズが増えています。 今回は収集データをリアルタイムに加工・登録する方法をご紹介します。 はじめに 前提 YOLOとは OpenCVとは Gstreamerとは PyGObjectとは インストール クライアントライブラリ サンプルプログラム用ライブラリ Gstreamerインストール Pythonパッケージインストール やってみた 全体構成 実行結果 サンプルプログラム 計測作成 リアルタイムAPI接続 データポイント受信 H.264アップストリーム Convertor Detector タイムアウトで完了 おわりに はじめに 今回の対象はリアルタイムAPIへのデータ送信です。 intdash SDK 前提 今回のサンプルプログラムで以下のライブラリを使います。 1 YOLO OpenCV Gstreamer PyGObject YOLOとは YOLO(You Only Look Once)はディープラーニングモデルです。 画像に写っている物体がどこにあり、何なのかを高速に認識できるのが特徴です。 ai-market.jp 今回使うのは物体検出の精度と速度を向上させたモデルであるYOLOv4の軽量版YOLOv4-tinyです。 OpenCVとは 画像の読み込み・表示・編集など画像処理のためののオープンソースライブラリです。 今回はOpenCVからYOLOv4-tinyモデルを呼び出し、矩形を描画します。 Gstreamerとは 動画・音声の変換などを行えるオープンソースのメディアフレームワークです。 前回 利用したFFmpegよりもリアルタイム処理に向いています。 今回はH.264データのデコード・エンコードに利用します。 PyGObjectとは Pythonから、C言語ライブラリで使用されるオブジェクト指向プログラミングのフレームワークGObjectを利用するためのライブラリです。 今回は、GObjectを基盤として構築されたGstreamerを制御するのに利用します。 インストール クライアントライブラリ クライアントライブラリは 前回 インストールしたパッケージを利用します。 サンプルプログラム用ライブラリ 利用ライブラリをインストールします。 Gstreamerインストール Gstreamerをインストールします。バージョン、カラーバー表示を確認します。 2 brew install gstreamer gst-launch-1. 0 --version gst-launch-1. 0 videotestsrc ! autovideosink Gstreamerインストール Gstreamerカラーバー表示 Pythonパッケージインストール Pythonパッケージをインストールします。 opencv-python: OpenCVのPythonラッパー numpy: 数値計算や配列操作のためのPythonライブラリ、画像編集に利用 PyGObject: GObjectのPythonラッパー pip install opencv-python numpy PyGObject Pythonパッケージのインストール やってみた 映像をリアルタイムで物体検出します。 全体構成 H.264データをダウンストリーム・加工してintdashにアップストリームします。 全体構成図 H.264データが可視化されるにはサーバーまでを2往復することになります。 違和感を感じさせないためには低遅延であることが重要です。 実行結果 サンプルプログラムを起動します。 python lesson4/src/detect_people.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuid < YOUR_EDGE_UUID > intdash Motion V2でデータ収集を開始します。 Video Data Type: h264_frame Data Name: 1/h264 せっかくなので検出人数が多いところで撮影しました。 www.youtube.com 検出物体の人を緑、他を赤にして矩形描画したH.264データをアップストリーム 検出人数もアップストリーム 利用SIMではLIVE再生だと元の映像含めて通信が不安定 元データと経過時間が同じにしているため、過去再生では完全に時刻同期 サンプルプログラム 構成が少し複雑になったので図示しました。 クラスアーキテクチャ フレームのデコード・物体検出・エンコードをスムーズに行うために非同期処理にしています。 Gstreamerパイプラインとキューを使って非同期処理間でデータ授受 新規計測として検出結果(矩形描画映像、検出人数)をアップストリーム 計測作成 アップストリーム先の計測をREST APIで作成します。 作成後に計測UUIDをアップストリーム処理に渡します。 measurement = self.writer.create_measurement( "Created by DetectService" ) await self.upstreamer.open(measurement.uuid) リアルタイムAPI接続 アップストリームのためにノードID(エッジUUID)を指定します。 帯域が混み合っていたため、 ping_timeout : Pingタイムアウト(秒) を延ばしました(デフォルト1秒)。 api_url_parsed = urllib.parse.urlparse(api_url) conn = await iscp.Conn.connect( address=f "{api_url_parsed.hostname}:{api_port}" , connector=iscp.WebSocketConnector(enable_tls= True ), token_source= lambda : api_token, project_uuid=project_uuid, node_id=edge_uuid, ping_timeout=ping_timeout, ) データポイント受信 起動後はダウンストリームでデータポイントを待ち受けます。 一定時間データポイントが発生しない場合は TimeoutError をraiseします。 async for msg in self.down.chunks(timeout=timeout): H.264アップストリーム Gstreamerから取得したフレームがIDRフレームかNon IDRフレームかを判定します。 H.264を規定するISO/IEC 14496-10のAnnex Bにおいて定義されているバイトストリームフォーマットのNAL Unit Typeが SPS (nal_type:7) Sequence Parameter Set:シーケンス全体に関するパラメータ情報 PPS (nal_type:8) Picture Parameter Set:個々のフレームまたはグループオブフレーム(GOF)に関するパラメータ情報 IDR (nal_type:5) が順序通りに存在する場合にIDRフレームと判断します。 # IDRフレームの順序判定 if nalu_type == 7 : sps_found = True elif nalu_type == 8 and sps_found: pps_found = True elif nalu_type == 5 and sps_found and pps_found: return True なお、エンコードしたH.264のIDRフレームの頻度はGstreamerパイプラインの """ key-int-max={key_int_max} """ で指定しています。 Convertor Gstreamerパイプラインをラップするクラスです。 push メソッドで1フレームを与えて、 get で取り出します。 Gstreamerがバッファするため、getでは非同期に読み出して返します。 while True : sample = await asyncio.to_thread(self.sink.emit, "pull-sample" ) Detector 物体検出クラスです。 特定の物体検出結果(人物)の矩形色変更、検出人数カウントをします。 if self.class_names[class_ids[i]] == "person" : color = ( 0 , 255 , 0 ) count = count + 1 else : color = ( 0 , 0 , 255 ) タイムアウトで完了 検出結果は1つの計測に格納されますが、元計測が複数あっても構いません。 TimeoutError が発生したら場合は計測を完了します。 except TimeoutError : self.writer.complete_measurement(measurement.uuid) おわりに 今回はリアルタイムAPIのデータ送信をご紹介しました。 REST APIの計測作成も含めた集大成の内容でした。 これでひと通りの機能を網羅して入門シリーズもひと区切りです。 今後はより実用向けの内容も紹介していきたいと考えています。 サンプルプログラムは GitHub で公開しています。 ↩ 初回の起動は遅いことがあるようです。 ↩
intdashからデータをリアルタイムで取得したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 intdashの魅力のひとつは低遅延でリアルタイム性の高いデータ伝送です。 今回は収集データをストリームで取得する方法をご紹介します。 はじめに リアルタイムAPI向けSDK 前提 RTSP(Real Time Streaming Protocol)とは H.264とは FFmpegとは MediaMTXとは インストール クライアントライブラリインストール 動作確認 FFmpegインストール MediaMTXインストール やってみた 全体構成 実行結果 映像比較 データ伝送の低遅延性 サンプルプログラム サーバー接続 データチャンク受信 遅延ロガー ffmpeg/ffplay起動 おわりに はじめに 今回の対象はリアルタイムAPIからのデータ取得です。 intdash SDK リアルタイムAPI向けSDK リアルタイムAPI向けのSDKを利用します。 RESTと異なる専用のAPIでクライアントライブラリが提供されています。 前回 と変わって導入方法はシンプルです。 今回は収集した映像データをリアルタイムに取得してRTSP配信します。 1 前提 サンプルプログラムで映像関連の利用技術が多いので説明です。 RTSP(Real Time Streaming Protocol)とは 主に監視やライブストリーミングで利用される通信プロトコルです。 数秒程度の遅延で映像を配信することができます。 今回はRTSPを入力とするシステムとの連携を想定して利用します。 映像はH.264で圧縮します。 H.264とは 動画圧縮規格です。 前のフレームから変化した部分だけを抽出してデータ量を低減できる仕組みです。 IDRフレーム/キーフレーム:フレーム画素の全量を持つ、定期的に送信 Non IDRフレーム/デルタフレーム:変化点の画素のみを持つ www.idknet.co.jp intdashでもエッジから伝送する映像の圧縮に利用しています。 今回H.264データをRTSP配信するために FFmpeg を利用します。 FFmpegとは FFmpeg(エフエフエムペグ)は動画や音声を変換・編集するためのオープンソースのマルチメディアツールです。 シンプルに利用でき、RTSPサーバーへの映像配信や可視化をしてくれます。 www.innoqos.com MediaMTXとは オープンソースのマルチメディアサーバーです。 RTSPサーバーとして利用します。 qiita.com インストール クライアントライブラリインストール 各言語向けに提供されているクライアントライブラリ iscp をインストールします。 . ./venv/bin/activate pip install iscp クライアントライブラリのインストール 動作確認 Pythonを起動してパッケージをimportします。 エラーが出なければ、正常に読み込めています。 python >>> import iscp import確認 FFmpegインストール FFmpegをインストールしてバージョンを確認します。 brew install ffmpeg ffmpeg -version FFmpegインストール インストールすると可視化ビューア ffplay も使えるようになります。 ffplay -f lavfi -i testsrc =duration = 10:size = 1280x720:rate = 30 カラーパターン再生 MediaMTXインストール MediaMTX のバイナリ(.tar.gz)をダウンロードして展開します。 設定ファイル mediamtx.yml を編集して、送信するデータに合わせてFPS設定を変えておきます。 rpiCameraFPS: 15 FPS設定変更 起動するとRTSPを8554ポートでリスニングしているのがわかります。 OSのセキュリティ警告が出たら許可します。 ./mediamtx MediaMTX動作確認 やってみた 全体構成 intdash Motion V2 から収集した映像をダウンストリームで取得します。 RTSPストリーム配信 撮影映像をH.264エンコードしてintdash ServerにiSCPで送信 サンプルプログラムがH.264をダウンストリームしてRTSPサーバーへ配信 ffplayでRTSPサーバーからストリームを受信して可視化 RTSP配信自体にかかる遅延との比較のため、ダウンストリーム直後もffplayで可視化しています。 実行結果 遅延を測りやすい被写体を撮影しました。 東京駅ヤエチカ(八重洲地下街) DROPCLOCK 機材をセットします。 通勤客の横で 先ほどの通り、MediaMTXを起動します。 サンプルプログラムを起動します。 python lesson3/src/rtsp_stream.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuid < YOUR_EDGE_UUID > ダウンストリーム開始 intdash Motion V2でデータ収集を開始します。 Data Type: h264_frame Data Name: 1/h264 MotionからH.264を送信 ffplayでRTSPストリームを受信します。 ffplay -window_title " After RTSP " rtsp://localhost:8554/stream ビューアーでRTSPを受信 映像比較 撮影した時刻を比較してみます。 撮影時刻比較 ミリ秒まで比較 全体構成図 全体構成図と見比べて説明します。 ①リアル世界です。 ②intdashアップストリーム直前のMotionのプレビューです。 ③intdashダウンストリーム直後のffplayの可視化です。 ④RTSP受信後のffplayの可視化です。 ①〜④は0.954秒ほどずれています。 2 ①〜③は0.370秒なのでRTSP配信の遅延が0.584秒なのがわかります。 データ伝送の低遅延性 ①〜③にはMotionのH.264エンコード、ffplayのH.264デコード時間が含まれます。 データ伝送のみの遅延を見るため、送信前〜受信後の時間をログ出力してみます。 intdashの遅延時間 遅延ログ ブレがありますが、90.4ミリ秒〜370.6ミリ秒で推移しています。 3 サンプルプログラム APIアクセス部分を中心に説明します。 サーバー接続 REST APIと同じくAPIトークンで接続しています。 製品開発以外ではサーバー構成はhttps://〜で提供されるため、固定でHTTPS(443ポート)を指定しています。 api_url_parsed = urllib.parse.urlparse(api_url) conn = await iscp.Conn.connect( address=f "{api_url_parsed.hostname}:{api_port}" , connector=iscp.WebSocketConnector(enable_tls= True ), token_source= lambda : api_token, project_uuid=project_uuid, ) データチャンク受信 H.264のIDRとNon IDRフレームに限定 空チャンク(H.264が含まれないチャンク)を省略 self.down = await self.conn.open_downstream( filters=[ iscp.DownstreamFilter( source_node_id=self.edge_uuid, data_filters=[ iscp.DataFilter(name= "1/h264" , type = "h264_frame/idr_frame" ), iscp.DataFilter(name= "1/h264" , type = "h264_frame/non_idr_frame" ), ], ) ], omit_empty_chunk= True , ) 遅延ログ出力のための経過時間、可視化のためのペイロードを返却 async for msg in self.down.chunks(): for group in msg.data_point_groups: for data_point in group.data_points: yield data_point.elapsed_time, data_point.payload 遅延ロガー 受信時刻 - 送信時刻で遅延時間を算出します。 計測の基準時刻とデータポイントの経過時間から送信時の絶対時刻を算出 ローカルPCで受信時刻との差分を算出 current_time = iscp.DateTime.utcnow() absolute_time_unix_nano = self.basetime.unix_nano() + elapsed_time absolute_time = iscp.DateTime.from_unix_nano(absolute_time_unix_nano) delay = (current_time.unix_nano() - absolute_time.unix_nano()) / 1_000_000 ffmpeg/ffplay起動 subprocessで起動します。 intdashからダウンストリームしたH.264を再エンコードせずに渡しています。 subprocess.Popen( [ "ffmpeg" , "-f" , "h264" , "-i" , "-" , "-fflags" , "nobuffer" , "-preset" , "ultrafast" , "-c:v" , "copy" , # 再エンコードなし "-f" , "rtsp" , RTSP_URL, ], stdin=subprocess.PIPE, stderr=sys.stderr if STDERR_FLG else subprocess.DEVNULL, ), subprocess.Popen( [ "ffplay" , "-f" , "h264" , "-fflags" , "nobuffer" , "-flags" , "low_delay" , "-" , "-window_title" , "Before RTSP" , ], stdin=subprocess.PIPE, stderr=sys.stderr if STDERR_FLG else subprocess.DEVNULL, ), おわりに 今回はリアルタイムAPIのデータ取得をご紹介しました。 映像を扱うのは難易度が高く、今回は手軽なFFmpegを利用しました。 次回はもっとリアルタイム映像データに適した取り扱い方に挑戦してみます。 サンプルプログラムを GitHub で公開しています。 ↩ Motion/PCともに同じLTE SIMで通信しています。フレームは640x480・15FPSです。 ↩ ネットワークの遅延 + intdashの遅延です。正確に測定するなら、潤沢な通信帯域・PCリソース・適切な試行回数が必要ですが、今回は簡単に測定しています。また、正確にはiPhoneとPCの時刻誤差(NTP精度)が含まれます。 ↩
計測データを自分でintdashに登録したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 エッジから収集したデータを加工して可視化・確認したいニーズが増えています。 今回はREST APIでintdashにデータを登録する方法を説明します。 はじめに データ送信のステータス管理 Protocol Buffersの利用 インストール Buf CLIインストール Protocol Buffersエンコーダーの生成 プロトコル定義ファイルのダウンロード エンコーダーの生成 protobufパッケージインストール 動作確認 やってみた データ移行ツール 実行結果 エクスポート インポート サンプルプログラム エクスポート インポート GPS距離計算 全体構成図 実行結果 サンプルプログラム おわりに はじめに 今回説明するのはREST APIへのデータ送信です。 intdash SDK 典型的な利用シーンは以下の通りです。 保留データの遅延アップロード リアルタイム送信時に送りきれなかったデータをあとで送信 低頻度データ送信 リアルタイムAPIを使わずにストリーム送信 データ送信のステータス管理 前回 のREST APIからのデータ取得は非常にシンプルでした。 データ送信は少し複雑です。 同時並行で送信される計測データの未回収/回収済を管理するためです。 計測シーケンス 計測はデータ送信を受け付ける単位として複数の計測シーケンスを持ちます。 計測シーケンスが回収する予定の総データポイント数を管理します。 データポイントはチャンクという単位でまとめて送信されます。 チャンクには計測シーケンス内で連番が振られます。 チャンクの送信が完了するとステータスが回収済になります。 回収済のデータポイント合計が計測の回収済みデータポイント数になります。 Protocol Buffersの利用 REST APIへのデータ送信には、Googleが開発したシリアライズフォーマットである Protocol Buffers を利用できます。 1 これによって大量データの送信が安定化します。 バイナリ化によるトラフィック軽減 シリアライズ・デシリアライズの高速化 データフォーマット定義の明確化 インストール REST APIでデータ送信するため、 前回のインストール手順 に加えてProtocol Buffersエンコーダーを導入します。 Protocol Buffersエンコーダーはintdash用のプロトコル定義ファイルから生成します。 Buf CLIインストール プロトコル定義ファイルからProtocol Buffersエンコーダーを生成するツール Buf CLIをインストールします。 brew install bufbuild/buf/buf buf --version Buf CLIのインストール Protocol Buffersエンコーダーの生成 Buf CLIでプロトコル定義ファイルからエンコーダーを生成します。 プロトコル定義ファイルのダウンロード intdash API specificationページ から プロトコル定義ファイルページ に遷移し、プロトコル定義ファイル protocol.proto をダウンロードします。 2 プロトコル定義ファイルページへのリンク プロトコル定義ファイルページ エンコーダーの生成 ワークディレクトリを作成してプロトコル定義ファイルを配置します。 プロトコル定義ファイルのパッケージ名をintdashに変更します。 mkdir -p proto/intdash/v1/ cp path/to/protocol.proto proto/intdash/v1/ sed -i -e " s/package pb;/package intdash.v1;/g " proto/intdash/v1/protocol.proto Buf CLIの定義ファイルを作成し、エンコーダーを生成します。 buf.yaml :Buf CLIのプロジェクト基本設定 buf.gen.yaml :Buf CLIのコード生成設定 エンコーダーは gen ディレクトリに出力されます。 cat << EOS > ./proto/buf.yaml version: v1 breaking: use: - FILE lint: use: - DEFAULT EOS cat << EOS > ./buf.gen.yaml version: v1 managed: enabled: true plugins: - plugin: buf.build/protocolbuffers/python:v23.4 out: gen EOS buf generate proto ls -l gen Protocol Buffersエンコーダーの生成 protobufパッケージインストール 生成したエンコードの実行にはPythonの protobuf パッケージが必要です。 仮想環境を有効化してインストールします。 . ./venv/bin/activate pip install protobuf 動作確認 Pythonを起動して関連クラスを import します。 エラーが出なければ、正常に読み込めています。 3 python >>> from gen.intdash.v1.protocol_pb2 import ( StoreDataChunk, StoreDataChunks, StoreDataID, StoreDataPoint, StoreDataPointGroup ) import確認 やってみた 今回は2つのサンプルプログラムを説明します。 4 データ移行ツール GPS距離計算 データ移行ツール 計測データをJSONファイルにエクスポートし、別計測として再びインポートします。 5 インポート時にProtocol Buffersエンコーダーを利用します。 データ移行ツール 実行結果 エクスポートとインポートで処理を分けています。 エクスポート PYTHONPATH が指定されていないときは設定します。 echo $PYTHONPATH export PYTHONPATH =/path/to/your_workspace: 対象計測を指定して実行します。 python lesson2/migrate/src/meas_export.py--api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --meas_uuid < YOUR_MEAS_UUID > エクスポート実行 JSONを見てみるとデータの構造がよくわかります。 計測、基準時刻 計測の主な項目は以下の通りです。 uuid : 計測UUID basetime : もっとも優先度の高い基準時刻 status : 計測の状態 expected_data_points : 総データポイント数 received_data_points : 回収済みデータポイント数 基準時刻は計測中に複数送信できるため、リストになっています。 id : 基準時刻のID basetime : 基準時刻 priority : 優先度 データポイント データポイントの主な項目は以下の通りです。 time : 絶対時刻(UNIXエポックからの経過時間、計測の基準時刻+経過時間) data_type : データ型 data_id : データを識別する名前 data.d : データペイロード(BASE64エンコード) この計測はiPhoneアプリ intdash Motion V2 で収集した位置情報です。 同じ経過時間で複数のデータ名が送信されているのがわかります。 インポート 別環境のエッジの計測としてデータを登録します。 python lesson2/migrate/src/meas_import.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuid < YOUR_EDGE_UUID > インポート実行 別環境に登録された計測 サンプルプログラム エクスポート こちらはデータ取得のみでREST APIの使い方は前回の内容とほぼ変わりません。 計測(マーカー含む)、基準時刻、データポイントを取得しています。 計測データポイント取得で time_format=“ns” を指定しています。デフォルトではマイクロ秒のため、正確にデータ移行する際には指定が必要です。 api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client) stream = api.list_project_data_points( project_uuid=project_uuid, name=meas_uuid, time_format= "ns" ) 取得した日付項目はdatetime型であるため、JSON出力時にナノ秒精度の時刻文字列にしています。 if isinstance (obj, datetime): iso_str = obj.isoformat() if obj.microsecond: nano_str = "{:09d}" .format(obj.microsecond * 1000 ) iso_str = iso_str.replace(f ".{obj.microsecond:06d}" , f ".{nano_str}" ) return iso_str インポート エクスポートされたJSONファイルを読み込んで、新しい計測を作成します。 計測、基準時刻、マーカー情報を各APIのcreate_xxxメソッドで作成しています。 計測作成時に基準時刻が必ず1つできてしまうため、登録後に削除して、改めて作成し直しています。 current_basetimes = api.list_project_measurement_base_times( project_uuid=project_uuid, measurement_uuid=measurement_uuid, ) for bt_current in current_basetimes[ "items" ]: api.delete_project_measurement_base_time_by_id( project_uuid=project_uuid, measurement_uuid=measurement_uuid, id =bt_current.id, ) 計測シーケンスは置換メソッド replace_project_measurement_sequence が提供されています。 総データポイント数、回収済みデータポイント数を指定して計測シーケンスを登録します。 sequence_group = MeasurementSequenceGroupReplace( expected_data_points=measurement_src[ "sequences" ][ "expected_data_points" ], final_sequence_number=measurement_src[ "sequences" ][ "received_data_points" ], ) api = measurement_service_measurement_sequences_api.MeasurementServiceMeasurementSequencesApi( client ) sequence = api.replace_project_measurement_sequence( project_uuid=project_uuid, measurement_uuid=measurement_uuid, sequences_uuid=sequence_uuid if sequence_uuid else str (uuid.uuid4()), measurement_sequence_group_replace=sequence_group, ) チャンクの送信でProtocol Buffersエンコーダーを使います。 データID( type , name )とデータポイント(経過時間 elapsed_time と payload をチャンクに格納 store_data_point_group = StoreDataPointGroup( data_id=StoreDataID( type =data_point[ "data_type" ], name=data_point[ "data_name" ] ), data_points=[store_data_point], ) store_data_chunk = StoreDataChunk( sequence_number=sequence_number, data_point_groups=[store_data_point_group] ) chunks.append(store_data_chunk) sequence_number += 1 Content-type に protocolbuf を指定してデータポイントを格納したチャンクを送信 results = api.create_project_measurement_sequence_chunks( project_uuid=project_uuid, body=io.BytesIO(chunk.SerializeToString()), _content_type= "application/vnd.iscp.v2.protobuf" , ) すべてのデータポイントを送信したら、計測のステータスを完了にします。 api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client) api.complete_project_measurement( measurement_uuid=measurement_uuid, project_uuid=project_uuid ) GPS距離計算 GPSデータごとに緯度経度〜基準点からの距離を計算してREST APIで登録します。 全体構成図 GPS距離計算 元の計測は変更せず、同じ基準時刻の計測を新たに作ります。 なお、元の計測に新たなデータポイントを追加することもできます。 6 実行結果 実行します。 python lesson2/distance/src/distance.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --meas_uuid < YOUR_MEAS_UUID > 距離算出実行 intdashに登録した距離を元のGPSデータと一緒に可視化しました。 distance というValue Currentパーツにバインドしています。 基準点は品川駅 サンプルプログラム クラスを分割しました。 MeasurementReader:元計測データを取得 DistanceCalculator:原点からの距離を算出 MeasurementWriter:新規計測データを送信 DetectService:各クラスの統合 REST APIアクセス部はデータ移行ツールとほとんど同じですが、 一定件数ずつフェッチしつつ登録するように制御しています。 api = measurement_service_data_points_api.MeasurementServiceDataPointsApi( self.client ) stream = api.list_project_data_points( project_uuid=self.project_uuid, name=self.meas_uuid, data_id_filter=[ "#:1/gnss_coordinates" ], start=self.start, limit=fetch_size, time_format= "ns" , ) start に前回データポイントの時刻+1からフェッチを開始しています。 おわりに 今回はREST APIでのデータ送信をご紹介しました。 計測シーケンスやProtocol Bufferは少しとっつきづらく、ソースも複雑に見えますが、パターンがわかると応用できると思います。 次回はリアルタイムAPIを説明していきます。 Protocol Buffersを使わずに JSONで送信 もできます。送信データがProtocol BuffersでシリアライズされているかはHTTPヘッダの Content-Type でサーバーに通知します。 ↩ intdash API specificationページの参照にはユーザー登録が必要です。 ↩ 動作確認できたら、Buf CLI、ワークディレクトリ、プロトコル定義ファイルは削除しても構いません。 ↩ サンプルプログラムを GitHub で公開しています。 ↩ データポイントが数百件ほどの小さい計測を想定しています。 ↩ 計測から任意の計測データを変更・削除することはできません。当該データのみを変更・削除したい場合は、別に計測を作成して元の計測を削除します。 ↩
もっと柔軟にintdashを活用したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 近年、お客様やパートナー様が自らカスタム機能を実装する場面が増えています。 intdashではサーバーAPIにアクセスするためのSDKを提供しています。 構成を刷新したintdash SDKが2024年6月バージョンより正式リリースとなりました。 1 SDKは非常に幅広い使い方が可能です。 入門シリーズとして実践的な内容に落とし込んで解説していきます。 はじめに intdash APIとは intdash SDKとは intdash API/SDKでできること 目的別のインストール手順・必要ソフトウェア インストール 手順 OpenAPI GeneratorでSDK生成 Javaインストール npmインストール OpenAPI Generatorインストール intdash SDK for Python生成 Python実行環境設定 Pythonインストール Python仮想環境作成 依存パッケージインストール 動作確認 import確認 エッジ一覧取得 Python開発環境整備 Pythonパス設定 フォーマッター、Linterの整備 やってみた 準備 利用パッケージインストール 実行結果 サンプルプログラム説明 構成図 計測リスト取得 データポイントリスト取得 おわりに はじめに intdash APIとは 産業向けIoTデータ伝送プラットフォームであるintdashが、製品群やサーバー外にデータを受け渡すためのインタフェースです。 2つのAPIがあります。 種類 概要 通信プロトコル 用途 REST API いわゆるGETやPOSTのリクエスト・レスポンス HTTP/HTTPS ユーザー/エッジ/計測などの参照・更新 リアルタイムAPI 高速データ伝送向けの特殊なAPI aptpod独自の iSCP 計測データのストリーム 2 intdash SDKとは intdash APIを利用するためのクライアントライブラリーです。 各種プログラミング言語向け に対応しています。 intdash SDK 各種プログラミング言語の使いどころはこんな感じです。 言語 特徴・用途 Python 入門向け、プロトタイピング、AI/機械学習 Go 高速・並列サーバー処理、バックエンドシステム構築 Rust 高速・メモリ安全性、リソースが限られるエッジ処理 TypeScript ブラウザ・フロントエンド Swift iPhoneアプリ C# Windowsプラットフォーム、Unityアプリ Kotlin Androidアプリ、Java互換 この入門シリーズではintdash SDK for Pythonを使って説明します。 intdash API/SDKでできること intdash APIへのデータフローは 2つのAPI ✖️データ取得/データ送信 に大別され、これを組み合わせてカスタム処理を実装します。 入門シリーズでもこの順で説明していきます。 # データフロー 適用例 ① REST APIで計測データ取得 データ分析、レポーティング ② REST APIで計測データ送信 データ登録、保留データアップロード ③ リアルタイムAPIで計測データ取得 遠隔監視、リアルタイムデータ分析 ④ リアルタイムAPIで計測データ送信 遠隔制御、リアルタイムデータ登録 目的別のインストール手順・必要ソフトウェア 実現したいことによって対応した手順が必要です。 # SDKインストール手順 必要ソフトウェア ① OpenAPI Generator で生成 Java、npm ② OpenAPI Generator で生成 Protocol Buffersエンコーダーを生成 Java、npm、 Buf CLI ③ ④ パッケージをインストール 各言語のパッケージ管理ツール 具体的な手順は記事ごとに説明します。 3 インストール 今回はREST APIでデータ取得するためのインストール方法を説明します。 手順 少し多いため、説明しながら進めていきます。 OpenAPI Generator でSDK生成 Javaインストール npmインストール OpenAPI Generatorインストール intdash SDK for Python生成 Python実行環境設定 Pythonインストール Python仮想環境の設定 依存パッケージインストール 動作確認 import確認 エッジ一覧取得 Python開発環境整備 Pythonパス設定 フォーマッター、Linterの整備 サンプルプログラム準備 利用パッケージインストール サンプルプログラム実行 OpenAPI GeneratorでSDK生成 REST API用のSDKをOpenAPI Generatorで生成します。 Javaインストール OpenAPI Generator を実行するためにJava実行環境が必要です。 Javaが実行できるか確認します。 java --version 存在しない場合はインストールします。 brew install openjdk OpenJDKのインストールと確認 npmインストール OpenAPI Generator をインストールするために、Node.jsのパッケージ管理ツール npmが必要です。 npmはNode.jsに含まれているため、Node.jsをインストールします。 brew install node npm -v Node.jsのインストール OpenAPI Generatorインストール OpenAPI Generator は、API仕様に基づいてクライアントライブラリー、ドキュメントなどを自動生成するツールです。 インストールしてバージョンを確認します。 npm install @openapitools/openapi-generator-cli npx @openapitools/openapi-generator-cli version OpenAPI Generatorのインストール intdash SDK for Python生成 intdashのAPI仕様からSDKを生成します。 OpenAPI Specificationファイルのバージョンを指定します。 package-name で指定するパッケージ名でディレクトリが生成されます。 また、開発時に参照するドキュメントを生成しています。 VERSION =v2. 7 . 0 SRC_DIR = " . " ./node_modules/.bin/openapi-generator-cli version-manager set 6 . 1 . 0 ./node_modules/.bin/openapi-generator-cli generate \ -g python -i https://docs.intdash.jp/api/intdash-api/ ${VERSION} /openapi_public.yaml \ --package-name = intdash \ --additional-properties = generateSourceCodeOnly =true \ --global-property = modelTests =false, apiTests =false, modelDocs =true, apiDocs =true \ --http-user-agent = SDK-Sample-Python-Client/Gen-By-OASGenerator \ -o " $SRC_DIR " ls -l intdash intdash SDKの生成 Python実行環境設定 Pythonプログラムを実行する環境を構築します。 Pythonインストール Pythonが実行できるか確認します。 python --version 存在しない場合はインストールします。 brew install python python --version Pythonのインストール Python仮想環境作成 Python仮想環境を作成します。 ローカルPCのPythonバージョンアップが開発中のプログラムに影響するのを防ぐため、仮想環境は有効です。 Pythonと任意のバージョンを指定して仮想環境を作成します。 作成後に有効化して、Pythonとパッケージ管理ツール pipのバージョンを確認します。 python3.xx -m venv venv . ./venv/bin/activate python --version pip --version Python仮想環境の作成 依存パッケージインストール intdash SDKが利用するPythonパッケージをインストールします。 pydantic: データバリデーションおよびデータモデルライブラリ python-dateutil: 日付と時刻の操作ライブラリ urllib3: HTTPクライアントライブラリ pip install pydantic python-dateutil urllib3 必要パッケージのインストール これでSDKを利用できる準備が整いました。 動作確認 簡単なリクエストを試してみます。 import確認 Pythonを起動して package-name で指定したパッケージを import します。 エラーが出なければ、正常に読み込めています。 python >>> import intdash import確認 エッジ一覧取得 最も簡単な例としてエッジ一覧を取得してみます。 intdash APIの認証方法には3種類あります。 ユーザーで認証: ユーザーに紐づくAPIトークンで認証 ユーザー情報に基づいてアクセストークンを払い出して認証 エッジで認証: エッジ情報に基づいてアクセストークンを払い出して認証 今回はAPIトークンで認証します。 intdashのUIであるWeb ConsoleのMy Pageで払い出します。 APIトークン払い出し エッジサービスのAPIオブジェクトを生成してエッジ一覧の取得メソッドを呼びます。 from intdash import ApiClient, Configuration from intdash.api import authentication_service_edges_api # APIトークンで認証 client = ApiClient( Configuration( host= "https://example.intdash.jp/api" , api_key={ "IntdashToken" : "YOUR_API_TOKEN" }, ) ) # エッジサービスのAPIオブジェクトを生成 api = authentication_service_edges_api.AuthenticationServiceEdgesApi(client) # エッジ一覧の取得 edges = api.list_edges() print (edges) エッジ一覧取得 intdash SDKでREST APIにアクセスできました。 基本的にはこのように 認証 各サービスAPIのメソッドでREST APIにアクセス を実装していきます。 Python開発環境整備 開発効率を高めるために開発環境の整備は重要です。 開発者によって好みが分かれるところですが、このサンプル作成に使ったVS Code設定をご紹介します。 なお、作成した仮想環境をVS CodeのSelect Interpreterで指定しておくとターミナル起動時に楽です。 Pythonパス設定 生成したSDKを含めてPython実行時にサンプルプログラムを参照できるように環境変数 PYTHONPATH にワークスペースを設定します。 " terminal.integrated.env.osx ": { " PYTHONPATH ": " ${workspaceFolder} " } , フォーマッター、Linterの整備 VS Codeの拡張機能で以下をインストールします。 Ruff :Python向けの高速なフォーマッター兼Linter My Type Checker :Pythonの静的型チェックツール Ruffの設定です。 保存時に自動でフォーマット、Lintエラー修正、importの整列・削除をさせています。 " [python] ": { " editor.formatOnSave ": true , " editor.defaultFormatter ": " charliermarsh.ruff ", " editor.codeActionsOnSave ": { " source.fixAll.ruff ": " explicit ", " source.organizeImports.ruff ": " explicit " , }, } , My Type Checkerの設定です。 型チェック中に見つからないインポートがあってもエラーを無視します。外部ライブラリで型情報が不足している場合に有効です。 " mypy-type-checker.args ": [ " --disallow-untyped-defs ", " --ignore-missing-imports " , ] , " mypy-type-checker.showNotifications ": " onError ", やってみた 特定のエッジの計測のうち、GPSデータを全部取り出して地図上にプロットします。 サンプルプログラムをGitHubで公開しています。 github.com 対象とするエッジはこちらです。 歴戦の社用車 社用車には当社製エッジコンピューター EDGEPLANT T1 が搭載されています。 開発用(上段)と営業デモ用(下段) 準備 追加で実行前の準備があります。 利用パッケージインストール サンプルプログラムに必要なパッケージをインストールします。 pip install folium matplotlib folium: OpenStreetMap表示 matplotlib: 地図プロット時のカラー情報制御 実行結果 PYTHONPATH が指定されていないときは設定します。 echo $PYTHONPATH export PYTHONPATH =/path/to/your_workspace: 実行します。 python lesson1/src/gnss_plot.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuids < YOUR_EDGE_UUID 1> < YOUR_EDGE_UUID 2> < YOUR_EDGE_UUID 3> 社用車のエッジUUIDは何回か変わっているため、複数指定できるようにしました。 実行 HTMLが出力されます。 1都10県 GPSデータの緯度経度をプロットしています。 2020年1月からの計測1,400件を取得 取り出したGPSデータの緯度経度を10分ごとにサンプリング 会社からの距離が近いほど青く遠いほど赤く描画 訪問先でエッジを起動した場合はルート途中は描画されない サンプルプログラム説明 REST APIアクセス部分を解説します。 構成図 時系列データベースに蓄積された計測データをREST APIから取り出します。 GSPデータを取得して地図描画 計測リスト取得 プロジェクトUUID、エッジUUIDを指定して計測のリストを取得しています。 api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client) measurements = api.list_project_measurements( project_uuid=project_uuid, edge_uuid=edge_uuid, limit=LIMIT ) 開発中の大量データ取得防止のため limit で取得件数を絞っています。 データポイントリスト取得 計測ごとにデータポイントを取得します。 取得対象を緯度経度に限定します。 data_id_filter で以下のいずれかを指定していします。 #:0/GNRMC T1から収集されるNMEAフォーマットの緯度経度を含む GNRMC #:1/gnss_coordinates iPhoneアプリ intdash Motion V2 で収集されるフォーマットの緯度経度 api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client) stream = api.list_project_data_points( project_uuid=project_uuid, name=meas_uuid, data_id_filter=[ "#:0/GNRMC" , # NMEA "#:1/gnss_coordinates" , # intdash Motion ], ) データポイントは複数行に分かれたJSONであるJSONLines形式で取得されます。 1行ごとにJSON化して、データを取り出します。 #:0/GNRMC JSONの data.s から取得 旧プロトコルiSCPv1で送信されたデータの格納形式 社用車のT1がiSCPv1で送信するため NMEAフォーマットのCSV形式をパースして緯度経度を取得 #:1/gnss_coordinates JSONの data.d から取得 新プロトコルiSCPv2の格納形式 BASE64文字列をデコードしてバイナリ値を取得 2次元ベクトルであるバイナリ値から64ビット浮動小数点数値を2つ取得 line_json = json.loads(line.decode()) if "data" in line_json: # iSCPv1 NMEA if "s" in line_json[ "data" ]: x, y = parse_gnrmc(line_json[ "data" ][ "s" ]) # iSCPv2 elif "d" in line_json[ "data" ]: base64_encoded = line_json[ "data" ][ "d" ] bin_data = base64.b64decode(base64_encoded) x, y = struct.unpack( ">dd" , bin_data) おわりに 今回はシリーズ初回としてSDKの概要とREST APIでのデータ取得をご紹介しました。 次回はより高度な使い方を試していきます。 今回少し触れた取得データの形式についてもより詳細に説明します。 2022年11月バージョンからPublic Beta版を利用できました。 ↩ ストリームとは ↩ 記事ではMacでの手順を説明します。 GitHub にWindowsの手順も掲載します。 ↩
intdashとタイムリーにシステム連携したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 2023年12月バージョンから待望のWebhook機能が利用できるようになったのでご紹介します。 はじめに Webhookとは やってみた Webhookリクエストの確認 計測作成イベント:Slack 計測完了イベント:地図マッチング 地図マッチングとは 全体構成 GPS補正Lambda レスポンス返却Lambda API Gateway 起動 おわりに Appendix. サンプルソース 計測作成イベント:Slack Slack通知Lambda Pythonプログラム 計測完了イベント:地図マッチング GPS補正Lambda Pythonプログラム GPS補正Lambda bootstrapシェル GPS補正Lambda コンテナDockerfile レスポンス返却Lambda Pythonプログラム はじめに Webhookとは システム内イベントをシステム外にHTTPで通知する仕組みです。 これでintdash内の変化をintdash外からリアルタイムに知ることができます。 イベントドリブンなシステム連携により、効率的なワークフローを実現できます。 intdash Webhook 以下のintdashリソースのイベントを通知することができます。 計測:作成・変更・削除・完了・終了 1 接続:接続・切断・アップストリーム開始・ダウンストリーム開始 エッジ:作成・変更・削除 ユーザー:作成・変更・削除 テナント:作成・変更・削除 プロジェクト の所属エッジ:追加・変更・除外 プロジェクト の所属メンバー:追加・変更・除外 やってみた Webhookリクエストの確認 早速やってみます。 まずは、WebhookのHTTPリクエストの内容を見てみましょう。 試しにWebhook.siteに通知 アクセスするだけでWebhook用のURLを払い出せるWebhook.siteを利用します。 webhook.site Webhook.siteで払い出されたURLと、通知するイベントをintdash APIに登録します。 計測リソースのイベント measurements_event を通知するよう登録します。 通知先URLと通知イベント種類を登録 スマートフォンアプリ intdash Motion で計測すると、Webhook.siteに通知されます。 内容を見ると measurement リソースの created アクションであること、計測のUUIDと発生時刻がわかります。 右上がヘッダー、下がボディ リクエストボディはこのようなかたちです。 通知先システムはここから必要な情報取り出して処理を行います。 共通 delivery_uuid : Webhook通知のID hook_uuid : Webhook設定のID resource_type ": リソースの種類 action : リソースに対するイベントの種類 occurred_at : 発生時刻 リソース別:計測の場合 project_uuid : プロジェクトのID measurement_uuid : 計測のID ヘッダーには以下の項目があります。 X-Intdash-Signature-256 : HMAC リクエスト改竄防止の仕組み Webhook設定にシークレット(Key)を登録すると、シークレットとリクエストボディで生成したメッセージ認証コード(MAC)を付与 通知先システムで同じシークレットを保持し、受け取ったリクエストボディとあわせてMAC値を生成して比較検証 メッセージ認証符号 - Wikipedia 計測作成イベント:Slack では、簡単なシステム連携を実装してみます。 AWS LambdaでSlackに計測開始を通知します。 Slack通知の構成 Slackチャンネルの通知用URLを払い出します。 2 Slackの カスタムインテグレーション Slackに通知するLambda関数を作成します。 3 Slack通知Lambda 処理はこんな感じです。 HMAC検証 計測作成以外のイベントは無視 Slack通知 intdash Webhookにレスポンスを返却 API Gatewayを作成して、Slack通知Lambdaを統合します。 API Gatewayに統合 作成したAPI GatewayのURLを確認 先ほど登録したintdash Webhookの設定をAPI GatewayのURLに変更します。 通知先URLを変更 計測を開始するとSlackに通知されます。 計測開始をSlack通知 通知の Data Visualizer のリンクをクリックすると自動でLIVE再生が始まるようにしてみました。 Data Visualizer のURLに、準備しておいたスクリーン Motion で自動LIVE再生するよう screenName=Motion&playMode=live というクエリストリングを付与しています。 都会の空は曇天模様 計測完了イベント:地図マッチング 少し時間のかかる分析処理を回してみます。 GPSデータを地図マッチングで補正します。 地図マッチングとは GPSデータの緯度経度を道路に沿うように補正します。 今回使うFast Map Matching (FMM) はその新し目のアルゴリズムです。 zenn.dev 全体構成 利用するライブラリがたくさんあるため、コンテナイメージでGPS補正Lambdaを構築します。 GPS補正Lambdaはコンテナをロードしたり、道路ネットワークのノードを事前計算します。 intdash Webhookは通知後に10秒以内にレスポンスを返す必要があるため 4 、先にレスポンスを返却する別のLambdaを設けます。 地図マッチングの構成 さらに時間かかかる場合はAWS Step FunctionsやAWS Batchを検討することになるでしょう。 GPS補正Lambda コンテナで実行するPythonプログラムを作成します。 処理はこのようになります。 5 intdash SDKでエッジ名、計測時刻、GPSデータを取得 GPSデータ範囲のOpenStreetMapから道路データを取得 道路データをもとに道路ネットワークモデルを構築 道路ネットワークモデルにGPSデータをマッチング マッチング前後のGPSデータを地図HTMLにプロット 地図HTMLをS3にアップロード マッチング前後の総走行距離を計算 総走行距離と地図HTMLのURLをSlack通知 コンテナ起動時に上記プログラムを実行するカスタムランタイムを作成します。 6 コンテナイメージをビルドし、Amazon ECRにプッシュします。 Dockerfileはこのようになります。 7 必要なライブラリをインストール FMMソースをビルド intdash SDKをOpenAPIで生成 Pythonプログラム、カスタムランタイムをローカルからコピー GPS補正Lambdaを作成します。 コンテナイメージから作成 タイムアウトは10分、FMMが使うのでメモリを1024MBにしました。 レスポンス返却Lambda もう1つのLambda関数を作成します。 8 処理はこれだけです。 HMAC検証 計測完了以外は無視 GPS補正Lambdaを非同期起動 intdash Webhookのレスポンスを返却 レスポンス返却Lambda API Gateway さきほどのAPI Gatewayの統合先をレスポンス返却Lambdaに変更します。 起動Lambdaを変更 起動 intdash Motionで、計測時にあげきれていなかったデータを遅延アップロードします。 GPSがずれやすいトンネルや急カーブのあるルートを選択します。 Motionから遅延アップロード サーバー上で計測データがすべて回収済みになり、計測が完了します。 計測が完了 Webhook設定により、計測完了イベントが通知されます。 2分ほど待つとSlackにGPS補正結果が通知されます。 補正結果の通知 OpenStreetMap のリンクをクリックすると、S3に配置した地図HTMLが開くようにしてみました。 来んかな福江島 ブルーの線が補正前のGPSデータ、ピンクの線が補正後のGPSデータです。 ズームすると、信号が途絶するトンネルや位置がぶれやすい海岸沿いの急カーブのデータが補正されて、道路に沿っているのがわかります。 ざぁま海と山 おわりに Webhookによりリアルタイムなシステム連携を実現でき、運用ワークフローが大幅に改善します。 他にも、稼働スケジュールと突き合わせてエッジの予定外起動を検知したり、 処理を逐次起動できるので定期バッチに比べて負荷の平準化も期待できます。 さらに以前ご紹介した永続化拡張と組み合わせて、intdash APIにアクセスする処理を実装せずにデータ分析を組むことも可能です。 tech.aptpod.co.jp intdashを使ったシステム構築はぜひaptpodにご相談ください。 Appendix. サンプルソース 計測作成イベント:Slack Slack通知Lambda Pythonプログラム import logging import json import hashlib import base64 import hmac import requests logger = logging.getLogger() logger.setLevel(logging.INFO) # 定数定義 SECRET_KEY = 'YOUR_SECRET_KEY' SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL' def verify_hmac (secret, payload, received): """ HMAC検証 Args: secret (str): シークレットキー payload (str): ペイロード received (str): 受信したHMAC Returns: bool: 検証結果(True: 成功, False: 失敗) """ hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256) computed = base64.b64encode(hmac_obj.digest()).decode() return hmac.compare_digest(computed, received) def send_notification (project_uuid, meas_uuid): """ Slack通知 @param project_uuid: プロジェクトUUID @param meas_uuid: 計測UUID @return: なし """ message = { 'attachments' : [ { 'color' : '#de82a7' , 'author_name' : 'intdash Webhook' , "author_icon" : "YOUR_ICON_URL" , 'title' : "LIVE再生中です" , "fields" : [ { "title" : "計測" , "value" : f "<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|Meas Hub>" }, { "title" : "リアルタイム再生" , "value" : f "<https://example.intdash.jp/vm2m/?projectUuid={project_uuid}&screenName=Motion&playMode=live|Data Visualizer>" } ], "footer" : "計測したら即分析!WebhookでGPS補正を起動してみた" , "footer_icon" : "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU" , 'mrkdwn_in' : [ 'text' , 'fields' ] } ] } try : response = requests.post( SLACK_WEBHOOK_URL, data=json.dumps(message), headers={ 'Content-Type' : 'application/json' } ) if response.status_code != 200 : logger.error(f "Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}" ) logger.info( "Slack notification sent successfully." ) except Exception as e: logger.error(f "Exception occurred while sending Slack notification: {e}" ) def lambda_handler (event, context): """ エントリポイント Args: event (dict): イベント context (LambdaContext): コンテキスト Returns: dict: APIレスポンス """ headers = event.get( 'headers' , {}) body = event.get( 'body' , '{}' ) logger.info(f "Received event: {event}" ) body_dict = json.loads(body) payload = json.dumps(body_dict, separators=( ',' , ':' )) # HMAC検証 signature = headers.get( 'x-intdash-signature-256' , '' ) if not verify_hmac(SECRET_KEY, payload, signature): logger.warning( "HMAC Verification Failed." ) return { 'statusCode' : 401 , 'body' : json.dumps({ 'message' : 'HMAC verification failed' }) } # リソースタイプとアクションの判定 resource_type = body_dict.get( 'resource_type' ) action = body_dict.get( 'action' ) if resource_type != 'measurement' or action != 'created' : logger.info(f "Ignored: resource_type={resource_type}, action={action}" ) return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Resource_type or action ignored' }) } # Slack通知 try : project_uuid = body_dict.get( 'project_uuid' ) meas_uuid = body_dict.get( 'measurement_uuid' ) send_notification(project_uuid, meas_uuid) except Exception as e: logger.error(f "Failed to Slack notification: {e}" ) return { 'statusCode' : 500 , 'body' : json.dumps({ 'message' : 'Failed to Slack notification' }) } return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Webhook received!' }) } 計測完了イベント:地図マッチング GPS補正Lambda Pythonプログラム import sys import os import json import logging import time import datetime import pytz import requests from apiclient import Configuration, ApiClient from apiclient.api import authentication_service_edges_api, measurement_service_measurements_api, measurement_service_data_points_api import struct import base64 import numpy as np import pandas as pd import geopandas as gpd import fmm from math import radians, sin, cos, sqrt, atan2 import osmnx as ox from shapely.geometry import Polygon import folium import boto3 logger = logging.getLogger() if not logger.hasHandlers(): handler = logging.StreamHandler() logger.addHandler(handler) logger.setLevel(logging.INFO) # 環境変数 os.environ[ 'MPLCONFIGDIR' ] = '/tmp' # ローカルストレージ data_path = "/tmp/" # intdash API設定 BASE_URL = "http://example.intdash.jp/api" API_TOKEN = "YOUR_API_TOKEN" # S3バケット設定 BUCKET_NAME = 'YOUR_BUCKET_NAME' AWS_REGION = 'YOUR_AWS_REGION' ACCESS_KEY = 'YOUR_ACCESS_KEY' SECRET_KEY = 'YOUR_SECRET_KEY' # Slack Webhook URL SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL' def get_edge (edge_uuid): """ エッジ情報取得 @param edge_id: エッジUUID @return: エッジ情報 """ configuration = Configuration(host=BASE_URL, api_key={ "IntdashToken" : API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = authentication_service_edges_api.AuthenticationServiceEdgesApi(api_client=api_client) edge = api_instance.get_edge(edge_uuid=edge_uuid) return edge def get_meas (project_uuid, meas_uuid): """ 計測情報取得 @param project_uuid: プロジェクトUUID @param meas_uuid: 計測UUID @return: 計測情報 """ configuration = Configuration(host=BASE_URL, api_key={ "IntdashToken" : API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(api_client=api_client) meas = api_instance.get_project_measurement(project_uuid=project_uuid, measurement_uuid=meas_uuid) return meas def get_gnss (project_uuid, meas_uuid): """ GNSSデータ取得 計測データからGNSS (1/gnss_coordinates) を抽出 @param project_uuid: プロジェクトUUID @param meas_id: 計測UUID @return: [(x, y)] """ configuration = Configuration(host=BASE_URL, api_key={ "IntdashToken" : API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = measurement_service_data_points_api.MeasurementServiceDataPointsApi(api_client=api_client) stream = api_instance.list_project_data_points(project_uuid=project_uuid, name=meas_uuid) coordinates = [] while True : line = stream.readline() if not line: break resp = json.loads(line.decode()) if 'data' in resp and 'd' in resp[ 'data' ]: base64_encoded = resp[ 'data' ][ 'd' ] bin_data = base64.b64decode(base64_encoded) data_id = resp.get( 'data_id' , 'N/A' ) if data_id == '1/gnss_coordinates' : x, y = struct.unpack( '>dd' , bin_data) coordinates.append((x, y)) return coordinates def find_bounds (coordinates): """ 緯度経度境界探索 緯度経度の最小値-0.01, 最大値+0.01で境界を作成 @param coordinates: GPS座標のリスト ([(lat, lon)]) @return: [(min_lat-0.01, min_lon-0.01), (max_lat+0.01, max_lon+0.01)] のタプル """ lats = [lat for lat, lon in coordinates] lons = [lon for lat, lon in coordinates] min_lat, max_lat = min (lats), max (lats) min_lon, max_lon = min (lons), max (lons) bounds = [(min_lat - 0.01 , min_lon - 0.01 ), (max_lat + 0.01 , max_lon + 0.01 )] return bounds def save_shape (G, filepath= None , encoding= "utf-8" ): """ シェープファイル保存 @param G: グラフオブジェクト @param filepath: 保存先のディレクトリ @param encoding: ファイルエンコーディング """ if filepath is None : filepath = os.path.join(ox.settings.data_folder, "graph_shapefile" ) if not os.path.exists(filepath): os.makedirs(filepath) filepath_nodes = os.path.join(filepath, "nodes.shp" ) filepath_edges = os.path.join(filepath, "edges.shp" ) gdf_nodes, gdf_edges = ox.convert.graph_to_gdfs(G) gdf_nodes = ox.io._stringify_nonnumeric_cols(gdf_nodes) gdf_edges = ox.io._stringify_nonnumeric_cols(gdf_edges) gdf_edges[ "fid" ] = np.arange( 0 , gdf_edges.shape[ 0 ], dtype= 'int' ) gdf_nodes.to_file(filepath_nodes, encoding=encoding) gdf_edges.to_file(filepath_edges, encoding=encoding) def sort_coordinates (matched_edges, cpath_ids): """ 経路並び替え cpath_idsに基づいて経路を並び替える @param matched_edges: マッチングされた経路のGeoDataFrame @param cpath_ids: FMMのマッチ結果であるcpathのIDリスト @return: 並び替えられたGPS座標のリスト ([(lat, lon)]形式) """ ordered_coordinates = [] for fid in cpath_ids: edge = matched_edges[matched_edges[ 'fid' ] == fid] if not edge.empty: linestring = edge.iloc[ 0 ][ 'geometry' ] coords = [(lat, lon) for lon, lat in list (linestring.coords)] ordered_coordinates.extend(coords) return ordered_coordinates def calculate_distance (coord1, coord2): """ GPS座標間距離計算 @param coord1: (lat, lon) 1つ目の座標 @param coord2: (lat, lon) 2つ目の座標 @return: 2点間の距離 (km) """ R = 6371 lat1, lon1 = coord1 lat2, lon2 = coord2 dlat = radians(lat2 - lat1) dlon = radians(lon2 - lon1) a = sin(dlat / 2 )** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2 )** 2 c = 2 * atan2(sqrt(a), sqrt( 1 - a)) return R * c def total_distance (coordinates): """ 総走行距離計算 @param coordinates: GPS座標のリスト ([(lat, lon)]) @return: 総走行距離 (km) """ distance = 0.0 for i in range ( 1 , len (coordinates)): distance += calculate_distance(coordinates[i - 1 ], coordinates[i]) return distance def upload_file (filepath, bucket_name, key): """ S3アップロード @param filepath: アップロードするファイルのパス @param bucket_name: S3バケット名 @param key: S3内でのファイルキー @return: アップロードされたファイルの公開URL """ s3_client = boto3.client( 's3' , region_name=AWS_REGION, aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY) try : s3_client.upload_file(filepath, bucket_name, key, ExtraArgs={ 'ContentType' : 'text/html' }) url = f "https://{bucket_name}.s3.{AWS_REGION}.amazonaws.com/{key}" return url except Exception as e: logger.error(f "Error uploading to S3: {e}" ) return None def send_notification (project_uuid, edge, meas, distance_origin, distance_matched, map_url): """ Slack通知 @param project_uuid: プロジェクトUUID @param edge: エッジ情報 @param meas: 計測情報 @param distance_origin: 補正前の総走行距離 (km) @param distance_matched: 補正後の総走行距離 (km) @param map_url: 地図のURL (OpenStreetMap) @return: なし """ edge_uuid = edge[ 'uuid' ] edge_name = edge[ 'name' ] meas_uuid = meas[ 'uuid' ] jst = pytz.timezone( 'Asia/Tokyo' ) base_time = meas[ 'basetime' ].astimezone(jst) start_time = base_time.strftime( '%Y/%m/%d %H:%M:%S' ) duration = meas[ 'duration' ] / 1_000_000 end_time = (base_time + datetime.timedelta(seconds=duration)).strftime( '%Y/%m/%d %H:%M:%S' ) message = { 'attachments' : [ { 'color' : '#00bfff' , 'author_name' : 'Fast Map Matching' , "author_icon" : "YOUR_ICON" , 'title' : "GPSデータを補正しました" , "fields" : [ { "title" : "エッジ" , "value" : f "<https://example.intdash.jp/console/admin/edges/{edge_uuid}|{edge_name}>" }, { "title" : "計測" , "value" : f "<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|{start_time} - {end_time}>" }, { "title" : "総走行距離 補正前" , "value" : f "{distance_origin:.2f} km" }, { "title" : "総走行距離 補正後" , "value" : f "{distance_matched:.2f} km" }, { "title" : "走行ルート" , "value" : f "<{map_url}|OpenStreetMap>" } ], "footer" : "計測したら即分析!WebhookでGPS補正を起動してみた" , "footer_icon" : "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU" , 'mrkdwn_in' : [ 'text' , 'fields' ] } ] } try : response = requests.post( SLACK_WEBHOOK_URL, data=json.dumps(message), headers={ 'Content-Type' : 'application/json' } ) if response.status_code != 200 : logger.error(f "Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}" ) logger.info( "Slack notification sent successfully." ) except Exception as e: logger.error(f "Exception occurred while sending Slack notification: {e}" ) def lambda_handler (event, context): logger.info(f "Received event: {event}" ) try : ## 入力 # GPSデータ読み込み project_uuid = event.get( 'project_uuid' ) meas_uuid = event.get( 'measurement_uuid' ) coordinates = get_gnss(project_uuid, meas_uuid) logger.debug(f "Coordinates: {coordinates}" ) if not coordinates: return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'No coordinates data found' }) } fmm_coordinates = [[lon, lat] for lat, lon in coordinates] # fmmは[[lon, lat]]形式 # OSMデータの取得とシェープファイルへの保存 bounds = find_bounds(coordinates) (min_lat, min_lon), (max_lat, max_lon) = bounds boundary_polygon = Polygon([(min_lon, min_lat), (max_lon, min_lat), (max_lon, max_lat), (min_lon, max_lat)]) G = ox.graph_from_polygon(boundary_polygon, network_type= 'drive' ) save_shape(G, filepath=f '{data_path}' ) # 道路ネットワークデータの読み込み edges = gpd.read_file(f "{data_path}/edges.shp" ) # 経路: 道路ネットワークの2つのノード(交差点など)を結ぶ線 network = fmm.Network(f "{data_path}/edges.shp" , "fid" , "u" , "v" ) graph = fmm.NetworkGraph(network) logger.info(f "Loaded {network.get_edge_count()} edges and {network.get_node_count()} nodes from shapefiles." ) ## 地図マッチング # UBODTの生成・読み込み ubodt_gen = fmm.UBODTGenAlgorithm(network, graph) ubodt_gen.generate_ubodt(f "{data_path}/ubodt.txt" , 0.02 , binary= False , use_omp= True ) ubodt = fmm.UBODT.read_ubodt_csv(f "{data_path}/ubodt.txt" ) # FMMモデルマッチング model = fmm.FastMapMatch(network, graph, ubodt) coordinates_str_list = [f "{lon} {lat}" for lon, lat in fmm_coordinates] wkt = f "LINESTRING({','.join(coordinates_str_list)})" fmm_config = fmm.FastMapMatchConfig( 5 , 20 , 15 ) # 候補数, 探索半径(m), GPS誤差(m) result = model.match_wkt(wkt, fmm_config) logger.info(f "Matched {len(result.cpath)} edges." ) ## 出力 # 地図作成 m = folium.Map(location=[np.mean([bounds[ 0 ][ 0 ], bounds[ 1 ][ 0 ]]), np.mean([bounds[ 0 ][ 1 ], bounds[ 1 ][ 1 ]])], zoom_start= 12 ) # 補正前GPSデータをプロット folium.PolyLine(coordinates, color= 'deepskyblue' , weight= 8 , opacity= 0.5 , popup= 'Original Route' ).add_to(m) folium.Marker(location=coordinates[ 0 ], popup= 'Start' , icon=folium.Icon(color= 'lightred' , icon= 'play' , prefix= 'fa' )).add_to(m) folium.Marker(location=coordinates[- 1 ], popup= 'Goal' , icon=folium.Icon(color= 'lightred' , icon= 'flag' , prefix= 'fa' )).add_to(m) # 補正後のルートをプロット matched_edges = edges[edges.fid.isin(result.cpath)] matched_coordinates = sort_coordinates(matched_edges, result.cpath) folium.PolyLine(matched_coordinates, color= 'deeppink' , weight= 7 , opacity= 0.6 , popup= 'Matched Route' ).add_to(m) # マップ保存・S3アップロード map_file = f "map_{meas_uuid}.html" map_path = f "{data_path}/{map_file}" m.save(map_file) map_url = upload_file(map_file, BUCKET_NAME, map_file) if not map_url: logger.error( "Map upload failed." ) logger.info(f "Map uploaded to S3: {map_url}" ) # 総走行距離を計算 total_distance_origin = total_distance(coordinates) total_distance_matched = total_distance(matched_coordinates) logger.info(f "補正前の総走行距離: {total_distance_origin:.2f} km" ) logger.info(f "補正後の総走行距離: {total_distance_matched:.2f} km" ) # Slack通知 meas = get_meas(project_uuid, meas_uuid) edge = get_edge(meas[ 'edge_uuid' ]) send_notification(project_uuid, edge, meas, total_distance_origin, total_distance_matched, map_url) return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'FMM completed and Slack notification sent' }) } except Exception as e: logger.error(f "Error processing event: {str(e)}" ) return { 'statusCode' : 500 , 'body' : json.dumps({ 'error' : str (e)}) } if __name__ == "__main__" : # ローカルテスト用 if len (sys.argv) > 1 : with open (sys.argv[ 1 ], 'r' ) as f: event = json.load(f) lambda_handler(event, None ) else : print ( "Usage: python3 app.py <event.json>" ) GPS補正Lambda bootstrapシェル #!/bin/bash # ------------------------------------------------------------------- # bootstrap # ------------------------------------------------------------------- # カスタムランタイム # # コンテナ起動時に実行 # LambdaランタイムAPIからイベントリクエスト取得 # リクエストイベントを引数にlambda_handler起動 # 結果をLambdaランタイムAPIに登録 # ------------------------------------------------------------------- set -euo pipefail # エラー発生時にスクリプトを終了し、未定義変数をエラーにする while true; do # LambdaランタイムAPIからイベントリクエスト取得 RESPONSE = $( curl -i -s " http:// ${AWS_LAMBDA_RUNTIME_API} /2018-06-01/runtime/invocation/next " ) HEADERS = $( echo " $RESPONSE " | sed -n ' /^\r$/q;p ' ) REQUEST_ID = $( echo " $HEADERS " | grep -Fi ' Lambda-Runtime-Aws-Request-Id ' | awk ' {print $2} ' | tr -d ' \r ' ) if [[ -z " $REQUEST_ID " ]] ; then echo " Error: REQUEST_ID is empty " exit 1 fi EVENT_DATA = $( echo " $RESPONSE " | sed ' 1,/^\r$/d ' ) echo " REQUEST_ID: $REQUEST_ID EVENT_DATA: $EVENT_DATA " # lambda_handler起動 RESPONSE = $( python3 -c " import json from app import lambda_handler event = json.loads(''' $EVENT_DATA ''') response = lambda_handler(event, None) print(json.dumps(response)) " ) echo " RESPONSE: $RESPONSE " if [[ -z " $RESPONSE " ]] ; then echo " Error: RESPONSE is empty. Request ID: $REQUEST_ID " exit 1 fi # 結果をLambdaランタイムAPIに登録 curl -s -X POST " http:// ${AWS_LAMBDA_RUNTIME_API} /2018-06-01/runtime/invocation/ ${REQUEST_ID} /response " \ -d " $RESPONSE " done GPS補正Lambda コンテナDockerfile # ------------------------------------------------------------------- # ベースイメージ: Ubuntu 22.04 LTS # ------------------------------------------------------------------- FROM ubuntu:22.04 # ------------------------------------------------------------------- # 基本ツールのインストール # - ビルドツール、CMake、Git、Python、npm ... # ------------------------------------------------------------------- RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ cmake \ g++ \ git \ vim \ python3 = 3 . 10 .6-1~ 22 . 04 . 1 \ python3-dev = 3 . 10 .6-1~ 22 . 04 . 1 \ python3-pip = 22 . 0 . 2 +dfsg-1ubuntu0. 4 \ wget \ curl \ libgdal-dev = 3 . 4 . 1 +dfsg-1build4 \ libboost-all-dev = 1 . 74 . 0 .3ubuntu7 \ libosmium2-dev = 2 . 18 .0-1 \ swig \ expat \ bzip2 \ default-jre = 2:1.11-72build2 \ npm \ jq \ && rm -rf /var/lib/apt/lists/* \ && apt-get clean # ------------------------------------------------------------------- # Python3 シンボリックリンク設定 # - Python3 と PIP3 をデフォルトに設定 # ------------------------------------------------------------------- RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 \ && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 # ------------------------------------------------------------------- # FMMに必要なPython パッケージインストール # - Numpy、Pandas、Osmnx、Shapely .. # ------------------------------------------------------------------- RUN pip install --upgrade pip \ && pip install --no-cache-dir \ numpy = = 1 . 26 . 4 \ pandas = = 2 . 2 . 2 \ osmnx = = 1 . 9 . 4 \ folium = = 0 . 17 . 0 \ shapely = = 2 . 0 . 6 \ polars = = 1 . 9 . 0 # ------------------------------------------------------------------- # FMM のクローンとビルド # - GitHub から FMM をクローンし、ビルドしてインストール # ------------------------------------------------------------------- RUN git clone https://github.com/cyang-kth/fmm.git /home/fmm WORKDIR /home/fmm/build RUN cmake .. \ && make -j " $( nproc ) " \ && make install # ------------------------------------------------------------------- # intdash SDK のインストール準備 # - Node.js 設定と npm パッケージのインストール # - openapi-generator-cli のインストール # ------------------------------------------------------------------- WORKDIR /home RUN npm install -g n@ 10 . 0 . 0 \ && n 20 . 18 . 0 \ && hash -r \ && npm install --save @openapitools/openapi-generator-cli # ------------------------------------------------------------------- # intdash SDK インストール # - API クライアントコードを生成、Python パッケージとしてインストール # - コンテナで起動するため、dist-packagesにコピー # ------------------------------------------------------------------- ENV VERSION=v2.7.0 RUN ./node_modules/.bin/openapi-generator-cli version-manager set 6 . 1 . 0 \ && ./node_modules/.bin/openapi-generator-cli generate \ -g python -i https://docs.intdash.jp/api/intdash-api/ ${VERSION} /openapi_public.yaml \ --package-name = apiclient RUN cp -r /home/apiclient /usr/local/lib/python3. 10 /dist-packages/ # ------------------------------------------------------------------- # アプリに必要なPython パッケージをインストール # - 地図、AWS SDK for Python、HTTP # ------------------------------------------------------------------- ENV PACKAGE_PREFIX=/var/task RUN pip install --no-cache-dir \ folium = = 0 . 17 . 0 \ boto3 = = 1 . 35 . 32 \ requests = = 2 . 32 . 3 # ------------------------------------------------------------------- # ローカルファイルのコピー # - アプリケーションファイル # - ローカルテスト用ファイル # ------------------------------------------------------------------- COPY bootstrap ${PACKAGE_PREFIX}/bootstrap RUN chmod +x /var/task/bootstrap COPY app.py ${PACKAGE_PREFIX}/app.py COPY event.json /home/event.json # ------------------------------------------------------------------- # 環境変数の設定 # - GDAL、GEOS、およびその他の必要なライブラリのパスを設定 # ------------------------------------------------------------------- ENV \ GDAL_DATA=${PACKAGE_PREFIX}/share/gdal \ PROJ_LIB=/usr/share/proj/ \ GDAL_CONFIG=${PACKAGE_PREFIX}/bin/gdal-config \ GEOS_CONFIG=${PACKAGE_PREFIX}/bin/geos-config \ PATH=${PACKAGE_PREFIX}/bin:$PATH \ PYTHONPATH=${PACKAGE_PREFIX} # ------------------------------------------------------------------- # ENTRYPOINT、CMD、WORKDIR の設定 # - FMM実行時にcacheにJSONファイル作成のため、/tmp を親ディレクトリに設定 # ------------------------------------------------------------------- WORKDIR /tmp ENTRYPOINT [ " /var/task/bootstrap " ] CMD [ " python3 ", " /var/task/app.py " ] レスポンス返却Lambda Pythonプログラム import json import hmac import hashlib import base64 import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) # 定数定義 SECRET_KEY = 'YOUR_SECRET_KEY' LAMBDA_CLIENT = boto3.client( 'lambda' ) def verify_hmac (secret, payload, received): """ HMAC検証 Args: secret (str): シークレットキー payload (str): ペイロード received (str): 受信したHMAC Returns: bool: 検証結果(True: 成功, False: 失敗) """ hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256) computed = base64.b64encode(hmac_obj.digest()).decode() return hmac.compare_digest(computed, received) def lambda_handler (event, context): """ エントリポイント Args: event (dict): イベント context (LambdaContext): コンテキスト Returns: dict: APIレスポンス """ headers = event.get( 'headers' , {}) body = event.get( 'body' , '{}' ) logger.info(f "Received event: {event}" ) body_dict = json.loads(body) payload = json.dumps(body_dict, separators=( ',' , ':' )) # HMAC検証 signature = headers.get( 'x-intdash-signature-256' , '' ) if not verify_hmac(SECRET_KEY, payload, signature): logger.warning( "HMAC Verification Failed." ) return { 'statusCode' : 401 , 'body' : json.dumps({ 'message' : 'HMAC verification failed' }) } # リソースタイプとアクションの判定 resource_type = body_dict.get( 'resource_type' ) action = body_dict.get( 'action' ) if resource_type != 'measurement' or action != 'completed' : logger.info(f "Ignored: resource_type={resource_type}, action={action}" ) return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Resource_type or action ignored' }) } # FMM Lambdaを非同期起動 try : response = LAMBDA_CLIENT.invoke( FunctionName= 'intdash-fmm' , InvocationType= 'Event' , Payload=json.dumps(body_dict) ) logger.info(f "FMM Lambda invoked successfully: {response}" ) except Exception as e: logger.error(f "Failed to invoke FMM Lambda: {e}" ) return { 'statusCode' : 500 , 'body' : json.dumps({ 'message' : 'Failed to invoke FMM Lambda' }) } return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Webhook received and FMM Lambda invoked' }) } 完了はエッジ側からすべての計測データを受け取りきったときに自動で遷移します。終了はユーザー操作によりサーバー側で計測が閉じられたときの状態です。 ↩ SlackのIncoming Webhookとはリクエストのフォーマットが異なるため、intdash Webhookは直接連携できません。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩ レスポンスが返らない場合は通知が失敗したとみなされ、intdash次回起動時に再送されます。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩ サンプルシェルを本記事の最後に掲載しています。 ↩ サンプルファイルを本記事の最後に掲載しています。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩
手軽にデータストリーミングを開発したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 これまでintdashにデータをストリームするには、クライアントライブラリを使用してコーディングが必要でした。 2023年12月バージョンからREST APIでアップストリームできるようになったのでご紹介します。 はじめに ストリームとは もっと手軽にデータ送信 クライアントライブラリとの違い 使いどころ 向かない使い方 追加API 叩いてみた 計測の作成 アップストリームを作成 アップストリームチャンクを送信 アップストリームを閉じる 最小エッジでストリームしてみた おわりに Appendix. サンプルcurlコマンド Appendix. サンプルMicroPythonプログラム はじめに ストリームとは ストリームは、データをリアルタイム(または、ほぼリアルタイム)で継続的に送受信するプロセスです。 アップストリームは、デバイス側からプラットフォーム側にデータを送信する方向のストリームです。 www.aptpod.co.jp intdashでは、独自プロトコルiSCP(intdash Stream Control Protocol)で高頻度・低遅延のストリームを実現しています。 tech.aptpod.co.jp また、intdashを使ったアプリケーション開発をしやすくするため、intdashでは 各種クライアントライブラリ をご用意しています。 もっと手軽にデータ送信 今回追加されたのは、intdashにHTTPでデータ送信するためのAPIです。 クライアントライブラリとREST API クライアントライブラリとの違い クライアントライブラリとはトレードオフがあります。 特性 ライブラリ REST API 学習コスト ❌ ⭕️ 実行環境制約 ❌ ⭕️ アプリケーションの柔軟性 ⭕️ ❌ 使いどころ 低頻度データ(〜数秒に1回) ライブラリ追加が難しい動作環境 開発時の簡単な検証 向かない使い方 高頻度データ(1秒に数回〜) 処理性能要求が高めのアプリケーションへの組み込み 追加API 具体的に追加されたAPIは以下の4つです。 Create Upstream:アップストリーム作成 Close Upstream:アップストリームを閉じる Send Upstream Chunks:アップストリームチャンクを送信 Send Upstream Metadata:メタデータを送信 叩いてみた 実際に使ってみました。 Macbookからcurlコマンドを叩きながら、サーバーの計測の状態変化をMeas Hubで見ていきます。 1 計測の作成 まず、データを格納するための計測を作成します。 HTTPボディで基準時刻とエッジを指定しています。 計測作成 サーバーに、データが入っていない空の計測が作成されます。 ステータスは 計測準備中 となっています。 空の計測 アップストリームを作成 エッジとサーバー間の送受信プロセスであるアップストリームを作成します。 HTTPボディでエッジ source_node_id と計測 session_id を指定しています。 ストリーム作成 計測のステータスが 計測中 に変わります。 計測開始 アップストリームチャンクを送信 いよいよデータを送信します。 URLでストリームを指定し、HTTPボディでデータの定義と値を指定します。 データ送信 Meas Hubに送信データのデータ型とデータ名が表示されます。 回収済みデータポイント数が増加 データIDリストに送信データ アップストリームを閉じる 最後にストリームを閉じます。 URLでストリームを指定し、HTTPボディで計測完了 close_session を指定します。 ストリームを閉じる 計測のステータスは終了に変わり、計測の終了時刻が表示されます。 計測終了 最小エッジでストリームしてみた OSがない環境でもHTTPリクエストは送信できます。 M5Stackシリーズのマイコン ATOM Lite にMicroPythonで実装してみました。 2 サイズは24mm x 24mm x 10mm。ストリームするintdashエッジとしては過去最小ではないかと思います。 照度センサーの値を1Hzでアップストリームします。 www.youtube.com マイコンからHTTPでストリーム おわりに 手軽にストリームを実現できる方法としてREST APIの新機能をご紹介しました。 シェルでシステム情報を送信したり、intdashを使った開発のパターンが広がります。 データ収集プラットフォームの開発はぜひaptpodにご相談ください。 Appendix. サンプルcurlコマンド # エッジのOAuthクライアントクレデンシャル払い出し curl -X POST https://example.intdash.jp/api/auth/oauth2/token \ -d "grant_type=client_credentials&client_id={YOUR_ID}&client_secret={YOUR_SECRET}" # アクセストークン export ACCESS_TOKEN={YOUR_ACCESS_TOKEN} # 計測作成 curl -X POST https://example.intdash.jp/api/v1/projects/{YOUR_PROJECT_ID}/measurements \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "basetime": "2024-01-01T00:00:00.000000Z", "basetime_type": "ntp", "edge_uuid": "{YOUR_EDGE_ID}", "protected": false }' # ストリーム作成 curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "session_id": "8efe703c-b617-4a6d-bea9-ed9ea11c2c66", "source_node_id": "{YOUR_EDGE_ID}", "persist": true }' # チャンク送信 curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/chunks \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "data_point_groups": [ { "data_id": { "type": "float64", "name": "1/test" }, "data_points": [ { "elapsed_time": 1, "float64": 0.123 } ] } ] }' # ストリームを閉じる curl -i -X PUT https://example.intdash.jp/api/iscp/projects/${YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/close \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "close_session": true }' Appendix. サンプルMicroPythonプログラム import os import sys import io import M5 from M5 import BtnA from hardware import Pin, RGB, I2C from unit import DLightUnit import time import requests2 import network import ntptime import json import gc # 定数定義 PROJECT_ID = 'YOUR_PROJECT_ID' EDGE_ID = 'YOUR_EDGE_ID' EDGE_SECRET = 'EDGE_SECRET' WIFI_SSID = 'YOUR_SSID' WIFI_PASSWORD = 'YOUR_PASS' NTP_SERVER = 'YOUR_NTP_SERVER' TIMEZONE = 'GMT-9' API_BASE_URL = "https://example.intdash.jp/api" # グローバル変数 access_token = None stream_id = None start_time_us = None is_measuring = True wlan = network.WLAN(network.STA_IF) rgb = RGB() dlight_0 = DLightUnit(I2C(0, scl=Pin(32), sda=Pin(26), freq=100000)) def send_request(method, url, headers=None, json=None, data=None): """ HTTPリクエストを実行する共通関数 :param method: HTTPメソッド (GET/POST/PUT/DELETE) :param url: リクエスト先のURL :param headers: リクエストのヘッダー情報 (オプション) :param json: JSON形式で送信するデータ (オプション) :param data: フォームエンコード形式で送信するデータ (オプション) :return: 200番台はHTTPレスポンス、それ以外はNone """ default_headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}' } headers = headers or default_headers request_func = { 'POST': requests2.post, 'PUT': requests2.put, 'GET': requests2.get, 'DELETE': requests2.delete }.get(method) if request_func is None: raise ValueError(f"Unsupported HTTP method: {method}") http_req = request_func(url, headers=headers, json=json, data=data) if 200 <= http_req.status_code < 300: response = http_req.json() else: print(f"Failed request. Status Code: {http_req.status_code}, Response: {http_req.text}") response = None http_req.close() gc.collect() return response def get_current_time(): """ 現在時刻取得(ISO 8601形式) :return: ISO 8601形式の現在時刻文字列 """ current_time = time.gmtime() return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.000000Z".format( *current_time[:6] ) def get_elapsed_time(): """ 経過時間取得 :return: 経過時間(ナノ秒) """ return time.ticks_diff(time.ticks_us(), start_time_us) * 1000 def connect_wifi(): """ WiFi接続 """ if wlan.isconnected(): print("WiFi is already connected.") return wlan.active(False) wlan.active(True) wlan.connect(WIFI_SSID, WIFI_PASSWORD) while not wlan.isconnected(): print("Connecting to WiFi...") time.sleep(1) print("WiFi connected. IP Address:", wlan.ifconfig()) def sync_ntp(): """ NTPサーバー同期 """ ntptime.host = NTP_SERVER max_retries = 3 for attempt in range(max_retries): try: ntptime.settime() print("NTP time synchronized successfully.") print("Current Local Time (JST):", time.localtime()) break except OSError as e: print(f"NTP synchronization failed on attempt {attempt + 1}: {e}") if attempt < max_retries - 1: time.sleep(5) def authenticate(): """ 認証 """ global access_token url = f"{API_BASE_URL}/auth/oauth2/token" headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = f"grant_type=client_credentials&client_id={EDGE_ID}&client_secret={EDGE_SECRET}" response = send_request('POST', url, headers=headers, data=data) access_token = response.get("access_token") if access_token: print("Access token obtained.") else: print("Failed to obtain access token.") def setup_measurement(): """ 計測・ストリーム開始 """ global stream_id, start_time_us start_time_us = time.ticks_us() current_time = get_current_time() measurement_url = f"{API_BASE_URL}/v1/projects/{PROJECT_ID}/measurements" body = { "basetime": current_time, "basetime_type": "ntp", "edge_uuid": EDGE_ID, "protected": False } measurement_response = send_request('POST', measurement_url, json=body) measurement_id = measurement_response.get("uuid") stream_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams" body = { "session_id": measurement_id, "source_node_id": EDGE_ID, "persist": True } stream_response = send_request('POST', stream_url, json=body) stream_id = stream_response.get("stream_id") def btnA_wasPressed_event(state): """ ボタンイベント 計測中はストリームクローズ 計測停止中はストリーム開始 :param state: ボタンの状態 """ global is_measuring if is_measuring: is_measuring = False rgb.fill_color(0x000000) close_url = f'{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/close' send_request('PUT', close_url, json={"close_session": True}) else: setup_measurement() is_measuring = True def setup(): """ 初期設定 """ M5.begin() BtnA.setCallback(type=BtnA.CB_TYPE.WAS_PRESSED, cb=btnA_wasPressed_event) rgb.fill_color(0xFF0077) dlight_0.configure(dlight_0.CONTINUOUSLY, dlight_0.L_RESOLUTION_MODE) connect_wifi() sync_ntp() authenticate() setup_measurement() def loop(): """ メインループ処理 """ M5.update() if BtnA.isPressed() or not is_measuring: time.sleep(1) return rgb.fill_color(0x0000FF) lux = dlight_0.get_lux() print("Current Lux:", lux) body = { "data_point_groups": [ { "data_id": {"type": "float64", "name": "1/lux"}, "data_points": [{"elapsed_time": get_elapsed_time(), "float64": lux}] } ] } chunks_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/chunks" send_request('POST', chunks_url, json=body) rgb.fill_color(0x000077) time.sleep(1) if __name__ == '__main__': try: setup() while True: loop() except (Exception, KeyboardInterrupt) as e: print("An error occurred:", e) サンプルコマンドを本記事の最後に掲載しています。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩
収集データを手軽に分析に利用したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 intdashは収集データをリアルタイムに可視化できるのが強みですが、 定時/逐次処理によるデータ分析やレポーティングのご要望もいただいています。 そこで今回は、2023年12月から新たに追加された永続化拡張機能をご紹介します。 1 はじめに 永続化拡張とは 今回やること 手順 S3バケットの準備 AWSアクセスキーの準備 永続化拡張設定 データ連携確認 計測 分析・レポート作成 遅延アップロード おわりに Appendix. 分析プログラム はじめに 永続化拡張とは 収集データをintdash内の時系列データベースに永続化すると同時に、分析基盤などの他プラットフォームにも連携します。 2 他プラットフォームへデータを流し込む 連携先には以下を選択できます。 Amazon S3 Amazon Timestream REST API これにより、データ連携をカスタム開発せず、他プラットフォームでの収集データ利用が可能になります。 今回やること スマートフォンアプリ intdash Motion V2 で計測・収集したGPSデータを、intdashからAmazon S3にCSVファイルとしてほぼリアルタイムに書き出します。 計測完了後、CSVファイルをまとめてPCで取得します。 全体構成 手順 S3バケットの準備 連携先となるS3のバケットを作成します。 S3バケット作成 AWSアクセスキーの準備 intdashからのアクセスに使用する、IAMでバケット所有者のAWSアクセスキーIDとシークレットアクセスキーを払い出します。 アクセスキー払い出し 永続化拡張設定 intdashに永続化拡張を設定します。 誤って大量データ書き出しが始まらないよう、設定を作成したあとに有効化します。 3 設定の作成 設定項目は以下の通りです。 設定の名前と説明 永続化タイプ:S3 S3設定 リージョン、バケット名 パーティション単位(年、月、日) AWSアクセスキーID、シークレットアクセスキー ファイルフォーマット:CSV、JSON、RAW(Bytes)、Parquet 対象/除外データリスト データタイプ データ名 iSCP 2.0の ワイルドカード を利用可能 対象/除外ノード(エッジのUUID)リスト 有効/無効 作成した設定では、 対象データをGPSの2項目に制限しています。 登録された設定を確認して有効化します。 設定の有効化 データ連携確認 intdash Motionで計測してみます。 GPSデータの各項目がEdge Finderに表示されます。 intdashサーバーにてGPSデータを受信 計測を開始すると、すぐにS3バケットにCSVファイルが現れ始めます。 data-YYYY-MM-DDThh:mm:ss.sssssssssZ.csv とナノ秒単位のファイル名が付与されます。 S3バケットにCSVファイルが出現 ファイルの中身はこのようになっています。 GPSデータのうち、先ほど設定した2つの対象データのみが出力されています。 緯度経度 1/gnss_coordinates :x, y列 高度 1/gnss_altitude :float列 一部項目は割愛 time:絶対時刻 source_node_id:送信元エッジID session_id:計測ID data_type:データタイプ data_name:データ名称 string, float, int, bool, x, y, z, w:値、データタイプに応じた各列に格納 bytes:データ生値、Base64エンコード 1ファイル中には複数のエッジ・計測・データの行が混在します。 計測 移動しながら1時間ほど計測しました。 分析・レポート作成 Jupyter NotebookでCSVファイルを取得して地図に表示します。 4 緯度経度をつないだ線を、高度のレンジごとで色分けしています。 新宿駅は武蔵野台地、品川駅は埋立地 拡大すると1Hzで収集した緯度経度を綺麗に描けているのがわかります。 五反田〜目黒は激登り 遅延アップロード 永続化拡張は、リアルタイムで送りきれなかったデータを回収する遅延アップロードにも働きます。 電波が悪い状況で試しました。 京都出張者の朝は早い Motionに送りきれなかったデータが残っています。 手動でアップロードします。 xx.x % Uplodedの計測を選択してUpload S3に大きめのCSVファイルが出現 なお、エッジデバイス用のエージェントソフトウェア intdash Edge Agent なら遅延アップロードを自動で行えます。 www.youtube.com さて、再度プロットしてみました。 回線速度が落ちる静岡〜浜松間 遅延アップロードで補完されたデータにより、高度のグラデーションがより精緻に表現されています。 おわりに 永続化拡張機能によって、分析のためのデータ連携の開発が不要になり、システムインテグレーションの期間・コスト低減が期待できます。 他にも、画像分析などのAI活用、他プラットフォームデータとの複合データ分析など、様々な応用が考えられます。 モビリティデータを分析基盤で活用したい方は、ぜひintdashをご検討ください。 Appendix. 分析プログラム import boto3 import pandas as pd import folium from io import StringIO import matplotlib.pyplot as plt import matplotlib.colors as mcolors import time # AWSアクセスキーを設定 session = boto3.Session( aws_access_key_id='YOUR_ACCESS_KEY_ID', aws_secret_access_key='YOUR_SECRET_ACCESS_KEY', region_name='ap-northeast-1' ) s3 = session.client('s3') bucket_name = 'YOUR_BUCKET' prefix = 'YYYY/MM/DD/' # S3オブジェクトキーパス # ページネーションでファイルリストを取得 paginator = s3.get_paginator('list_objects_v2') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) file_list = [] for page in page_iterator: for content in page.get('Contents', []): file_list.append(content['Key']) # バッチサイズ設定 batch_size = 100 # バッチサイズ(ファイル数) all_coordinates_data = [] # 全ての座標データを格納するリスト all_altitude_data = [] # 全ての高度データを格納するリスト # カラーマップ設定 cmap = plt.get_cmap('jet') # 'jet'カラーマップを使用 norm = mcolors.Normalize(vmin=0, vmax=30) # 地図中心設定 folium_map = folium.Map(location=[35.679889, 139.73875], zoom_start=12) # ファイルをバッチサイズごとに処理 for batch_start in range(0, len(file_list), batch_size): batch_files = file_list[batch_start:batch_start + batch_size] # バッチ処理 for file_key in sorted(batch_files): try: csv_obj = s3.get_object(Bucket=bucket_name, Key=file_key) body = csv_obj['Body'].read().decode('utf-8') df = pd.read_csv(StringIO(body), usecols=['time', 'data_name', 'x', 'y', 'float']) # 緯度経度データ抽出 coords_df = df[df['data_name'] == '1/gnss_coordinates'][['time', 'x', 'y']] # 高度データ抽出 alt_df = df[df['data_name'] == '1/gnss_altitude'][['time', 'float']] all_coordinates_data.extend(coords_df.values.tolist()) all_altitude_data.extend(alt_df.values.tolist()) except Exception as e: print(f"Error reading {file_key}: {e}") # S3 API制限回避 time.sleep(2) # スリープ # DataFrameに変換してソート coordinates_df = pd.DataFrame(all_coordinates_data, columns=['time', 'x', 'y']) altitude_df = pd.DataFrame(all_altitude_data, columns=['time', 'altitude']) # timeでソート coordinates_df.sort_values('time', inplace=True) altitude_df.sort_values('time', inplace=True) # 緯度(x)と経度(y)を地図にプロット for i in range(len(coordinates_df) - 1): # 現在のポイントと次のポイントのデータ row_current = coordinates_df.iloc[i] row_next = coordinates_df.iloc[i + 1] # 高度の行をtimeで検索 altitude_row_current = altitude_df[altitude_df['time'] == row_current['time']] altitude_current = altitude_row_current['altitude'].values[0] if not altitude_row_current.empty else None # 色の設定 if altitude_current is not None: color = mcolors.to_hex(cmap(norm(altitude_current))) else: color = 'gray' # ポリラインを描画 folium.PolyLine( locations=[(row_current['x'], row_current['y']), (row_next['x'], row_next['y'])], weight=5, opacity=0.8, color=color ).add_to(folium_map) # 地図表示 folium_map.save('map.html') folium_map 本機能はβ版であり、次バージョンよりインターフェース仕様が変更になる可能性があります。 ↩ 時系列データベースへの永続化をフックし、およそ1秒〜数秒で送信します。送信性能はサーバースペックやネットワークに依存します。 ↩ 現バージョンでは設定が反映されるまで時間がかかる場合があります。 ↩ Pythonプログラムを本記事の最後に掲載しています。 ↩