TECH PLAY

アプトポッド

アプトポッド の技術ブログ

248

aptpod Advent Calendar 2024 12月12日の記事です。 こんにちは、開発本部Visual M2Mグループの遠藤です。フロントエンドエンジニアとして働いています。 Playwright 1.49.0から Aria snapshots という機能が新たに追加されました。この記事では、PlaywrightのAria snapshotsを実際に試しながら機能の紹介を行いたいと思います。 Aria snapshots とは アクセシビリティツリーについて Aria snapshots を使ってみる インストール 基本的な使い方 テストの修正 部分一致と正規表現を使用したマッチング スナップショットの生成と更新 ariaSnapshot()メソッド おわりに 参考文献 Aria snapshots とは PlaywrightのAria snapshotsでは、ページのアクセシビリティツリーをYAML形式の表現として提供します。提供されたツリーのスナップショットを保存することで、ページの変更時に差分を比較し、ページ構造が期待通りであるかをチェックすることができます。 たとえば次のようなHTML構造があるとします。 < h1 > Title </ h1 > < h2 > SubTitle </ h2 > こちらは次のYAML形式で表現することができます。 - heading "Title" [ level=1 ] - heading "SubTitle" [ level=2 ] アクセシビリティツリーについて アクセシビリティツリー はUI構造を表すアクセス可能なオブジェクトのツリーです。DOMやCSSOMに基づいて生成され、スクリーンリーダーや他の支援技術が情報をユーザーに提供するための基盤となります。 Chrome DevTools を利用することでどのようなアクセシビリティツリーが生成されているかを確認することもできます。たとえば前述したHTML構造は、Chrome DevToolsで次のようなアクセシビリティツリーになっていることを確認できます。 Aria snapshots を使ってみる ここからは実際にAria snapshotsを試しながら機能を確認していきたいと思います。 インストール 次のコマンドでPlaywrightのインストールを行います。 $ npm init playwright@latest もしくは VS CodeのPlaywright拡張機能 を使用して作業を行うこともできます。 基本的な使い方 まずはじめに次のHTML構造を想定してください。 < header id = "header" > < h1 > Title </ h1 > </ header > 次に、Playwrightのテストコードを書いてみます。 localhost:5500 は上記のHTMLをホストしているローカルサーバーを想定しています。 Aria snapshotsでは、 toMatchAriaSnapshot() メソッドを使用することで、 locator の範囲内のアクセシビリティツリーと、引数のアクシビリティツリーを比較することができます。 tests/example.spec.ts を次のように書き換えてください。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "Title" [level=1] ` ); } ); 次のコマンドを実行してみましょう。上記のテストは成功します。 $ npx playwright test Running 1 test using 1 worker 1 passed ( 525ms ) 次にHTMLファイルに次のような変更を加えてみましょう。 <h1> タグを <h2> タグに変更 テキストを"Title"から"SubTitle"に変更 < header id = "header" > < h2 > SubTitle </ h2 > </ header > 再度テストを実行してみましょう。テストが失敗すれば期待通りです。 $ npx playwright test Running 1 test using 1 worker 1 ) [ chromium ] › example.spec.ts:3:5 › test example ─────────────────────────────────────────────── Error: Timed out 5000ms waiting for expect ( locator ) .toMatchAriaSnapshot ( expected ) Locator: locator ( ' #header ' ) - Expected - 1 + Received + 1 - banner: - - heading " Title " [ level = 1 ] + - heading " SubTitle " [ level = 2 ] テストの修正 与えられたエラーメッセージを元に、テストを修正して、再度実行してみましょう。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "SubTitle" [level=2] ` ); } ); 今度は成功しました。 $ npx playwright test Running 1 test using 1 worker 1 passed ( 475ms ) 部分一致と正規表現を使用したマッチング 部分一致や正規表現を利用することで、より柔軟なテストが可能となります。 たとえば、以下のようにアクセブルな名前や属性を省略することで、部分一致を適用することができます。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading ` ); } ); 上記は、 header > heading という構造を持つHTML要素を期待します。この場合、 heading 要素のレベルやテキストは無視されます。つまり要素が h1 でも h2 でも、またテキストが Title でも SubTitle でも、テストは成功します。 HTML構造を変更せずに再度テストを実行してみましょう。テストが成功することを確認できます。 $ npx playwright test Running 1 test using 1 worker 1 passed ( 475ms ) 次のように正規表現を使用することで、動的なコンテンツに対してテストを行うこともできます。 test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading /.*Title$/ ` ); } ); スナップショットの生成と更新 スナップショットファイルの生成や更新は --update-snapshots オプションを使用して行うことができます。 一度、HTML構造とテストファイルを次の状態に戻します。 < header id = "header" > < h2 > SubTitle </ h2 > </ header > test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "Title" [level=1] ` ); } ); この状態で npx playwright test を行なうと、テストは失敗します。したがって先ほどは、HTML構造を自分で書き換えることで、テストを成功させました。今度は --update-snapshots オプションを付与して実行してみます。 $ npx playwright test --update-snapshots Running 1 test using 1 worker New baselines created for: tests/example.spec.ts git apply test-results/rebaselines.patch 1 passed ( 5 .5s ) 今回はテストが成功し、 test-results/rebaselines.patch にパッチファイルが生成されました。 diff --git a/tests/example.spec.ts b/tests/example.spec.ts --- a/tests/example.spec.ts +++ b/tests/example.spec.ts @@ - 3 , 7 + 3 , 7 @@ test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - - banner: - - heading "Title" [level=1] + - banner: + - heading "SubTitle" [level=2] ` ); } ); git apply を使うことでパッチファイルの内容を tests/example.spec.ts に適用することができます。 $ git apply test-results/rebaselines.patch tests/example.spec.ts を確認してみると、下記のように差分が適用されていることを確認できます。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( ` - banner: - heading "SubTitle" [level=2] ` ); } ); また、 toMatchAriaSnapshot() に空文字列を渡すことで、その場でスナップショットを生成することもできます。この方法を利用して初回テスト実行時に、簡単にスナップショットを生成することができます。 import { test , expect } from "@playwright/test" ; test ( "test example" , async ( { page } ) => { await page.goto( "http://localhost:5500/" ); await expect (page.locator( "#header" )).toMatchAriaSnapshot( "" ); } ); 上記の状態で --update-snapshots を付与したテストを実行してみましょう。 $ npx playwright test --update-snapshots test-results/rebaselines.patch に次のパッチファイルが生成されます。 diff --git a/tests/example.spec.ts b/tests/example.spec.ts --- a/tests/example.spec.ts +++ b/tests/example.spec.ts @@ -2 , 5 + 2 , 8 @@ test ( " test example " , async ( { page } ) => { await page.goto ( " http://localhost:5500/ " ) ; - await expect ( page.locator ( " #header " )) .toMatchAriaSnapshot ( "" ) ; + await expect ( page.locator ( " #header " )) .toMatchAriaSnapshot ( ` + - banner: + - heading " SubTitle " [ level = 2 ] + ` ) ; } ) ; ariaSnapshot() メソッド ariaSnapshot() メソッドは、指定された locator の範囲内のスナップショットをYAML形式で出力します。これは、テスト実行中にスナップショットを動的に生成したい場合などで役に立ちます。 const snapshot = await page.locator( "#header" ).ariaSnapshot(); console . log (snapshot); // - banner: // - heading "SubTitle" [level=2] おわりに UIテストはその性質上、非常に繊細で難しい作業です。画面の構造や見た目の変化はアプリケーションの成長とともに頻繁に発生します。一方でテストコードのメンテナンスや、意図しない変更の検出にはなるべく労力をかけたくないものです。 そんな中、PlaywrightのAria snapshotsは、アクセシビリティツリーというアプローチでUIの構造を簡潔かつ柔軟にテストできる点で有用に感じました。YAML形式で視覚的にわかりやすくページの構造を記述できることもメリットの一つです。 今後も、実際のプロジェクトに適した方法を模索しながら、製品の品質向上につなげていければと思います。 参考文献 https://playwright.dev/docs/aria-Snapshots https://github.com/microsoft/playwright/releases/tag/v1.49.0
アバター
aptpod Advent Calendar 2024 12月11日の記事です。 ネイティブアプリケーション開発を担当している上野です。近年、デジタルツインの注目度が高まりつつある中、実際にデジタルツインアプリケーションを作る際の開発プラットフォームの選択肢として、UnityやROSが挙げられると思います。 aptpod,Inc News 上記ニュースで開発しているアプリケーションもUnityとROSが活用されています。 今回は実際に私がUnityとROSを利用してデジタルツインアプリケーションを開発している際に利用しているツールや取り扱い方法などをご紹介します。 UnityとROSの接続方法 ROSサーバーのセットアップ Docker Composeファイルの作成 Dockerコンテナの起動 Unityアプリのセットアップ プロジェクトの作成 ROS-TCP-Connectorのセットアップ ROSバージョンの選択 ROSコネクションの配置 ROSメッセージのサブスクライブ/パブリッシュ検証 ROSメッセージをサブスクライブしてみる ROSbagファイルに格納されたROSメッセージの確認方法 ROSメッセージをパブリッシュしてみる 独自に設計されたROSメッセージの生成方法 ROS用に設計されたロボットモデルのインポート その他、開発する上で注意ポイント Windowsの場合、Dockerで作成したROSサーバーは同一LAN内のROSノード(サーバー)とは接続できない UnityとROSでは座標系が違う 最後に UnityとROSの接続方法 UnityとROSとの接続は以下の2つのモジュールを利用しています。 ROS-TCP-Connector はUnity内にインストールするパッケージです。こちらを利用する事でTCP経由でROSメッセージのサブスクライブやパブリッシュが可能です。 ROS-TCP-Endpoint はROS用のパッケージでROSネットワーク内のメッセージをTCPでUnityへ橋渡してくれるエンドポイントです。 よくある例でROSBridgeを利用してWebSocket経由でROSメッセージのやり取りをする例がありますが、それらでやり取りするよりも圧倒的にパフォーマンスが良いようです。実際にPC内にDockerで作成したROSサーバーで発行した数百Mbpsクラスの点群データもスムーズにUnityに届けられているのでかなり高速と言えます。 ROSサーバーのセットアップ 今回はUnityがインストールされたWindowsPC内にDockerを利用してROS2サーバー(ノード)をセットアップし、ROSメッセージのやり取りを行えるようにします。 今回の構成 Docker Composeファイルの作成 Dockerのセットアップ方法は Unity-Robotics-Hub というROSとUnityのチュートリアルがあり、同一リポジトリ内の ros_unity_integration にROS-TCP-Endpointのパッケージを自動追加しつつROSサーバーをセットアップするDockerファイルがあるので利用すると簡単です。 私はros_unity_integrationフォルダと同階層にDocker Composeファイルを作成して利用しています。 ROSサーバーをDockerで建てる際のフォルダ例 docker-compose.yml #version: "3" services : ros2-endpoint : image : foxy container_name : ros2 build : context : ./ros_unity_integration dockerfile : ./ros2_docker/Dockerfile ports : - 10000:10000 volumes : - ./bagfiles:/home/dev_ws/bagfiles networks : - ros2_external restart : always command : > bash -c ". /home/dev_ws/install/setup.bash && export FASTRTPS_DEFAULT_PROFILES_FILE=/home/dev_ws/src/fastrtps-profile.xml && ros2 run ros_tcp_endpoint default_server_endpoint --ros-args -p ROS_IP:=0.0.0.0 -p ROS_TCP_PORT:=10000" ros2-endpoint2 : image : foxy container_name : ros2_for_controller build : context : ./ros_unity_integration dockerfile : ./ros2_docker/Dockerfile ports : - 10001:10001 volumes : - ./bagfiles:/home/dev_ws/bagfiles networks : - ros2_external restart : always command : > bash -c ". /home/dev_ws/install/setup.bash && export FASTRTPS_DEFAULT_PROFILES_FILE=/home/dev_ws/src/fastrtps-profile.xml && ros2 run ros_tcp_endpoint default_server_endpoint --ros-args -p ROS_IP:=0.0.0.0 -p ROS_TCP_PORT:=10001" networks : ros2_external : 上記Docker Composeファイルでは、 チュートリアル で行っているROS-TCP-Endpointのセットアップコマンドなどを一括実行させています。 また、Docker Composeファイルと同階層に作ったbagfilesフォルダをマウントする事で検証で利用するROSbagファイルを簡単に参照できるようにしています。 コンテナが複数ありますが、こちらはUnityアプリとROS-TCP-Endpointの疎通は1対1でないと接続不調になるケースがあった為です。 ROSは同一ネットワーク内のメッセージを自動で送受信する事が出来ます。Docker Composeの設定でコンテナ同士の通信を許可しておけば複数のコンテナに分かれていても問題はありません。 各コンテナの違いはコンテナ名とポートのみです。 私はコンテナが複数と1つだけのDocker Composeファイルを作成して使い分けています。 Dockerコンテナの起動 Dockerコンテナの起動コマンドは以下です。 # for docker-compose.yml docker compose up -d # for docker-compose2.yml docker compose -f docker-compose2.yml up -d 対象のDocker Composeファイルのファイル名が docker-compose.yml 以外の場合は -f オプションでファイル名を指定します。 コンテナの停止例は以下です。 # for docker-compose.yml docker compose down --rmi all --volumes --remove-orphans or docker compose -f docker-compose2.yml down --rmi all --volumes --remove-orphans Docker自体のセットアップ方法は今回は解説しませんがWindowsの場合はWSL(Windows Subsystem for Linux)内、Macの場合は Colima を利用、Linuxの場合はOSへ直接セットアップして利用しています。 Unityアプリのセットアップ プロジェクトの作成 執筆日(2024/12) ではUnityのプロジェクトを作成する際のプロジェクトテンプレートは ビルトインレンダーパイプライン を利用しています。 Unityプロジェクト作成時のビルトインレンダーパイプラインのテンプレート ユニバーサルレンダーパイプラインでも開発できるかと思いますがiPadなどタブレット対応も考慮している点と、より処理を軽量化したいケースで従来のビルトインレンダーパイプラインの方が勝っているかなと現時点では感じています。 ROS-TCP-Connectorのセットアップ ROS-TCP-Connectorのセットアップ方法は README に書かれている通りでPackage Manager内でGitのURLからインポートする方法があるのでそちらからインストールしてください。 Package Managerで選択する項目 ただ、私がROS-TCP-Connectorを利用する場合は、送受信しているROSメッセージの伝送帯域を確認したいケースが多くあり、標準では伝送帯域を収集する機能を要していない為、フォークして少し調整した物を利用しています。 github.com ROS-TCP-Connector インストール後の画面 帯域の測定方法に興味がある場合は テスト用のスクリプト がありますので参照ください。 ROSバージョンの選択 ROS-TCP-ConnectorはROS1とROS2の両方に対応しています。必要に応じて Robotics/ROS Settings よりバージョンの切り替えを行ってください。 ROS Settingsの表示方法 ROSバージョンは変更可能 ROSコネクションの配置 Unity内でGameObjectにROS Connectionコンポーネントを配置し、必要に応じてIPアドレスやポートを変更すればROSサーバーとの疎通に関しては準備完了です。 ROS Connection のInspector うまくROSサーバーと接続できていればアプリ実行時にシーン画面の左上に表示される矢印が青く点灯します。 ROSとの接続が成功している際の表示 接続できていない場合は、矢印が赤色になります。 ROSとの接続に失敗している際の表示 ROSメッセージのサブスクライブ/パブリッシュ検証 ここからはデータの疎通確認の例を記載します。 ROSメッセージをサブスクライブしてみる ROSメッセージをサブスクライブするには対象のメッセージのトピック名と型をROS Connectionに渡すだけで可能です。 ROS_StringSubscriber.cs using RosMessageTypes.Std; using System; using Unity.Robotics.ROSTCPConnector; using UnityEngine; using UnityEngine.Events; public class ROS_StringSubscriber : MonoBehaviour { [SerializeField] private string topicName = "/string" ; public string ReceivedTime; public string Data; public bool ShowReceiveLog = true ; public UnityEvent< string > OnReceiveData = new UnityEvent< string >(); // Start is called before the first frame update void Start() { ROSConnection.GetOrCreateInstance().Subscribe<StringMsg>( this .topicName, (message) => { if ( !this .enabled) return ; if (ShowReceiveLog) Debug.Log( $"OnReceiveStringMessage(topicName: { this.topicName } , data: { message.data } )" ); var dateTime = DateTime.UtcNow; ReceivedTime = dateTime.ToLocalTime().ToString( "HH:mm:ss.ffffff" ); Data = message.data; OnReceiveData ? .Invoke(message.data); }); } } 作成したコンポーネントをGameObjectに追加し、対象のトピック名を設定するだけで利用できます。 作成した ROS_StringSubscriber の Inspector Unityのプロジェクトを実行したらROSサーバーからメッセージをパブリッシュしてみます。 # Launching Docker containers. docker compose up -d # Access to Docker containers. docker exec -it ros2 /bin/bash # Sending ROS messages. ros2 topic pub --once /string std_msgs/msg/String " data: Hello World! " or ros2 topic pub --once /string std_msgs/String " data: Hello World! " ROSメッセージを受信した際の ROS_StringSubscriber の Inspector JSONなども送信可能です。 ros2 topic pub --once /string std_msgs/String " data: {'key': 'value'} " JSON形式を受信した際の ROS_StringSubscriber の Inspector ROSbagファイルに格納されたROSメッセージの確認方法 ROSbagに格納されたメッセージの確認を行う場合はまずは ros2 bag info で格納されているROSメッセージのTopic名や型を調べます。 ros2 bag info の実行例 上記で判明したTopic名と型を用いてUnityでサブスクライブ設定を行います。 ROSbag内のデータは時系列に格納されており、 ros2 bag play を行う事で実際に記録した際と同じ時間間隔でメッセージの再生が可能です。 ros2 bag play { ROSbagファイルパス } 上記を実行する事で格納されたROSメッセージがROSネットワークに発信され、Unityにも同様にROSメッセージを伝達する事が可能になります。 ROSメッセージをパブリッシュしてみる 続いて逆にUnityからROSサーバーへデータを送信してみます。以下の2つのファイルを用意してみました。 ROS_StringPublisher.cs using RosMessageTypes.Std; using System.Collections; using System.Collections.Generic; using Unity.Robotics.ROSTCPConnector; using UnityEngine; public class ROS_StringPublisher : MonoBehaviour { [SerializeField] private string topicName = "/string2" ; public bool ShowSendLog = true ; private ROSConnection ros; private StringMsg msg; private bool enable = false ; // Start is called before the first frame update void Start() { // Setup ROS this .ros = ROSConnection.GetOrCreateInstance(); this .ros.RegisterPublisher<StringMsg>( this .topicName); // Setup ROS Message this .msg = new StringMsg(); } public void Publish( string data) { if ( ! enable) return ; if (ShowSendLog) Debug.Log( $"PublishStringMessage(topicName: { this.topicName } , data: { data } )" ); this .msg.data = data; this .ros.Publish( this .topicName, this .msg); } private void OnEnable() { enable = true ; } private void OnDisable() { enable = false ; } } PublishSample.cs using UnityEngine; using UnityEngine.Events; public class PublishSample : MonoBehaviour { public string Message = "Unity to ROS test" ; public UnityEvent< string > PublishEvent = new UnityEvent< string >(); // Update is called once per frame void Update() { if (Input.GetKeyDown(KeyCode.Space)) { PublishEvent ? .Invoke(Message); } } } ROS_StringPublisherではROSConnectionに対し、送信するトピック名と型を登録し、関数Publish()に送られてきた文字列を送信する例になります。 PublishSampleはスペースキーが押されたら、変数Messageに格納されている文字列をPublisherに伝達できるようにイベントを発火しています。 新しくGameObjectを作成し、それぞれのコンポートを追加したら、PublishSampleのInspectorでPublishEventにROS_StringPublisherのPublish関数を登録します。 PublishSampleにROS_StringPublisherのROSメッセージ送信処理を登録する例 PublishSampleにROSメッセージ送信のイベントが登録できた際の例 Unityのプロジェクトを実行後、スペースキーを押すとログが出力されればOKです。 ROSメッセージ送信のイベントが発生した際に表示されるログ 実際にROSサーバー側で送信されたROSメッセージを確認するには ros2 topic echo を利用します。 # Access to Docker containers. docker exec -it ros2 /bin/bash # Checking ROS messages. ros2 topic echo /string2 std_msgs/String ROSサーバーでros topic echoを実行した際の例 正しくメッセージが受信できれば、型に定義されたデータ名とその中身が表示されます。 独自に設計されたROSメッセージの生成方法 標準でサポートされているROSメッセージだけでなく、状況に応じてカスタマイズされたROSメッセージを利用したいケースがあると思います。 ROS-TCP-ConnectorではROSメッセージの生成もサポートしているので対応可能です。 ROS Message Browserの開き方 Robotics/Generate ROS Messages... で ROS Message Browserを開き、パスの設定を行います。 ROS Messageの生成例 基本はUnityプロジェクト内のAssetsフォルダ配下に ROSMessages フォルダを作成し、ROSエンジニアから共有頂いたROSMessage定義ファイル( 拡張子 .msg )を格納すれば自動生成されます。 今回は NMEAメッセージをUnity用ROSMessageに変換してみました。 github.com ROS用に設計されたロボットモデルのインポート ROS用に設計されたロボットモデル多くは xacro や urdf といったxml形式のモデルが多いです。これらのモデルはUnityでは標準サポートしていない為、Unityへインポートする例も紹介します。 TurtleBot3 Waffle Piモデル 今回はURDFで定義されたTurtleBot3モデルを利用します。 利用するリポジトリは以下です。 github.com URDFのインポートには URDF-Importer というUnityパッケージを利用します。 github.com ROS-TCP-Connectorの時と同じようにGithubのリンクからインポートを行います。 URDF-Importer のインストール後の画面 Assets配下にURDFフォルダを作成しその配下に TurtleBot3の プロジェクト をクローンしてきます。 URDFファイルのインポート方法 クローンしてきたプロジェクトのトップディレクトリにurdfファイルありますのでそちらを右クリックなどでオプションを表示し、 Import Robot from Selected URDF file 選択し、 URDF Import Settings を表示します。 URDF Import Settings 表示された URDF Import Settings で Import URDF を選択することでモデルが生成されると思います。 URDFから生成された3Dモデル例 今回用意したurdfのリポジトリは元のリポジトリからフォークして若干調整しています。 turtlebot3_waffle_pi.urdf ... <link name = "base_link" > <visual> <origin xyz = "-0.064 0 0.0" rpy = "0 0 0" /> <geometry> <mesh filename = "package://turtlebot3_description/meshes/bases/waffle_pi_base.stl" scale = "0.001 0.001 0.001" /> </geometry> <material name = "light_black" /> </visual> ... urdfファイルの中身を見るとmeshタグにpackageの定義がされていますが、執筆時点では URDF-Importer はpackageの解決が出来ない為、そのまま利用するとエラーになっていました。 URDFの変換エラー packageのトップディレクトリはurdfファイルがある階層と同じ位置から始まる為、修正した内容としてはurdfファイルをリポジトリのトップへ移動して対応しました。 一旦これらで可視化まで出来ましたが、実はこのままだと生成されたモデルに重力が適用されていてアプリケーションを実行するとモデルが自由落下してしまいます。 自由落下中のモデル デジタルツインの様なアプリ開発のケースではROS空間で動作しているROSモデルをそのままの座標で可視化することが目的な為、重力といったシミュレーション要素は不要のため、デフォルトで生成された不要なコンポーネントは削除して利用しています。 削除するスクリプト例は以下です。 Editor/RemoveURDFImporterComponents.cs using UnityEngine; using UnityEditor; #if UNITY_EDITOR using Unity.Robotics.UrdfImporter; using Unity.Robotics.UrdfImporter.Control; public class RemoveURDFImporterComponents : MonoBehaviour { [MenuItem( "GameObject/Remove URDF-Importer Components" , false , 0 )] static void Execute(MenuCommand command) { foreach (GameObject obj in Selection.gameObjects) { RemoveChildComponents(obj.transform); } } private static void RemoveChildComponents(Transform t) { for ( int i = 0 ; i < t.childCount; i ++ ) { var child = t.GetChild(i); RemoveComponents(child); RemoveChildComponents(child); } RemoveComponents(t); } private static void RemoveComponents(Transform t) { // RemoveComponent<>(t); RemoveComponent<UrdfRobot>(t); RemoveComponent<Controller>(t); RemoveComponent<UrdfPlugins>(t); RemoveComponent<UrdfPlugin>(t); RemoveComponent<UrdfLink>(t); RemoveComponent<UrdfVisuals>(t); RemoveComponent<UrdfVisual>(t); RemoveComponent<UrdfCollisions>(t); RemoveComponent<UrdfCollision>(t); RemoveComponent<UrdfLink>(t); RemoveComponent<UrdfInertial>(t); RemoveComponent<UrdfJointFixed>(t); RemoveComponent<UrdfJointRevolute>(t); RemoveComponent<UrdfJointPrismatic>(t); RemoveComponent<UrdfJointContinuous>(t); // Must be last... RemoveComponent<ArticulationBody>(t); } private static void RemoveComponent< T >(Transform t) where T : Object { var s = t.GetComponent< T >(); while (s != null ) { DestroyImmediate(s); s = t.GetComponent< T >(); } } } #endif 上記はAssetsフォルダ配下に Editor フォルダを作成してその中に作成します。 URDF-Importer の不要な項目を削除する あとは Hierachy内のURDF-Importerから生成されたモデルを右クリック等でオプションを表示し、新たに追加された Remove URDF-Importer Components を実行する事で不要な項目が削除されます。 Remove URDF-Importer Components 実行後 あとはROSメッセージのTFなどからロボットの位置や関節情報をUnityのモデルにバインドしてあげると同期が成立すると思います。 TFのサブスクライブ例は以下です。 ROS_TFSubscriber using RosMessageTypes.Geometry; using RosMessageTypes.Tf2; using System; using System.Collections.Generic; using Unity.Robotics.ROSTCPConnector; using Unity.Robotics.ROSTCPConnector.ROSGeometry; using UnityEngine; using UnityEngine.Events; public class ROS_TFSubscriber : MonoBehaviour { [SerializeField] private string topicName = "/tf" ; [Serializable] public class TFFrame : IEquatable<TFFrame> { public string ChildFrameId; public string FrameId; public string ReceivedTime; public bool DisablePosition = false ; public Vector3 Position; public bool DisableRotation = false ; public Quaternion Rotation; public bool ShowReceiveLog = true ; public UnityEvent<Vector3, Quaternion> OnReceiveTransform = new UnityEvent<Vector3, Quaternion>(); public UnityEvent<Vector3> OnUpdatePosition = new UnityEvent<Vector3>(); public UnityEvent<Quaternion> OnUpdateRotation = new UnityEvent<Quaternion>(); public void OnReceiveMessage(TransformMsg message) { if ( !this .Script.enabled) return ; if (ShowReceiveLog) Debug.Log( $"OnReceiveTFMessage(childFrameId: { ChildFrameId } , frameId: { FrameId } , data: { message } )" ); var dateTime = DateTime.UtcNow; ReceivedTime = dateTime.ToLocalTime().ToString( "HH:mm:ss.ffffff" ); if ( ! DisablePosition) Position = message.translation.From<FLU>(); if ( ! DisableRotation) Rotation = message.rotation.From<FLU>(); OnUpdatePosition ? .Invoke(Position); OnUpdateRotation ? .Invoke(Rotation); OnReceiveTransform ? .Invoke(Position, Rotation); } internal MonoBehaviour Script; public bool Setup(MonoBehaviour Script) { // Load settings. if ( string .IsNullOrEmpty( this .ChildFrameId) && string .IsNullOrEmpty( this .FrameId)) return false ; this .Script = Script; return true ; } public static bool operator == (TFFrame l, TFFrame r) => l.Equals(r); public static bool operator != (TFFrame l, TFFrame r) => ! (l == r); public bool Equals(TFFrame other) { if (ReferenceEquals(other, null )) { return false ; } if ( this .ChildFrameId != other.ChildFrameId) { return false ; } if ( this .FrameId != other.FrameId) { return false ; } return true ; } public override int GetHashCode() { int hash = 1 ; if ( ! string .IsNullOrEmpty(ChildFrameId)) hash ^= ChildFrameId.GetHashCode(); if ( ! string .IsNullOrEmpty(FrameId)) hash ^= FrameId.GetHashCode(); return hash; } public override bool Equals( object obj) { if (obj is TFFrame v) { return Equals(v); } return false ; } } [SerializeField] private List<TFFrame> dataList = new List<TFFrame>(); // Start is called before the first frame update void Start() { var deleteList = new List<TFFrame>(); foreach ( var data in this .dataList) { if ( ! data.Setup( this )) { deleteList.Add(data); } } foreach ( var data in deleteList) { this .dataList.Remove(data); } if ( this .dataList.Count == 0 ) { Debug.Log( $" { topicName } list is empty, so the subcrite is skipped." ); this .gameObject.SetActive( false ); return ; } ROSConnection.GetOrCreateInstance().Subscribe<TFMessageMsg>( this .topicName, (message) => { foreach ( var t in message.transforms) { foreach ( var frame in dataList) { if (t.child_frame_id.Equals(frame.ChildFrameId)) { if ( ! string .IsNullOrEmpty(frame.FrameId) && ! t.header.frame_id.Equals(frame.FrameId)) { continue ; } frame.OnReceiveMessage(t.transform); } } } }); } } ROS_TFSubscriberとモデルとのバインド例 その他、開発する上で注意ポイント Windowsの場合、Dockerで作成したROSサーバーは同一LAN内のROSノード(サーバー)とは接続できない 上記の通り開発や検証端末でWindowsを選択する場合は注意が必要です。設定によってはローカルホスト外と通信できる可能性もありますが恐らく簡単ではないと思います。 LinuxでUnityアプリを動かし、DockerでROSサーバーを建てた場合はこの課題は発生しないので場合によっては動作端末にLinuxを選ばなければならないケースもあるかもしれません。 UnityとROSでは座標系が違う ROSメッセージのTFなどでロボットの座標をUnity内のモデルにそのまま適用すると関節があらぬ方向へいったりします。 ROS: 右手系 Unity: 左手系 それぞれの座標系は上記の通りでして、一応ROS-TCP-Connector内でそれらの変換もサポートしているので利用すると良いと思います。 // Convert from ROS to Unity var unity = message.translation.From<FLU>(); // Convert from Unity to ROS var ros = FLU.ConvertFromRUF(unity); 最後に いかがでしたでしょうか? Unity Technologiesで用意されているモジュールがとても優秀な為、ROSとの通信自体はかなり簡単に行えます。 今回はROSとUnityのみの解説となりましたが、ROSConnectionで行っているお作法は私が開発を担当している intdash-unity に簡単に置き換える事が可能です。 tech.aptpod.co.jp intdash では現存するROSサーバーと併用して、簡単にインターネットを超えてROSメッセージを伝送する事も可能なのでROSプラットフォームとしての可能性が広がると思います。 IoTデータを利用したプラットフォーム、IoTサービスの開発や、リアルタイムデジタルツイン、製造現場のDX化など、IoTシステムの開発でお困りのことがあれば、ぜひ弊社までお声がけください。 弊社の問い合わせフォームはこちらです。 www.aptpod.co.jp
アバター
こちらは aptpod Advent Calendar 2024 12月10日の記事になります。 本日はVPoPの岩田が担当します。 今回は、弊社製品のちょっとへんな使い方ということで、リモートデスクトップ用ソフトウェアのひとつである VNC がやり取りする通信を、VM2M Data Visualizer で可視化してみようと思います。 そもそもVNC(Virtual Network Computing)とは? なぜこんなことをしようと思ったのか? 本記事でつくるもののアーキテクチャー 先にできたものを動画で紹介 RFBプロトコルの仕様 ハンドシェイクプロセス プロトコルバージョン のハンドシェイク セキュリティ方式 のハンドシェイク 初期化プロセス クライアントからの初期化メッセージ(ClientInit) サーバーからの初期化メッセージ(ServerInit) 初期化後の通信 クライアントからサーバーへ送るメッセージ サーバーからクライアントへ送るメッセージ RFBプロトコルからの画面キャプチャー画像の構築 Raw Encoding における画像のエンコーディング方式 RFBプロトコルにおける画面データの送信方式 Pythonによる実際のコードの紹介 intdash へのデータの転送 今後の展望とあとがき そもそもVNC(Virtual Network Computing)とは? 弊社のテックブログを読んでくださるような方々であればもちろんご存知だと思いますが、 VNC( V irtual N etwork C omputing) とは、ネットワーク上の離れた場所にあるコンピュータを遠隔操作するための、いわゆるリモートデスクトップ用のソフトウェアです。詳細は Wikipedia に譲りますが、最初に Olivetti & Oracle Research Lab によって開発され、これがGPLライセンスで公開されたために、現在ではいろいろな派生ソフトウェアが存在する、という状況のようです。 私自身もあまり正しく理解できていなかったのですが、VNCというのはあくまでソフトウェアの名称であって、VNCが通信に使用しているプロトコルは RFBプロトコル( R emote F rame b uffer Protocol) という名称で、別途定義されているそうです。このRFBプロトコルは、RFB 3.8 というバージョンが2011年に RFC 6143 として標準化されており、仕様は誰でも閲覧することができます。 本記事のテーマは、このVNCの通信で送受信される以下のようなデータをうまく中継して、弊社製品である VM2M Data Visualizer のダッシュボード上で可視化してやろう、というものになります。 ディスプレイ画面の映像 マウス・キーボード操作 クリップボード情報 なぜこんなことをしようと思ったのか? 若干ふざけたタイトルなのでお遊び記事かと思いきや、一応このテーマにはきちんとした狙いがあり、実は技術調査のアウトプットだったりします。 弊社は、 VM2M Data Visualizer というリアルタイム描画が得意な可視化ダッシュボードツールを開発・提供しています。この VM2M Data Visualizer は、ノーコードで自由にダッシュボードを組み上げられる可視化ツールで、映像からセンサーデータまで様々なデータをひとつのダッシュボードにまとめて可視化することができます。 www.aptpod.co.jp VM2M Data Visualizer がありとあらゆるデータを一つのダッシュボードにまとめられるおかげで、ご利用いただいているお客様から 「あれもこれも、なんでもかんでも、 VM2M Data Visualizer のダッシュボードにまとめて扱いたい」 というご要望をいただくことがあります。 よくあるセンサーデータや映像・音声などであれば既存機能のみで十分対応が可能ですが、使い込んでいけばいくほどまとめたい対象も広がっていき、最終的には 「遠隔地に配置したPC上で動いているレガシーソフトウェアのUIも、まとめてダッシュボードで確認できないか」 という相談をいただいたりする機会も出てくるようになりました。 PCの画面をダッシュボードに表示するだけであれば、画面をキャプチャーして中継することもできますが、PCで稼働することを前提としたソフトウェアは、多くの場合マウスやキーボードによる操作を想定した作りになっており、画面を転送して表示しただけでは使い物になりません(可視化だけでなく、やはり操作を行いたくなります)。 もちろん、画面キャプチャーだけでなくマウスやキーボードの操作情報を中継する機能を自前実装することもできなくはありませんが、いろいろと実現方法を考えていくうちに、 「もはやこれはリモートデスクトップ用のプロトコルそのものなのではないか?」 という考えに思い至るようになりました。 リモートデスクトップ用のプロトコルであれば、Wiindows 向けの超有名どころである RDP や今回対象とした VNC など、すでに広く使われていて実績のあるものが存在します。そういった既存の技術を上手く活用すれば、わざわざ自前実装して車輪の再発明をする必要はありません。 これらをどうにか活用できないかと深堀って調べていくなかで、前述したとおりVNCで使用されているRFBプロトコルが標準化されていることを知り、さらにそのプロトコル仕様が極めてシンプルで簡単に扱えそうなことが分かってきたので、これを調査して製品化に向けた検討をしてみよう、と思い本記事の企画が生まれました。 本記事でつくるもののアーキテクチャー 今回の目的は、最終製品の実装ではなく、RFBプロトコルのフォーマットやシーケンスを理解することに限定し、なるべくライトに対応します。 検証作業を単純化するために、VNCサーバーの前段にTCPプロキシーを挟み込み、そのプロキシーでRFBプロトコルのやり取りをスニッフィングして intdash に引っ張り出すことで、VM2M Data Visualizer までデータを送り届けることにしました。 ※ intdash は VM2M Data Visualizer がリアルタイムデータの通信のために使用する、バックエンドの通信ミドルウェアです。 www.aptpod.co.jp ただしこれは最終的な構成ではなく、実際のプロダクトに機能を取り込むときには、もしかするとRFBプロトコルを intdash で中継できるようにするかもしれませんし、 VM2M Data Visualizer がVNCクライアントとして振る舞えるようにするかもしれません。 機能 案1)RFBプロトコルを intdash で中継し Data Visualizer が受信するパターン 機能 案2)Data Visualizer がそもそもVNCクライアントになるパターン 先にできたものを動画で紹介 Docker Compose でVNCサーバーにする Ubuntu Desktop 環境と、今回実装したプロキシーを立ち上げます。VNCサーバーには 以下の Docker イメージを使わせてもらいました。 github.com VNCサーバーとプロキシーが起動したら、プロキシーが待ち受けているポートに向かって、VNCクライアントでアクセスしてみます。今回の動画では VNC Viewer を使用しました。 プロキシーは、VNCクライアントからのデータをそのままVNCサーバーに中継するとともに、 VM2M Data Visualizer で可視化したい情報については intdash に分岐送信します。 プロキシーが送信しているデータを可視化するように VM2M Data Visualizer のダッシュボードを構成して再生をすると、みごとにVNCサーバーとVNCクライアントが送り合っている情報を VM2M Data Visualizer のダッシュボード上に表示することができました! youtu.be RFBプロトコルの仕様 前述の通り、RFBプロトコルは RFC 6143 として標準化されており、誰でも仕様を確認することができます。 datatracker.ietf.org ざっとプロトコルの流れを確認すると 7.1 Handshake Messages に記載されたハンドシェイクプロセスによって、サーバーとクライアントの間でコネクションが確立されます。その後、 7.3 Initialization Messages に記載された初期化メッセージによって、サーバークライアント間で相互に通知が必要な情報をやり取りします。その後は、基本的には一方通行の通信のみが行われ、その際に使用されるメッセージが 7.5 Client-to-Server Messages および 7.6 Server-to-Client Messages です。 唯一複雑なところといえば、 7.7 Encodings で規定されている、画面イメージを送信する際のエンコーディングの仕様ですが、後述するように最も単純な Raw Encoding に限定して使用すれば、 TRLE や ZRLE などの高度なエンコーディングの仕様は一旦理解を先送りすることができます。 ハンドシェイクプロセス ハンドシェイクは、主に プロトコルバージョン と セキュリティ方式 をサーバーとクライアントで確認しあいます。 プロトコルバージョン のハンドシェイク プロトコルバージョンのハンドシェイクは極めて単純で、 RFB xxx.yyy\n という12文字をサーバーから1回、クライアントから1回の1往復分送り合うだけです。今回はプロキシーを作るだけなので、12バイト分パススルーするだけでやり過ごします。 セキュリティ方式 のハンドシェイク プロトコルバージョンのハンドシェイクが完了したら、次はセキュリティ方式のハンドシェイクを行います。ここではまずサーバーがサポート可能なセキュリティタイプのリストを送信し、クライアントが実際に使用するセキュリティタイプを選択して返信します。 VNCがもともと暗号化を想定しない単純なプロトコルであったこともあり、RFCで規定されているセキュリティ方式は、「 None 」または「 VNC Authentication 」の2方式のみようです。今回の検証においては、「 None 」のみ使用すれば十分であったため、「 VNC Authentication 」の説明は割愛します。なお、弊社製品に取り込む際には、VNCのデータはセキュアな intdash のプロトコルに格納されて伝送されることになるため、VNCやRFBプロトコルのレベルで高度な認証や暗号化に対応していなくても全く問題ありません。 クライアントが送付したセキュリティタイプをサーバーが受け入れ可能な場合は、結果を応答してハンドシェイクは完了です。受け入れられない場合は、結果や理由を応答した後にサーバーからコネクションを切断します。 初期化プロセス 初期化プロセスでは、サーバーとクライアントが相互に通知すべき情報を、それぞれ1メッセージの送信で通知しあいます。 クライアントから送信されるメッセージを ClientInit 、サーバーから送信されるメッセージを ServerInit といいます。 クライアントからの初期化メッセージ( ClientInit ) ClientInit メッセージによって送信される情報は、 shared-flag のみです。本記事においてはあまり重要な情報ではないため、説明は割愛します。 ClientInit メッセージ サーバーからの初期化メッセージ( ServerInit ) ServerInit メッセージによって送信される情報は、以下のとおりです。 画面サイズ(framebuffer-width, framebuffer-height) 各ピクセルの表現方法(PIXEL_FORMAT) 画面の名称(デスクトップに関連付けられた名前) ServerInit メッセージ ここで通知された画面サイズが画面全体のサイズになります。 また、PIXEL_FORMATでは1ピクセルを何ビットで表すか、RGBは何ビットずつどの順で配置されているかなどの情報を含みます。細かな仕様はRFCに譲りますが、この情報により、サーバーから送られる画面データをどの様にパースしてイメージに再構成すればよいかを知ることができます。 PIXEL_FORMAT のフォーマット 初期化後の通信 クライアントからサーバーへ送るメッセージ クライアントからサーバーに送付するメッセージには、以下のものがあります。 SetEncodings SetPixelFormat KeyEvent PointerEvent ClientCutText FramebufferUpdateRequest SetEncodings を用いて、クライアントがサポート可能なエンコーディングを指定することで、サーバーに対して使用するエンコーディングを制限させることができます。 ちなみに今回の実装では、最も単純な Raw Encoding のみにエンコーディングを制限させるため、クライアントからの SetEncodings メッセージを中継する際に、クライアントに内緒でこっそり Raw Encoding 以外のエンコーディングをオミットする処理を入れています。プロトコルのシーケンスとしては SetEncodings に対してサーバーからの応答確認などもなくクライアントからメッセージを送りつけっぱなしにするだけなので、このような内緒の通信内容の書き換えもできててしまいます。 SetPixelFormat は、途中でピクセルの表現方法を変更するときに使用します。 KeyEvent 、 PointerEvent はその名の通り、キーボードの押下やマウスの移動を通知するメッセージです。こちらのメッセージをパースすることで、どのキーが押されたか、マウスがどこに動いたかをトラッキングできます。 KeyEvent メッセージ PointerEvent メッセージ ClientCutText はクライアント側のクリップボードの状態をサーバーに通知します。クライアント側がクリップボードに何かを載せたとき、それがサーバーに通知されてサーバー側で利用可能になります。 ClientCutText メッセージ FramebufferUpdateRequest は、画面キャプチャー画像を送信するよう要求するメッセージです。VNCでは基本的に、クライアントからの要求に応じて画面キャプチャー画像が返信されるプロトコルとなっています。ただし、サーバーは要求された範囲に対して画面の更新があるまで応答を先延ばしにすることができ、また、複数の要求に対して一つの応答を返せばよい、という決まりになっており、必要に応じて必要なだけ変更情報を送信できる効率のよい仕組みになっています。 サーバーからクライアントへ送るメッセージ サーバーからクライアントに送付するメッセージには、以下のものがあります。 SetColorMapEntries Bell FramebufferUpdate ServerCutText SetColorMapEntries および Bell については、本記事においては重要なメッセージではないため、説明を割愛します。 ServerCutText は、クライアント側の ClientCutText と同様に、サーバー側のクリップボードの状態をクライアントに通知します。 FramebufferUpdate は、サーバー側で画面に更新のあった特定範囲の画面キャプチャー画像を送信します。メッセージには複数の範囲のキャプチャー画像が含まれ rectangle と呼ばれます。rectangle は位置(x, y)、サイズ(w, h)とエンコーディングのタイプ、rectangle 内のキャプチャー画像データで構成されます。 画像データはエンコーディングタイプそれぞれ応じた形式で格納されますが、前述の通り今回はクライアントからの SetEncodings メッセージに細工をして Raw Encoding でしかデータが返ってこないようにしているため、 Raw Encoding として読み出せば事足ります。 FramebufferUpdate メッセージのヘッダー情報 各 rectangle のフォーマット RFBプロトコルからの画面キャプチャー画像の構築 Raw Encoding における画像のエンコーディング方式 Raw Encoding においては、1ピクセル分のデータサイズは ServerInit メッセージや SetPixelFormat メッセージによって送られる PIXEL_FORMAT によって決められます。 PIXEL_FORMAT のフォーマット(再掲) 今回使用したVNCサーバーでは、32ビット = 4バイトを使用することになっていました( bits-per-pixel = 32 )。ただし、実際にRGBデータが格納されるのは24ビット = 3バイトのみで、1バイト分はパディングのようです( depth = 24 )。また、RGBのアサインについても若干注意が必要で、RGBの順ではなくBGRの順番で並んでいるようでした( red-shift=16, green-shift=8, blue-shift=0 )。 このように1ピクセルあたり4バイトとして、これを画面の左から右へ横1行分並べて、さらにその各行分のバイト列を上から下の順番になるように結合したものが、イメージデータとなります。 RFBプロトコルにおける画面データの送信方式 RFBプロトコルにおける画面キャプチャー画像のデータは、前述の通り FramebufferUpdate メッセージに含まれる複数の rectangle によって送信されます。各 rectangle はその位置(x, y)、サイズ(w, h)とエンコーディングのタイプ、rectangle 内のキャプチャー画像データを持ちます。 このように、RFBプロトコルでは画面全体のキャプチャー画像を毎回送信するのではなく、画面全体のうちの更新のあった特定の範囲の画像のみをオンデマンドで送信するという仕様を採用することで、効率良く画面の更新を伝えるプロトコルになっているようです。 今回のように、画面全体のキャプチャー画像を構成しようとする場合には、各 rectangle 毎に画像を生成するだけでなく、それまでに受け取った rectangle を保持して、画面全体を表す画像に適宜パッチをあてていく処理を実装しなければなりません。 ちなみに、画面全体の大きさは、 ServerInit メッセージに格納された画面サイズで知ることが出来ます。 Pythonによる実際のコードの紹介 以上を踏まえると、受け取った rectangle を Python で画像化する処理は次のようになります。なお、以降のコードは PIL ライブラリの Image オブジェクトへの変換までの処理になります。 ServerInit メッセージを受け取ったときの処理 ServerInit メッセージによって画面全体のサイズ (framebuffer_width, framebuffer_height) が分かったら、画面キャプチャー全体を格納するための Image オブジェクト display_img を初期化します。 display_img = Image.new( "RGB" , (framebuffer_width, framebuffer_height)) FramebufferUpdate メッセージを受け取ったときの処理 rectangle から画像データ rect_data 、位置 (x, y) 、サイズ (w, h) を取得し、rectangle に相当する Image オブジェクト rect_img を生成したのちに、画面キャプチャー全体を格納している Image オブジェクト display_img の該当箇所に貼り付けます。 # イメージデータをNumPy配列化し、(h, w, 4) の3次元配列に整形する arr = np.frombuffer(rect_data, dtype=np.uint8).reshape(h, w, 4 ) # RGBの順序に気をつけて、Imageオブジェクトに変換する rect_img = Image.fromarray(arr[:, :, [ 2 , 1 , 0 ]]) # 全体画面に対して、更新箇所を貼り付ける display_img.paste(rect_img, (x, y)) あとは、 Image オブジェクトを JPEG にするなり PNG にするなり好きな方法で画像ファイル化すれば、画面キャプチャー画像をファイルとして取得出来ます。 intdash へのデータの転送 RFBプロトコルをパースして画面キャプチャーデータを取得できるようになったら、あとはそれを intdash に送信すれば VM2M Data Visualizer で受信してダッシュボードに表示できるようになります。今回はプロキシーを Python で実装したので、Python 用の intdash のクライアントライブラリ iscp-py を使用しました。iscp-py には、intdash とのリアルタイム接続のためのインターフェイスが一通り実装されており、intdashへのデータ送信プログラムを簡単に書くことができます。 docs.intdash.jp 今回は、この iscp-py を使って、プロキシー内の以下の処理ポイントに intdash へのデータ送信を加えてみました。 FramebufferUpdate の受信時(画面キャプチャー) KeyEvent の受信時(キーボード操作) PointerEvent の受信時(マウス操作) ClientCutText, ServerCutText の受信時(クリップボード) KeyEvent 、 PointerEvent メッセージはJSON形式のテキストで、 ClientCutText 、 ServerCutText メッセージはそのままの文字列で、 FramebufferUpdate メッセージは JPEG 形式の画像として送信することで、 VM2M Data Visualizer のダッシュボード上で、標準パーツを使用して可視化できるようになります。 今後の展望とあとがき 今回のお試し実装により、RFBプロトコルの概要が掴めました、RFCには高度なエンコーディングも記載されているものの、クライアントからの要求によりサーバーに単純なエンコーディングを強制することが可能であることも分かりました。また、一部のエンコーディングを除けば、全体として極めて単純なプロトコルであることが改めて確認できました。 ここまで単純明快なプロトコルであれば、 例えば VM2M Data Visualizer のパーツに、VNCクライアントとして振る舞うものを追加することもさほど難しくはなさそうです (もちろん、実際にプロダクト化を進めるにあたっては、 TRLE や ZRLE などのエンコーディングも必要に応じて調査をし、対応を検討します)。 様々なお客様とのプロジェクトを進めていくなかで、実はDX化を進める現場にも、まだまだレガシーなシステムがたくさん稼働していることは把握しています。弊社の VM2M Data Visualizer は、そういったレガシーシステムとも連携ができる柔軟なプロダクトになれれば良いなと、今回の調査以外にも日々色々と検討を進めています。 今後も、皆さまのご意見を取り入れつつ、より良いプロダクトを作っていきたいと考えておりますので、すでに利用してくださっている皆さま、これから使ってみたいとご興味を持っていただけた皆さま、ぜひ気軽にお声がけいただき、弊社のサービスやプロダクトに対するご意見をいただければ幸いです。 本記事で取り上げたレガシーシステムとのVNCによる連携につきましても、もし活用イメージをお持ちいただけましたら、ぜひご意見をお寄せください。お問い合わせはこちらです。 www.aptpod.co.jp 最後までお読みいただきありがとうございました。
アバター
aptpod Advent Calendar 2024 の12月9日の記事です。 本日は、テクニカルライターの篠崎が担当します。 当社では今年、ドキュメントサイトをリニューアルし、複数のプロダクトのドキュメントを1つのウェブサイトに統合しました。 これにより、読み手は全体を俯瞰しながら各製品の詳細に読み進めることができるようになりました。 また、私のような制作側にとっては、ページ間のリンクを張りやすくなりました。内容の重複も減らすことができ、更新作業もしやすくなりました。 統合されたドキュメントサイト 製品ドキュメントの作成には Sphinx を使っています。これまでプロダクトごとに別々だったSphinxデータは、これからは1つのプロジェクトとして管理するようにしました。 ただし、いままで別のプロジェクトだったものを1つにまとめるには、Sphinxデータのフォルダーをマージするだけでなく、いろいろな調整が必要でした。 例えば、複数のプロダクトを1つのドキュメントで扱うことになったため、ドキュメント内には複数のプロダクトのバージョン番号が登場するようになりました。 通常、Sphinxでは説明対象プロダクトのバージョンを設定ファイル内に書くことで、それを文中に展開して使用することができますが、複数のプロダクトを扱う場合には一工夫が必要でした。 ここではそれについて説明します。 ドキュメント内で展開できる変数(substitution) 外部へのリンクを作成するsphinx.ext.extlinks 説明対象プロダクトの情報をまとめて定義する まとめ 参考文献 ドキュメント内で展開できる変数(substitution) Sphinxでは、設定ファイル conf.py 内で version という変数を定義しておくことにより、本文のreStructuredText原稿で |version| と書いた部分にその値を展開することができます。 後日バージョンが変更になったときは、設定ファイル内の version 定義を変更するだけで文中に反映することができます。 conf.py version= '1.0.0' reStructuredText原稿 本サイトでは、MyApp v\ |version| について説明します。 (マークアップとしては |version| の前後にスペースが必要です。ただし MyApp v の後にバックスラッシュを入れ、その直後のスペースを無視するようにしています。) HTMLビルド結果 展開された |version| この |version| のような |...| を使った一種の変数展開はSphinxでは substitution と呼ばれています。 conf.py 内で使用される変数のうち、いくつかの変数はデフォルトで substitutionとして使用可能 になっています。 最初に書いたとおり、これからは複数のプロダクトを1つのドキュメントで扱うことになったので、「各プロダクトのバージョンを全部substitutionとして扱えるようにしたい」と思いました。そのために、この version とは別のsubstitutionを独自に設定することにしました。 substitutionは、reStructuredText上で以下のような書式で定義することができます。 reStructuredText原稿 .. |my-app1-version| replace:: 1.2.3 .. |my-app2-version| replace:: 4.5.6 これにより |my-app1-version| が 1.2.3 という文字列に、 |my-app2-version| が 4.5.6 という文字列に置換されます。 文中で以下のように書くと展開されます。 reStructuredText原稿 本サイトは以下のバージョンについて説明しています。 * MyApp1 v\ |my-app1-version| * MyApp2 v\ |my-app2-version| HTMLビルド結果 2つのバージョン番号を展開 ですが、 .. |my-app1-version| replace:: 1.2.3 のような定義はreStructuredTextファイルごとに書かなければなりません。複数のページからなるドキュメントでは大変です。 これを解決するのが、 conf.py の rst_prolog (または rst_epilog )です。 rst_prolog 変数の内容は、ビルド時にすべてのreStructuredTextファイルの先頭に付加されます。 そのため、 conf.py を以下のようにすればよいわけです。 conf.py rst_prolog= ''' .. |my-app1-version| replace:: 1.2.3 .. |my-app2-version| replace:: 4.5.6 ''' これで、ひとつひとつのreStructuredTextファイルで定義したのと同じ効果を得られます。 どのファイルでもこれらのsubstitutionを使えるようになりました。 外部へのリンクを作成するsphinx.ext.extlinks さらに弊社製品では、ドキュメントにはAPIリファレンスへのリンクが含まれます。 どうしても長いURLを原稿内に書くことになります(例えばこのような形式のURL: https://www.example.com/app1-reference/1.2.3/get-users )。 長いURLを繰り返し記載したいときに便利なのが、 sphinx.ext.extlinks です。 sphinx.ext.extlinksは機能拡張ですが、Sphinxに組み込まれているため新たにインストールする必要はありません。使用するには、 conf.py で extensions リストに入れるだけでOKです。 conf.py # すでにextensionsの定義がある場合は適宜修正してください extensions = [ 'sphinx.ext.extlinks' ] そのうえで、以下のように、リンクのテンプレートを定義します。 conf.py extlinks = { 'app1-api' : ( 'https://www.example.com/app1-reference/1.2.3/%s' , None ), } このようにすると、原稿内で以下のような表記ができるようになります。 reStructuredText原稿 :app1-api: `ユーザー一覧取得APIリファレンス <get-users>` HTMLをビルドすると以下のように展開されます。 HTMLビルド結果(一部省略) < a href = "https://www.example.com/app1-reference/1.2.3/get-users" > ユーザー一覧取得APIリファレンス </ a > sphinx.ext.extlinksによるリンク例 上のreStructuredText原稿で <> の中に入っていたURL断片 get-users が https://www.example.com/app1-reference/1.2.3/ の後ろ(extlinks定義の %s のところ)に展開されています。これにより、長いURLを毎回書く必要がなくなり、原稿がシンプルになります。 当社の場合、APIリファレンスはバージョンごとに用意しているので、リンクには特定のバージョン番号を含める必要があります。 説明対象プロダクトの情報をまとめて定義する バージョン文字列をsubstitutionsやextlinksなどの複数の形式で使うのであれば、重複のないように1か所で定義したくなります。 ここで「設定ファイル conf.py は実行されるPythonコードでもある」ということが活きてきます。以下のようなことができます。 conf.py # 2つの説明対象プロダクトの情報をまとめてdictionaryとして定義 app_versions = { 'my-app1' : { 'name' : 'MyApp1' , 'version' : '1.2.3' , }, 'my-app2' : { 'name' : 'MyApp2' , 'version' : '4.5.6' , }, } # substitutionを定義 # app_versionsをループして、 # `.. |my-app1| replace:: MyApp1 - v1.2.3` の形式でrst_prologを作成 rst_prolog= '' for app_id, app_info in app_versions.items(): rst_prolog += f '.. |{app_id}| replace:: {app_info["name"]} - v{app_info["version"]} \n ' # extlinksを定義 # app_versionsをループして、特定バージョン番号の入った外部リンクの短縮表記を定義する extensions =[ 'sphinx.ext.extlinks' ] extlinks = {} for app_id, app_info in app_versions.items(): extlinks[f '{app_id}-api' ] = (f 'https://www.example.com/{app_id}-reference/{app_info["version"]}/%s' , None ) ループの中が少し読みづらいですが、結果は以下のようになるはずです。 rst_prolog = ''' .. |my-app1| replace:: MyApp1 - v1.2.3 .. |my-app2| replace:: MyApp2 - v4.5.6 ''' extlinks = { 'my-app1-api' : ( 'https://www.example.com/my-app1-reference/1.2.3/%s' , None ), 'my-app2-api' : ( 'https://www.example.com/my-app2-reference/4.5.6/%s' , None ), } これにより、以下のような原稿を書くことができるようになります。 reStructuredText原稿 本サイトは以下のバージョンについて説明しています。 * |my-app1| * |my-app2| 以下のAPIを使用できます: :my-app1-api: `ユーザー一覧取得APIリファレンス <get-users>` :my-app2-api: `プロジェクト一覧取得APIリファレンス <get-projects>` HTMLビルド結果 2つのプロダクトのバージョンと、リファレンスへのリンク ループを使ってsubstitutionと外部リンクをいっぺんに定義することができました。 これで、説明対象のプロダクトが増えても conf.py の app_versions に追加していくことで管理できます。 まとめ Sphinxの設定ファイル conf.py は、これ自体がPythonで書かれたプログラムなので、設定ファイル内でさまざまな操作をプログラム的に行うことができます。この柔軟性は大変便利です。 当社では日ごろ多くのドキュメントをSphinxで作成していますが、この柔軟性にいつも助けられています。 参考文献 用語を定義したい — Python製ドキュメンテーションビルダー、Sphinxの日本ユーザ会 sphinx.ext.extlinks -- 外部リンクを短縮表記させるマークアップ — Sphinx documentation
アバター
aptpod Advent Calendar 2024 12月6日の記事を担当します、人事の照井です。 アプトポッドでは昨年春頃から、従業員の心身の健康維持/促進・生産性向上を目指し健康経営の取り組みを始めました。 今回はその取り組みの内容をご紹介したいと思います。 健康経営とは 健康経営が注目される社会的背景 健康経営を始めたきっかけと健康課題 健康経営の取り組み内容 健康経営に取り組んだ結果 まとめ 健康経営とは 健康経営とは、従業員の健康を企業の重要な資産と捉え健康管理や職場環境の改善に積極的に取り組むことで、生産性向上や企業価値の向上を目指す経営戦略です。 健康経営が注目される社会的背景 <1>少子高齢化と労働力人口の減少 日本の労働力人口は2030年までに約10%減少すると予測されています(総務省/労働力調査)。 労働力が減少する中、社員の健康を維持し生産性を向上させることが企業存続の鍵となっています。 <2>健康問題の拡大 ・メンタル不調やストレスの増加:  厚生労働省の「令和2年労働安全衛生調査」では、労働者の58%がストレスを感じていると回答。 ・プレゼンティーイズム(※)による損失:  日本生産性本部の調査によれば、プレゼンティーイズムによる損失額は欠勤の約2~3倍で、従業員1人あたり年間約50万円と試算されています。  ※出勤しているが生産性が低い状態 <3>政府の健康経営推進 経済産業省は、企業の健康経営の取り組みを評価する「健康経営優良法人認定制度」を導入しました。 健康経営優良法人に選定された企業は、財務面での成果や株価の上昇が見られ、社会的評価が高まるとともに投資家からの関心も集めやすくなっています。 健康経営を始めたきっかけと健康課題 アプトポッドはコロナ禍以降もフルリモートを続けているため、「運動不足」「体重増加」で悩む声が多々ありました。 一部、スタンディングデスクを取り入れる社員、ウォーキングマシーンに乗りながら ミーティングに参加する社員、それぞれ工夫して健康を維持している社員もいますが、普段から運動習慣がないとなかなか腰が上がらない社員が大多数です。 全社に情報を発信し続けることで小さなことから意識するきっかけになり、社員がより健やかに日々を過ごせたらと思い、取り組みを始めました。 健康経営の取り組み内容 健康経営に取り組むにあたり、まずは健康保険組合が主体で行っている 健康優良企業認定を受けることを目標 に開始しました。 これは、企業が従業員の健康づくりに積極的に取り組んでいることを評価・認定する制度です。 <1>健康経営を検討する場 全社員の健康管理に取り組むには人事だけが啓発するのではなく、現場の社員にも協力してもらう必要があると考えました。 そこで、月に1度行われる「衛生委員会」の機会を利用し、各本部から選出されたメンバーにも協力を仰ぎました。 <2>健康についての情報を展開 事前に人事で各月のテーマを決め、年始の衛生委員会にてメンバー内で担当振りをしました。 担当がテーマに沿った資料を作成し、衛生委員会で発表してもらいます。 その内容についてメンバーで少し話したり産業医からコメントをもらい、委員会終了後にその資料を全社展開しています。 下記、2024年に実施した衛生委員会でのテーマです。 テーマは季節に合わせて花粉や熱中症対策等、タイムリーな情報を送ることでより身近に、より関心を持てるようにしています。 展開資料一部抜粋 <3>その他安全衛生に関わる事項の展開 衛生委員会でのテーマ発表の内容展開の他に、人事からも毎月全社に情報展開しています。 この内容も基本的には季節に合わせた内容にしていますが、リモートワークでの就業のためどうしてもオフラインでのコミュニケーションより雑談の機会が少なくなることも考慮し「メンタル・ハラスメント相談窓口」の案内を半期に1度展開しています。 <4>社内のウォーキングイベントを年2回実施 健康保険組合のウォーキングイベントに便乗し、社内でも全社員を対象に年2回行っています。 社員の参加率は毎回9割以上で、このウォーキングイベントをきっかけに体質改善に挑んだ社員もいます。 <5>人事との距離を縮める全社員面談の実施 今年から人事と社員が半期に1度、1時間程度面談する機会をつくりました。 開始当初の目的は健康経営ではなかったものの、リモートワークでのコミュニケーション課題もあり、人事と直接対話することで人事が問題をキャッチアップし、直接上層部へ働きかけ、課題解決に向けて迅速に動くことができています。 普段の上長との1on1とはまた違うメンタル管理等、多角的なケアの取り組みです。 健康経営に取り組んだ結果 これらの活動を健康保険組合に提出し、見事「銀の認定」をいただくことができました。 ※令和5年認定、令和6年認定更新済 まとめ 今後も銀の認定をいただけるよう、継続的に健康経営には取り組んで行こうと思っています。 心身の健康は、個人がどれだけ「気づき」「実行」するかで大きな差が出てきます。 社員が心身共に健やかに、気持ちよく業務を行うことができるよう、今後も社員に寄り添った取り組みを目指していきます。
アバター
製品開発グループの大久保です。 aptpod Advent Calendar 2024 の12月5日の記事を担当します。 社内では、エッジ側でintdashに接続可能なデバイスを簡単に開発するためデバイス開発キットの基盤製品を開発しており、それを Device Connector と呼んでいます。これは主にRustによって開発しています。 Rustは高速なコンパイル型言語であり、デバイスとの通信、制御に適しています。しかしながら、データのフィルタリングなどの細かい挙動を、再ビルドを必要とせず利用者が多くて学習コストの小さいPythonで記述したいというニーズもあります。そのような、アプリケーションの基本部分はRustで実装しながら、状況に応じて制御を変えたい部分はPythonで記述したいという場合、RustからPythonを呼び出すことになります。それを今回は PyO3 で試してみたいと思います。 PyO3とは RustからPythonを呼び出す 実行結果 マルチスレッドの場合 まとめ PyO3とは PyO3 は、PythonをRustを利用するためのバインディングを提供します。RustからPythonを呼び出すだけでなく、Pythonから利用されるモジュールをRustで記述することができます。 大体の使い方は以下のユーザーガイドを見れば分かると思います。 https://pyo3.rs/main/getting-started.html RustからPythonを呼び出す 実際にRustからPythonを呼び出してみましょう。実行時には libpython3.x.so の機能を呼び出すので、例えばUbuntuなら以下のように依存ライブラリをインストールする必要があります。 sudo apt install python3-dev Cargo.tomlに以下の依存関係を記述します。 [dependencies] pyo3 = { version = "0.22.6", features = ["gil-refs"] } main.rs は以下のように記述します。 #[pyclass(name = "Greeting" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyGreeting { text: String , } #[pymethods] impl PyGreeting { #[getter] fn get_text ( & self ) -> &str { & self .text } } #[pyclass(name = "Response" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyResponse { text: String , } #[pymethods] impl PyResponse { #[new] fn new (text: String ) -> Self { Self { text } } } #[pymodule] fn mymodule (module: & Bound < '_ , PyModule > ) -> PyResult < () > { module. add_class :: < PyGreeting > () ? ; module. add_class :: < PyResponse > () ? ; Ok (()) } fn main () { pyo3 :: append_to_inittab! (mymodule); pyo3 :: prepare_freethreaded_python (); let script = r#" import mymodule def greeting(g): if g.text == "hello": res = "こんにちは" else: res = "なんですか?" return mymodule.Response(res) "# ; println! ( "{:?}" , run_script ( script, PyGreeting { text: "hello" . into () } ) ); println! ( "{:?}" , run_script ( script, PyGreeting { text: "hoge" . into () } ) ); } fn run_script (script: &str , greeting: PyGreeting) -> PyResponse { let result: PyResult < PyResponse > = Python :: with_gil ( | py | { let pyfunc: Py < PyAny > = PyModule :: from_code_bound (py, & script, "" , "" ) ? . getattr ( "greeting" ) ? . into (); let result = pyfunc. call1 (py, (greeting,)); let response = match result { Ok (response) => response, Err (e) => { eprintln! ( "{}" , e); if let Some (e) = e. traceback_bound (py) { if let Ok (e) = e. into_gil_ref (). format () { eprintln! ( "{}" , e); } } panic! (); } }; let response: PyResponse = response. extract (py) ? ; Ok (response) }); result. unwrap () } 以下細かく解説します。 #[pyclass(name = "Greeting" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyGreeting { text: String , } #[pymethods] impl PyGreeting { #[getter] // getterとして定義 fn get_text ( & self ) -> &str { & self .text } } #[pyclass(name = "Response" )] #[derive( Clone , PartialEq , Eq , Debug )] pub struct PyResponse { text: String , } #[pymethods] impl PyResponse { #[new] // コンストラクタ fn new (text: String ) -> Self { Self { text } } } #[pymodule] fn mymodule (module: & Bound < '_ , PyModule > ) -> PyResult < () > { module. add_class :: < PyGreeting > () ? ; module. add_class :: < PyResponse > () ? ; Ok (()) } このあたりはPythonスクリプトとRust両方から使用できる型を定義し、モジュールにまとめている箇所です。 #[pyclass(name = "...")] 属性で型を定義しつつ、Pythonから違う名前で見えるようリネームしています。 #[pymethods] でコンストラクタ等のメソッドも定義しており、これらもPythonから呼び出せるようになっています。 pyo3 :: append_to_inittab! (mymodule); pyo3 :: prepare_freethreaded_python (); Pythonの初期化を行います。 append_to_inittab!() で自作モジュールが読み込まれるように登録し、 prepare_freethreaded_python() で初期化します。なお、pyo3 に auto-initialize という feature を指定していれば prepare_freethreaded_python() は不要になります。 let script = r#" import mymodule def greeting(g): if g.text == "hello": res = "こんにちは" else: res = "なんですか?" return mymodule.Response(res) "# ; 実行するPythonスクリプトを定義します。 greeting 関数は Greeting 型の引数を受け取り、 Response 型を返します。この例では上記で用意したカスタムの型を受け渡ししていますが、文字列のようなプリミティブな型もやり取りできます。 println! ( "{:?}" , run_script ( script, PyGreeting { text: "hello" . into () } ) ); println! ( "{:?}" , run_script ( script, PyGreeting { text: "hoge" . into () } ) ); スクリプトを実行し、その結果を表示します。 fn run_script (script: &str , greeting: PyGreeting) -> PyResponse { let result: PyResult < PyResponse > = Python :: with_gil ( | py | { let pyfunc: Py < PyAny > = PyModule :: from_code_bound (py, & script, "" , "" ) ? . getattr ( "greeting" ) ? . into (); スクリプトを実際に実行します。Pythonスクリプトの実行は、 Python::with_gil に渡した関数の中で行います。 PyModule::from_code_bound でPythonのソースコードを読み込み、その中の greeting 関数を取得します。 let result = pyfunc. call1 (py, (greeting,)); let response = match result { Ok (response) => response, Err (e) => { eprintln! ( "{}" , e); if let Some (e) = e. traceback_bound (py) { if let Ok (e) = e. into_gil_ref (). format () { eprintln! ( "{}" , e); // スタックトレースの表示 } } panic! (); } }; let response: PyResponse = response. extract (py) ? ; Ok (response) greeting 関数を実行し、その結果を引き出します。Python関数内でエラーが起きた場合スタックトレースの情報が含まれていれば、 e.traceback_bound(py) で取り出して表示します。最後に extract() で返り値を PyResponse に変換して結果を返します。 実行結果 実行してみると以下のようになります。 PyResponse { text: "こんにちは" } PyResponse { text: "なんですか?" } Pythonスクリプト内に記述した分岐の通りの返答を返してくれています。 マルチスレッドの場合 上記の例では関係ないのですが、実際にPyO3を利用して実装した時に苦労した点として、 with_gil に渡した関数を実行できるのは1スレッドという制限があげられます。言い換えればPythonスクリプトを実行できるのは常に1つのスレッドのみです。これはPythonのGILという仕組みによるもののようです。 allow_threads を使えば、一時的にGILを解放できるので、Rustのコード中でブロックするような処理があるところはこれを追加して解決しました。Pythonソースコード内にブロックするところがあると処理が進まなくなる可能性は出るので、完全な解決ではありませんが。 まとめ 今回はRust製アプリケーションにPythonを組み込む方法を紹介しました。ビルド不要で柔軟に挙動を変えたいところ、たとえばデータのフィルタリング処理やゲームのスクリプト等があり、メインの部分はRustで書いているアプリケーションがあるなら、今回の手法は役立つと思います。
アバター
aptpod Advent Calendar 2024 12月4日の記事です。 デザインチームの上野です。 近年、デジタルツインの普及に伴い、建設機械や製造業でROSを用いて3Dモデルを扱う事例が増加しています。 そこで今回はROSで用いられる3DモデルのURDFについて0からBlenderを使って作成してみた流れをまとめたいと思います。 URDFとは Phobosとは 3Dモデルを作成しエクスポートしてみる アドオンのインストール モデリング ジョイントの設定 コリジョンの設定 慣性の設定 モーターの追加 マテリアルについて エクスポート モデルの確認方法 URDFとは URDF (Unified Robot Description Format)とは、ロボットの構造を記述するためのXMLのフォーマットです。 ロボットの各パーツの座標や向き、質量などが全てを数値化して記述されています。 Phobosとは Phobosは、Blender向けのオープンソースアドオンで、ROS用ロボットモデルをエクスポートする機能を提供します。URDFに加えて、SDFやSMURFといった形式でのエクスポートも可能です。 3Dモデルを作成しエクスポートしてみる 今回は手順の概要を理解するため、以下のような簡易モデルを作成し、それに設定を加えていきます。 それでは早速進めていきます。 アドオンのインストール 1.ダウンロード 以下のリンクからZipファイルをダウンロードしてください。 github.com 2.アドオンのインポート Blenderのプリファレンスからアドオンをインストールします。 インストールを押し先ほどDLしたZIPファイルを選択。 Phobosのアドオンが表示されるのでチェックをいれる。 この後一度Blenderを再起動するとよいです。 モデリング アドオンがインポートできたのでここからモデリングを進めていきたいと思います。 モデリング時に各パーツの中心座標の位置にジョイントができるのであらかじめ向きを意識して作成しておくと良いと思います。 モデリングが完成したら各メッシュのPhobos Typeを「Visual」に設定します。 メッシュを選択→プロパティの[Phobos Type] →[Visual]に変更 ジョイントの設定 モデルが用意できたらそれぞれのパーツにジョイントを設置していきます。 メッシュ選択後「kinematics」>「Create Link(s)」を押します。 オプションから位置を「Selected Object」にすることでパーツの原点にジョイントが自動的に移動します。 また「Parent Objects」にチェックを入れることでオブジェクトが自動的にジョイントの子供になります。 各パーツジョイントを設置し終わったら階層を整理します。 対象2つ選択後、Ctrl+Pを押すことでペアレントメニューが表示されるので「ボーン相対」を選択しどんどん親子関係を整理していきましょう。 以下のように親子関係を整理しました。 今度は各ジョイントの設定をしていきます。 ジョイントを選択後「Define Joint(s)」を押すことでジョイントの形状が変化し、ロボットの細かい制御の値などが編集できるようになります。 例えばロボットの首の部分に旋回用のJoint Typeを設定し、Lower/Upperの値を変更することで左右首を振り切らないようにできます。 コリジョンの設定 衝突判定用のコリジョンを設定します。 メッシュを選択した後「Create Collision Object(s)」を押すことで衝突反転用のメッシュが追加されます。オプションから形状を変化したり、個別でスケールなどの微調整を行えます。 慣性の設定 メッシュを選択した後「Create Inertials」を押すことで慣性の値を設定することができます。 モーターの追加 稼働させるジョイントを選択して「Add Motor」を押すと設定するモーターの詳細が設定できるようになります。 マテリアルについて マテリアルはスペキュラーBSDFまたはプリンシパルBSDFの使用を推奨しています。 テクスチャがある場合出力をBSDFノードのベースカラーにイメージテクスチャノードを追加する必要があります(ノーマルマップも同様)。 エクスポートされた形式 (URDF、SDF、SMURF など) でサポートされているプロパティと、それをロードするプログラムによって異なる場合があるので結果は確認してください。 エクスポート 赤枠の部分で出力の設定をした後Export Modelを押すと指定したパスに自動的に出力されます。 モデルの確認方法 以下のURLにアクセスし、URDFフォルダをドラッグ&ドロップすると、Webブラウザ上でもモデルを確認できます。 gkjohnson.github.io 以上でBlenderを用いたURDFの作成してみたになります。 結構複雑そうな処理を簡単にモデルに実装できるので是非試してみてください。 まだまだ制御できることはたくさんありそうなので引き続き研究していきたいと思います。
アバター
aptpod Advent Calendar 2024 12月3日の記事です。 ソリューションアーキテクトの奥山です。痒いところに手が届くそんな弊社製品の機能をご紹介します。 はじめに 産業IoTの分野では、膨大な計測データを効率的に管理・活用することが重要です。特に、自動車業界のCAEエンジニアやAD/ADASエンジニアにとって、計測データの解析や設計評価に役立てることは、業務効率化や技術革新の鍵となります。しかし、以下のような課題がデータ活用の障壁となっています。 独自フォーマットのデータ形式 が解析や外部ツール連携を難しくしている。 REST APIを利用した独自開発 に多くの工数がかかる。 業界標準形式への対応不足 により、データの再利用や解析が限定されている。 これらの課題を解決するために、aptpodでは計測データを効率的に業界標準形式へ変換する新しいミドルウェア「Meas Converter」を開発しました。 Meas Converter は、特に自動車業界で求められるMDF(Measurement Data Format)形式へのデータ変換をスムーズに行うツールです。この機能により、専門的な知識や追加開発を必要とせず、データ変換をシンプルかつ効率的に実現できます。 本記事では、Meas Converterの概要、特徴、そして操作方法を詳しく解説します。 はじめに Meas Converterとは? 主な特長 なぜMeas Converterが必要なのか? 主な課題 Meas Converterによる解決策 Meas Converterの主な特徴 1. 直感的で簡単な操作 2. 高度な変換機能 3. 将来を見据えた対応範囲 実際の操作説明 1. データを選択する 2. 変換設定を行う 3. 変換を実行する 4. 変換結果をダウンロードする まとめ 注意事項 Meas Converterとは? Meas Converter は、産業IoT分野の計測データを効率的に変換するために設計されたツールです。特に、自動車業界で広く使用される計測データを、業界標準の MDF(Measurement Data Format) に変換することで、データ活用の可能性を大きく広げます。 主な特長 対応データタイプ : CANデータ (車両内通信プロトコルで収集した情報) NMEAデータ (GPSなどの位置情報) アナログ信号データ (センサーから取得した値) 映像データ(テスト走行時の記録、音声対応も予定) 出力形式 : MDF(Version 4) AVI (映像データ) 出力方法 : 複数のデータタイプを変換後、ZIP形式で一括ダウンロード。 「データの価値を最大化し、解析や設計プロセスをシンプルにする、それがMeas Converterの使命です。」 なぜMeas Converterが必要なのか? 膨大な計測データを活用する際、特に以下のような課題に直面することが多いです。 主な課題 独自形式の制約 intdash独自形式のデータは解析や他ツールへの連携が難しく、専用知識や追加開発が必要でした。 業界標準形式への対応不足 MDF形式への変換は外部ツールの導入や複雑な開発が求められるため、業務負担が増していました。 運用の複雑さ REST APIベースのデータ取得や変換は手間がかかり、使い勝手が良いとは言えませんでした。 Meas Converterによる解決策 学習コストの削減 GUIを活用し、専門知識がなくても簡単にデータ変換が可能。 MDF形式への迅速な対応 自動車業界標準のMDF形式へ即座に変換。CAEやAD/ADASのエンジニアが活用しやすい環境を提供。 操作の簡略化 GUIでのワンクリック操作でデータ変換を実現。REST APIに頼らず、直感的な操作が可能。 Meas Converterの主な特徴 Meas Converterの設計理念は、「簡単操作で高い成果」です。その特徴を以下にまとめます。 1. 直感的で簡単な操作 シンプルなデータ変換フロー : データ検索 → 設定 → 出力 → ダウンロードの4ステップで作業完了。 GUIでプログラミングなしでも簡単に操作可能。 2. 高度な変換機能 自動変換機能 : 計測データと変換定義(CANでいうDBCファイルのような設定)を自動で紐付け、手間を軽減。 インポート可能な変換定義 : 変換定義をファイルからインポートして設定可能。 MDF形式への対応 : CAN、NMEA、アナログデータを業界標準形式に変換。 3. 将来を見据えた対応範囲 現在のMDF形式やAVI形式に加え、将来的にJSONやCSV形式への対応を予定。 実際の操作説明 Meas Converter を利用することで、計測データの変換が簡単な操作で完了します。このセクションでは、操作手順をご紹介します。 1. データを選択する UUIDを使って選択または、 Meas Hub(intdashの計測データの管理画面) から計測データを選択することができます。 UUIDからデータを選択 MeasHubからデータを選択 2. 変換設定を行う 選択後、変換結果名や出力範囲を設定することができます。出力するデータは過去の変換結果や新しく設定することができます。 選択した後の画面 出力データを選択します。 出力データをリストから選択する 選択後の出力対象データ 3. 変換を実行する 変換ボタン をクリックするだけで、変換プロセスが開始されます。 進行状況や結果は変換結果一覧にリアルタイムで表示されます。 変換結果 4. 変換結果をダウンロードする 変換結果一覧 から、変換済みファイルを確認。 青い「ダウンロード」ボタンをクリックして、ZIP形式で一括ダウンロードできます。 ダウンロード結果 まとめ Meas Converterは、自動車業界を中心としたエンジニアが直面する課題を解決し、業界標準形式でのデータ活用を加速します。直感的な操作と高度な変換機能により、計測データの価値を最大限に引き出します。 「データ変換をもっとシンプルに。Meas Converterでデータ活用の新しい未来へ。」 注意事項 本記事でご紹介した「Meas Converter」は先行リリース版です。 現段階では、試供版の提供は行っておりませんが、以下の内容をご理解いただけると幸いです。 現在の状態: 本記事に記載された機能や特長は、先行リリース版の仕様に基づいています。正式版リリースに向けてさらなる改良を行う予定です。 ご意見・ご感想のお願い: 本記事をご覧いただき、機能へのご意見や改善のご提案がございましたら、ぜひお知らせください。いただいたフィードバックを元に、より良い製品開発を目指します。 今後の展開: 先行リリース版で得た知見を活かし、正式リリースではさらに幅広いニーズに応える機能を追加予定です。
アバター
aptpod Advent Calendar 2024 12月2日の記事です。 みなさまお久しぶりです。アプトポッドで営業企画をしている神前(こうさき)と申します。 またもや 前回の登場 から1年です。4年連続でAdvent Calendarのトップバッターをやっているので、もはや風物詩ですね。 さて、昨年はSalesforceの導入について書いたのですが、今年は運用を1年回してみての感想と、その中での気付きについて書いていきたいと思います。 Salesforceに限らずになにかのシステムを導入する際に、導入するまでも大変だったりしますが、本番はその運用です。いざ導入したけども現場の人は使ってくれなければ宝の持ち腐れです。 導入してみたけどどうも現場に定着しない、使ってもらえてはいるけどもどうも深く使ってくれてはいない、そんな悩みを持っている人に届くと嬉しいです(想定読者が狭い)。 今年やったこと 実運用にあわせた改善 他ツールと連携したSalesforce周辺の改善 閑話休題 SFAはどのように構築、運用していくべきなのか よくある光景 SFAはどう寄与できるか 自分たちにとってのキーはなにか チームで立ち向かう 3者のコミット まとめ 今年やったこと この1年間でやったことは、大きく2つにわけることができます。 実運用にあわせた改善 他ツールと連携したSalesforce周辺の改善 それぞれもう少し詳細をみていきます。 実運用にあわせた改善 昨年の10月から本番運用を開始し、しばらくは実際に現場で使ってもらったフィードバックを元に改善をすすめていました。といっても、劇的に使い勝手が変わるような改善をするのではなく、むしろ項目やフローの追加、整理といった、小さいものを中心にとにかく数多くこなしていきました。 運用開始当初は「操作方法がわからない」、「エラーがでてやりたいことができない」といった問い合わせも多く、その都度対応していましたが、現在ではそういった問い合わせも月に一度あるかないかといったレベルまで使い慣れてもらったと思います。 来た問い合わせに対応するのはもちろんのことですが、自分から課題を拾いに行くことも意識していました。打ち合わせの合間に雑談として操作感をヒアリングしたり、困り事がないか確認するのもそうですが、実際に入力された案件(デフォルトだと「商談」ですが弊社では「案件」と名称を変更しています)の内容を日々眺めることで、使われている項目、使われていない項目はわかりますし、操作に苦慮している部分もなんとなくわかってきます。そこから、自分で実際に操作をしてみて、手間がかかる部分や使いにくい部分、あるいは、あったら便利なフロー等を推察し、時にはアイデアを現場にあてながら改善をすすめました。 運用から半年ほどは主に、直接Salesforceを触る現場の営業メンバーを中心に、言い換えると、「対社内」向けの改善を重点的にすすめてきた形になります。 ちなみに、昨年のブログで紹介をしたフローもリファクタリングを実施し、コンパクトに、かつ、シンプルにすることができました。 他ツールと連携したSalesforce周辺の改善 それに比べ、直近の半年ほどは、「対社外」を意識した改善をすすめてきました。 実際の操作感をよくする、ちょっとしたことをフローを使って自動化して便利にする、というのがだいぶ落ち着いたので、自社で使ってる他のツールとの連携を、フローをうまく活用しながら構築したり、エンドユーザーに送るメールを自動化させたりすることをすすめてきました。 一つの例として、 Zendesk との連携を強化があります。 Zendeskとは、クラウドベースのカスタマーサポートツールなのですが、およそ1年半くらい前に導入をしています。当時はSalesforceの導入が決まり、カスタマーサポートもSalesforceを利用するか検討したのですが、シンプルなツールとしての使いやすさからZendeskになった、という経緯があります(なお、カスタマーサポートの担当が弊社では別にいたため、このあたりの経緯については私自身は直接関わっていたわけではありません)。 現在ではお問い合わせのほとんどがZendeskを経由してくるようになるまで利用されているのですが、改善前まではSalesforceとの連携面が弱く、だれが問い合わせをしてきているかはもちろんわかるものの、その問い合わせがどの案件についての問い合わせかがすぐにわからない等の課題がありました。 そこで、まずは案件に自動連番でユニークな「案件番号」というものを案件作成時に生成し、案件内の項目である「納品日」になると、案件に紐づいたエンドユーザーの担当者さんにメールでZendeskの案内と、お問い合わせの際には案件番号をフォームに入力してもらうように依頼するように自動化しました。 これにより、もれなくエンドユーザーさんに対して問い合わせの窓口であるZendeskの案内ができ、かつ、問い合わせがあった際にも、どの案件についての問い合わせかがすぐにわかるようにできました。 これはあくまで一例ですが、ZendeskとSalesforceの連携強化について他にいくつか実施しています。 Zendesk以外にも、一部のパートナーさん向けのポータルサイトの構築と公開や、エンドユーザーさん向けのポータルサイトの構築をすすめています。 前半(実運用にあわせた改善)に比べると、改善の影響範囲が社内から社外へ広がり、関わるツールもSalesforce以外のものまで含めてだいぶ広くなったと思います。 閑話休題 話は変わるのですが、昨年のAdvent Calendarで書いた記事がSalesforce社の人の目にとまったようで、今年の夏頃にCustomer Evangelistのバッジをいただけました。 袋にいれてとっておいてあるバッジ 自身としては、構築や運用を四苦八苦しながらやってきたものをブログで書いただけなので、特段エヴァンジェリストとしての何かを意識して日々活動をしていたわけではないのですが、なんと今年の10月にこのバッジをいただいたことがこ゚縁となりSalesforce社でのイベントに登壇させていただくこととなりました(貴重な場をいただきありがとうございました)。 Salesforce社が主催で、Salesforce社のオフィスで開催するにも関わらず公式のイベントではない(と当日に話されてました)という摩訶不思議な立ち位置のイベントではありましたが、自身にとってもよい機会となりました。 当日は導入から3年未満くらいの立ち上げ初期のユーザーさんを対象に、私自身が実際に苦しんだところや、導入をしてから現場に使ってもらうためにやってきたことをお話したのですが、その際にでてきた質問や、参加されていた方の状況を聞くことで、色々な気付きがありました。 SFAはどのように構築、運用していくべきなのか ※以下に記載することは神前個人の所感です。また、以下に記載することが弊社で実現できているというわけではないことをあらかじめ記載しておきます。 この1年半強の構築と運用、そして、先日のイベント登壇を経て、改めてSFA(=Salesforce)を導入する理由、目指すべきゴールはどこなのかを振り返ってみました。 導入をしたそもそもの経緯の詳細は 昨年のブログ記事 をご覧いただくとして、大きく言えば、ERPや勤怠システムといった他のツールとの連携による全体最適化が導入の目的だったと言えます。これまで連携をしておらず、バラバラだった各ツールをSalesforceを中心にリプレイスしていくことで、データの二重入力や二重管理を防ぐ、つまり、一つ一つは小さいけど、積み重なるととてつもない無駄になる作業を排除していくことが目的でした。 実際に、導入から運用を開始してから半年ぐらいはそのように動いていました。ですが、それもある程度整ってくると、もちろん日々状況は変化するので要望がつきることはないものの、だいぶ落ち着いてきます。 そこからは方向転換をして、これまで手が回ってなかったこと、他のツールとの連携を活用していくことになるわけですが、とはいえ、どういう方向で動いていくべきかについては様々な選択肢があります。マイナスをゼロにしていく作業はわかりやすいのですが、ゼロをどうプラスにしていくのか、どこをプラスにしていくのかは色々考えられるからです。 先に、どういう方向性で動いていくべきかについて書いておくと、 「SFAは、各ユーザー(経営層、マネジメント層、プレイヤー層)の意思決定の速度向上を最大化する方向性で構築、運用するべき」 と現段階では考えています。 よくある光景 なぜそのように考えるに至ったかをもう少し掘り下げていきます。 SFAを導入しているのであれば、案件(商談)やリードの管理、目標に対しての現在の進捗といったものはある程度可視化できていると思います。 弊社でもそうですが、おおよその営業組織は、週一などの頻度で定例を開き、Q単位での進捗や各案件の進捗等を共有することと思います。そうした中で、例えば「残り◯◯日で今Qの目標に対して◯◯円足りない」みたいな場面はよくあることと思います(あるいは、今Qはいいけど、翌Qのパイプラインが足りない、年間での目標達成があやしそうだ、等)。 そうした時に、以前も弊社ではよくあったのですが、「とりあえずいまの状況をレポートでまとめて現状を詳細に把握しよう。それを次回の定例で確認して対応策を考えよう」となりがちではないでしょうか。レポートをまとめるのは私のように事業部にいる非営業メンバーだったり、あるいは経営企画のような立ち位置の人が担当すると思いますが、翌週になってそのレポートをみんなで眺め、あれこれ対策を考えるものの、場合によっては「では、次回にこのレポートの情報をもとに、それぞれ対策を考えてきて発表しよう」、さらにその翌週に「では来週に今回でたアイデアの中からどれを実施するか決めよう」みたいなことになります。実際にはこんなに悠長に時間を使うことはないと思いますが、とはいえ、「目標達成があやしいぞ」となってから「どのように動くべきか」が決まるまでに1週間、2週間費やしてしまうこともままあることだと思います。このように時間を使ってしまえば、方針が決まってから動いてももうリカバリーができなかったり、当然時間が経過しているので、今Qだけでなく、翌Qにすら影響がでることも考えられます。 私見ではありますが、営業の成績は、その人、あるいはその組織がとったアクション量に比例すると考えています。上記のような例ですと、方針が定まらない限りメンバークラスはアクションを取りづらく、通常の営業活動にプラスアルファでなんらかのアクションをとらないといけないにも関わらず、それをできずに時間を消費してしまうわけです。 私自身他社さんの事例を多く知っているわけではないですが、しかし、こうした光景はわりとよくある光景なのではないかと考えています。 SFAはどう寄与できるか 案件(商談)は管理できている、現状の進捗も可視化できている、でも意思決定にまで寄与できていないのであれば片手落ちなのではないでしょうか。 例えば、各営業メンバーそれぞれの平均商談日数や、新規開拓と既存のお客様の深耕営業のどちらが得意なのか、それぞれの平均受注金額はいくらなのか、どういう業界が得意なのか、どういう規模の企業が得意なのか、etc.といったことが可視化されていればどうでしょうか。 ダッシュボードのサンプル(数字は隠してます) ここまで可視化されていて、リアルタイムにモニタリングできていれば、あとは逆算していけば各々がとるべきアクションは自然と決まってきます。 不足分が◯◯円だから、おおよそ◯案件分の受注が必要、残りの期間が◯◯日だから、平均商談日数をみると新規開拓よりも既存顧客へのアプローチのほうが可能性がある、深耕営業が得意なのはだれで、平均アプローチ数は◯回だから、これから一週間Aさん、Bさん、Cさんは既存顧客にたいしてこういう風にアプローチをしてください、それをデイリーで結果をみていきましょう、といった感じです。 ここまで落とし込めれば、あとは実行してその結果をみる、というPDCAをすぐに回せます。 これは必ずしも事業部単位だけの話ではなく、個々の営業メンバーの目標達成という観点でも活用できます。 極端な言い方をすれば、 「いかに考える時間を減らして、行動のための時間を増やすか」 ということです。 自分たちにとってのキーはなにか ただし、商談を受注へともっていくための重要なキー、指標はそれぞれの会社さんで異なると思います。訪問回数が大事、という企業さんもあれば、商談と商談の期間が大事であったり、あるいは、商談の中で製品のデモをやったかどうかが大事、といった企業さんもいるかもしれません。 さらに言えば、そうした指標とは別に、マネジメントレイヤーが各営業メンバーのどういった数値に着目してマネジメントをしているか、といったことも千差万別です。 何が言いたいかと言うと、「どの指標が自分たちにとって大事なのかは自分たちで見つけなければならない」ということです。 これさえやれば営業成績が向上する、といった銀の弾丸はありません。自分たちにとっての銀の弾丸を探るために、レポートで色々可視化をしていく、可視化をするために案件(商談)やリードに色々なデータをとってみる、という模索の期間が必要なのではないかと思います。 チームで立ち向かう 話を少し戻しまして、Salesforce社でのイベントで登壇をした際に、いくつかメッセージを発信したのですが、その中の一つに「とにかく現場に飛び込もう」というものがありました。 意図としては、管理者として現場と一線を引いて構築、運用をするのではなく、実際に使うユーザー(=営業メンバー)と同じ目線で改善ができるようになりましょう、という内容なのですが、いま考えると、実はこれがかなり大事なことなのではないかと思うようになりました。 自分たちにとっての重要なキーを探るためには、管理者が一人でがんばっていても限界があります。実際に現場に飛び込み、営業メンバーが何を指標としているのか、あるいは、指標をさぐるためにはどんなデータが必要なのか、そして、とったデータをどうすればシステムとして可視化できるかを共に探り、作り上げていく必要があります。同様に、マネジメントレイヤーが何をみているのか、どういうデータがあれば意思決定の時間を少しでも短く出来るのか、そしてそれをシステム上でどう表現するのかといったことも、一方通行ではなく双方向に協力をしていく必要があります。 つまり、管理者対ユーザーという関係性ではなく、管理者も現場も関係なく、同じ目的をもった一つのチームとして組織の壁を超えて協力をしていく必要があるのです。そうしたチームをつくるための第一歩が、現場に飛び込む、ということなのだと私は考えています。 3者のコミット 個人的には、Salesforceに限らず、SFAを導入するのであれば、管理者と営業の現場メンバー(マネジメントレイヤー含む)に加えて、経営層のコミットもSFAの十全な活用、運用には必要と考えています。 というのも、せっかく構築をして、運用を開始しても、経営層が使わないのであれば結局現場は使わなくなるからです。 Salesforceにはレポートの他にもダッシュボードがあり、そこでリアルタイムに営業活動で重要な情報はみることができます。それにもかかわらず、例えば経営会議ではSalesforceにはいっているデータをcsvで出力し、それをスプレッドシートで加工をしてパワーポイントに落とし込んでるようであれば意味がありません。もちろん、すべてがすべてダッシュボードを見れば済むということはないでしょうが、少なくともダッシュボードをみればわかるものはダッシュボードをみればいいわけです。率先垂範という言葉がありますが、トップから率先して使うのが、現場に定着させるためには大きな効果があるのは間違いありません。 経営層が「導入することにしたからあとはよろしく」、現場やマネジメントレイヤーが「上から言われたからとりあえず使います」、管理者が「営業のことはわからないので、とりあえずやりますけど、細かいことはなにかあったら言ってください」といった状態では、改善もすすみませんし、運用が定着するわけありません。 幸い、弊社は規模が小さいこともあり、3者のコミットはできていますが、もし自社はそうではないな、と感じるようであれば、機能を充実させるよりもまずは3者がコミットできるようにコミュニケーションを開始するところから手を付けたほうがよいかもしれません。 まとめ 長くなってきたのでそろそろ締めたいと思います。 昨年の記事から1年を通して、自分なりにはよい活動ができたのではないかと思っています。ですが、それは自分がたまたま元々営業の経験があり、現場とコミュニケーションをとりやすかったり、現場がとても協力的であったり、あるいは、こうしたシステムをいじるのが性に合っていたからという幸運に恵まれていたからだと思います。 いま現在管理者をやっている方の中には、そうではない方もいることでしょうし、何から手を付けていけばいいかもよくわからない、という方もいると思います。 そうした中で、そもそもなんで導入をするのか、という原点に立ち返って、一人で抱え込まずにまわりに仲間を増やし、チームで立ち向かっていくことが、時間はかかるものの、唯一のできることなのではないかと思います。 本記事が少しでも参考になっていると幸いです。 それではまた1年後にお会いしましょう!
アバター
intdashにリアルタイムでデータを登録したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 収集データをリアルタイムで可視化・確認したいニーズが増えています。 今回は収集データをリアルタイムに加工・登録する方法をご紹介します。 はじめに 前提 YOLOとは OpenCVとは Gstreamerとは PyGObjectとは インストール クライアントライブラリ サンプルプログラム用ライブラリ Gstreamerインストール Pythonパッケージインストール やってみた 全体構成 実行結果 サンプルプログラム 計測作成 リアルタイムAPI接続 データポイント受信 H.264アップストリーム Convertor Detector タイムアウトで完了 おわりに はじめに 今回の対象はリアルタイムAPIへのデータ送信です。 intdash SDK 前提 今回のサンプルプログラムで以下のライブラリを使います。 1 YOLO OpenCV Gstreamer PyGObject YOLOとは YOLO(You Only Look Once)はディープラーニングモデルです。 画像に写っている物体がどこにあり、何なのかを高速に認識できるのが特徴です。 ai-market.jp 今回使うのは物体検出の精度と速度を向上させたモデルであるYOLOv4の軽量版YOLOv4-tinyです。 OpenCVとは 画像の読み込み・表示・編集など画像処理のためののオープンソースライブラリです。 今回はOpenCVからYOLOv4-tinyモデルを呼び出し、矩形を描画します。 Gstreamerとは 動画・音声の変換などを行えるオープンソースのメディアフレームワークです。 前回 利用したFFmpegよりもリアルタイム処理に向いています。 今回はH.264データのデコード・エンコードに利用します。 PyGObjectとは Pythonから、C言語ライブラリで使用されるオブジェクト指向プログラミングのフレームワークGObjectを利用するためのライブラリです。 今回は、GObjectを基盤として構築されたGstreamerを制御するのに利用します。 インストール クライアントライブラリ クライアントライブラリは 前回 インストールしたパッケージを利用します。 サンプルプログラム用ライブラリ 利用ライブラリをインストールします。 Gstreamerインストール Gstreamerをインストールします。バージョン、カラーバー表示を確認します。 2 brew install gstreamer gst-launch-1. 0 --version gst-launch-1. 0 videotestsrc ! autovideosink Gstreamerインストール Gstreamerカラーバー表示 Pythonパッケージインストール Pythonパッケージをインストールします。 opencv-python: OpenCVのPythonラッパー numpy: 数値計算や配列操作のためのPythonライブラリ、画像編集に利用 PyGObject: GObjectのPythonラッパー pip install opencv-python numpy PyGObject Pythonパッケージのインストール やってみた 映像をリアルタイムで物体検出します。 全体構成 H.264データをダウンストリーム・加工してintdashにアップストリームします。 全体構成図 H.264データが可視化されるにはサーバーまでを2往復することになります。 違和感を感じさせないためには低遅延であることが重要です。 実行結果 サンプルプログラムを起動します。 python lesson4/src/detect_people.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuid < YOUR_EDGE_UUID > intdash Motion V2でデータ収集を開始します。 Video Data Type: h264_frame Data Name: 1/h264 せっかくなので検出人数が多いところで撮影しました。 www.youtube.com 検出物体の人を緑、他を赤にして矩形描画したH.264データをアップストリーム 検出人数もアップストリーム 利用SIMではLIVE再生だと元の映像含めて通信が不安定 元データと経過時間が同じにしているため、過去再生では完全に時刻同期 サンプルプログラム 構成が少し複雑になったので図示しました。 クラスアーキテクチャ フレームのデコード・物体検出・エンコードをスムーズに行うために非同期処理にしています。 Gstreamerパイプラインとキューを使って非同期処理間でデータ授受 新規計測として検出結果(矩形描画映像、検出人数)をアップストリーム 計測作成 アップストリーム先の計測をREST APIで作成します。 作成後に計測UUIDをアップストリーム処理に渡します。 measurement = self.writer.create_measurement( "Created by DetectService" ) await self.upstreamer.open(measurement.uuid) リアルタイムAPI接続 アップストリームのためにノードID(エッジUUID)を指定します。 帯域が混み合っていたため、 ping_timeout : Pingタイムアウト(秒) を延ばしました(デフォルト1秒)。 api_url_parsed = urllib.parse.urlparse(api_url) conn = await iscp.Conn.connect( address=f "{api_url_parsed.hostname}:{api_port}" , connector=iscp.WebSocketConnector(enable_tls= True ), token_source= lambda : api_token, project_uuid=project_uuid, node_id=edge_uuid, ping_timeout=ping_timeout, ) データポイント受信 起動後はダウンストリームでデータポイントを待ち受けます。 一定時間データポイントが発生しない場合は TimeoutError をraiseします。 async for msg in self.down.chunks(timeout=timeout): H.264アップストリーム Gstreamerから取得したフレームがIDRフレームかNon IDRフレームかを判定します。 H.264を規定するISO/IEC 14496-10のAnnex Bにおいて定義されているバイトストリームフォーマットのNAL Unit Typeが SPS (nal_type:7) Sequence Parameter Set:シーケンス全体に関するパラメータ情報 PPS (nal_type:8) Picture Parameter Set:個々のフレームまたはグループオブフレーム(GOF)に関するパラメータ情報 IDR (nal_type:5) が順序通りに存在する場合にIDRフレームと判断します。 # IDRフレームの順序判定 if nalu_type == 7 : sps_found = True elif nalu_type == 8 and sps_found: pps_found = True elif nalu_type == 5 and sps_found and pps_found: return True なお、エンコードしたH.264のIDRフレームの頻度はGstreamerパイプラインの """ key-int-max={key_int_max} """ で指定しています。 Convertor Gstreamerパイプラインをラップするクラスです。 push メソッドで1フレームを与えて、 get で取り出します。 Gstreamerがバッファするため、getでは非同期に読み出して返します。 while True : sample = await asyncio.to_thread(self.sink.emit, "pull-sample" ) Detector 物体検出クラスです。 特定の物体検出結果(人物)の矩形色変更、検出人数カウントをします。 if self.class_names[class_ids[i]] == "person" : color = ( 0 , 255 , 0 ) count = count + 1 else : color = ( 0 , 0 , 255 ) タイムアウトで完了 検出結果は1つの計測に格納されますが、元計測が複数あっても構いません。 TimeoutError が発生したら場合は計測を完了します。 except TimeoutError : self.writer.complete_measurement(measurement.uuid) おわりに 今回はリアルタイムAPIのデータ送信をご紹介しました。 REST APIの計測作成も含めた集大成の内容でした。 これでひと通りの機能を網羅して入門シリーズもひと区切りです。 今後はより実用向けの内容も紹介していきたいと考えています。 サンプルプログラムは GitHub で公開しています。 ↩ 初回の起動は遅いことがあるようです。 ↩
アバター
intdashからデータをリアルタイムで取得したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 intdashの魅力のひとつは低遅延でリアルタイム性の高いデータ伝送です。 今回は収集データをストリームで取得する方法をご紹介します。 はじめに リアルタイムAPI向けSDK 前提 RTSP(Real Time Streaming Protocol)とは H.264とは FFmpegとは MediaMTXとは インストール クライアントライブラリインストール 動作確認 FFmpegインストール MediaMTXインストール やってみた 全体構成 実行結果 映像比較 データ伝送の低遅延性 サンプルプログラム サーバー接続 データチャンク受信 遅延ロガー ffmpeg/ffplay起動 おわりに はじめに 今回の対象はリアルタイムAPIからのデータ取得です。 intdash SDK リアルタイムAPI向けSDK リアルタイムAPI向けのSDKを利用します。 RESTと異なる専用のAPIでクライアントライブラリが提供されています。 前回 と変わって導入方法はシンプルです。 今回は収集した映像データをリアルタイムに取得してRTSP配信します。 1 前提 サンプルプログラムで映像関連の利用技術が多いので説明です。 RTSP(Real Time Streaming Protocol)とは 主に監視やライブストリーミングで利用される通信プロトコルです。 数秒程度の遅延で映像を配信することができます。 今回はRTSPを入力とするシステムとの連携を想定して利用します。 映像はH.264で圧縮します。 H.264とは 動画圧縮規格です。 前のフレームから変化した部分だけを抽出してデータ量を低減できる仕組みです。 IDRフレーム/キーフレーム:フレーム画素の全量を持つ、定期的に送信 Non IDRフレーム/デルタフレーム:変化点の画素のみを持つ www.idknet.co.jp intdashでもエッジから伝送する映像の圧縮に利用しています。 今回H.264データをRTSP配信するために FFmpeg を利用します。 FFmpegとは FFmpeg(エフエフエムペグ)は動画や音声を変換・編集するためのオープンソースのマルチメディアツールです。 シンプルに利用でき、RTSPサーバーへの映像配信や可視化をしてくれます。 www.innoqos.com MediaMTXとは オープンソースのマルチメディアサーバーです。 RTSPサーバーとして利用します。 qiita.com インストール クライアントライブラリインストール 各言語向けに提供されているクライアントライブラリ iscp をインストールします。 . ./venv/bin/activate pip install iscp クライアントライブラリのインストール 動作確認 Pythonを起動してパッケージをimportします。 エラーが出なければ、正常に読み込めています。 python >>> import iscp import確認 FFmpegインストール FFmpegをインストールしてバージョンを確認します。 brew install ffmpeg ffmpeg -version FFmpegインストール インストールすると可視化ビューア ffplay も使えるようになります。 ffplay -f lavfi -i testsrc =duration = 10:size = 1280x720:rate = 30 カラーパターン再生 MediaMTXインストール MediaMTX のバイナリ(.tar.gz)をダウンロードして展開します。 設定ファイル mediamtx.yml を編集して、送信するデータに合わせてFPS設定を変えておきます。 rpiCameraFPS: 15 FPS設定変更 起動するとRTSPを8554ポートでリスニングしているのがわかります。 OSのセキュリティ警告が出たら許可します。 ./mediamtx MediaMTX動作確認 やってみた 全体構成 intdash Motion V2 から収集した映像をダウンストリームで取得します。 RTSPストリーム配信 撮影映像をH.264エンコードしてintdash ServerにiSCPで送信 サンプルプログラムがH.264をダウンストリームしてRTSPサーバーへ配信 ffplayでRTSPサーバーからストリームを受信して可視化 RTSP配信自体にかかる遅延との比較のため、ダウンストリーム直後もffplayで可視化しています。 実行結果 遅延を測りやすい被写体を撮影しました。 東京駅ヤエチカ(八重洲地下街) DROPCLOCK 機材をセットします。 通勤客の横で 先ほどの通り、MediaMTXを起動します。 サンプルプログラムを起動します。 python lesson3/src/rtsp_stream.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuid < YOUR_EDGE_UUID > ダウンストリーム開始 intdash Motion V2でデータ収集を開始します。 Data Type: h264_frame Data Name: 1/h264 MotionからH.264を送信 ffplayでRTSPストリームを受信します。 ffplay -window_title " After RTSP " rtsp://localhost:8554/stream ビューアーでRTSPを受信 映像比較 撮影した時刻を比較してみます。 撮影時刻比較 ミリ秒まで比較 全体構成図 全体構成図と見比べて説明します。 ①リアル世界です。 ②intdashアップストリーム直前のMotionのプレビューです。 ③intdashダウンストリーム直後のffplayの可視化です。 ④RTSP受信後のffplayの可視化です。 ①〜④は0.954秒ほどずれています。 2 ①〜③は0.370秒なのでRTSP配信の遅延が0.584秒なのがわかります。 データ伝送の低遅延性 ①〜③にはMotionのH.264エンコード、ffplayのH.264デコード時間が含まれます。 データ伝送のみの遅延を見るため、送信前〜受信後の時間をログ出力してみます。 intdashの遅延時間 遅延ログ ブレがありますが、90.4ミリ秒〜370.6ミリ秒で推移しています。 3 サンプルプログラム APIアクセス部分を中心に説明します。 サーバー接続 REST APIと同じくAPIトークンで接続しています。 製品開発以外ではサーバー構成はhttps://〜で提供されるため、固定でHTTPS(443ポート)を指定しています。 api_url_parsed = urllib.parse.urlparse(api_url) conn = await iscp.Conn.connect( address=f "{api_url_parsed.hostname}:{api_port}" , connector=iscp.WebSocketConnector(enable_tls= True ), token_source= lambda : api_token, project_uuid=project_uuid, ) データチャンク受信 H.264のIDRとNon IDRフレームに限定 空チャンク(H.264が含まれないチャンク)を省略 self.down = await self.conn.open_downstream( filters=[ iscp.DownstreamFilter( source_node_id=self.edge_uuid, data_filters=[ iscp.DataFilter(name= "1/h264" , type = "h264_frame/idr_frame" ), iscp.DataFilter(name= "1/h264" , type = "h264_frame/non_idr_frame" ), ], ) ], omit_empty_chunk= True , ) 遅延ログ出力のための経過時間、可視化のためのペイロードを返却 async for msg in self.down.chunks(): for group in msg.data_point_groups: for data_point in group.data_points: yield data_point.elapsed_time, data_point.payload 遅延ロガー 受信時刻 - 送信時刻で遅延時間を算出します。 計測の基準時刻とデータポイントの経過時間から送信時の絶対時刻を算出 ローカルPCで受信時刻との差分を算出 current_time = iscp.DateTime.utcnow() absolute_time_unix_nano = self.basetime.unix_nano() + elapsed_time absolute_time = iscp.DateTime.from_unix_nano(absolute_time_unix_nano) delay = (current_time.unix_nano() - absolute_time.unix_nano()) / 1_000_000 ffmpeg/ffplay起動 subprocessで起動します。 intdashからダウンストリームしたH.264を再エンコードせずに渡しています。 subprocess.Popen( [ "ffmpeg" , "-f" , "h264" , "-i" , "-" , "-fflags" , "nobuffer" , "-preset" , "ultrafast" , "-c:v" , "copy" , # 再エンコードなし "-f" , "rtsp" , RTSP_URL, ], stdin=subprocess.PIPE, stderr=sys.stderr if STDERR_FLG else subprocess.DEVNULL, ), subprocess.Popen( [ "ffplay" , "-f" , "h264" , "-fflags" , "nobuffer" , "-flags" , "low_delay" , "-" , "-window_title" , "Before RTSP" , ], stdin=subprocess.PIPE, stderr=sys.stderr if STDERR_FLG else subprocess.DEVNULL, ), おわりに 今回はリアルタイムAPIのデータ取得をご紹介しました。 映像を扱うのは難易度が高く、今回は手軽なFFmpegを利用しました。 次回はもっとリアルタイム映像データに適した取り扱い方に挑戦してみます。 サンプルプログラムを GitHub で公開しています。 ↩ Motion/PCともに同じLTE SIMで通信しています。フレームは640x480・15FPSです。 ↩ ネットワークの遅延 + intdashの遅延です。正確に測定するなら、潤沢な通信帯域・PCリソース・適切な試行回数が必要ですが、今回は簡単に測定しています。また、正確にはiPhoneとPCの時刻誤差(NTP精度)が含まれます。 ↩
アバター
計測データを自分でintdashに登録したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 エッジから収集したデータを加工して可視化・確認したいニーズが増えています。 今回はREST APIでintdashにデータを登録する方法を説明します。 はじめに データ送信のステータス管理 Protocol Buffersの利用 インストール Buf CLIインストール Protocol Buffersエンコーダーの生成 プロトコル定義ファイルのダウンロード エンコーダーの生成 protobufパッケージインストール 動作確認 やってみた データ移行ツール 実行結果 エクスポート インポート サンプルプログラム エクスポート インポート GPS距離計算 全体構成図 実行結果 サンプルプログラム おわりに はじめに 今回説明するのはREST APIへのデータ送信です。 intdash SDK 典型的な利用シーンは以下の通りです。 保留データの遅延アップロード リアルタイム送信時に送りきれなかったデータをあとで送信 低頻度データ送信 リアルタイムAPIを使わずにストリーム送信 データ送信のステータス管理 前回 のREST APIからのデータ取得は非常にシンプルでした。 データ送信は少し複雑です。 同時並行で送信される計測データの未回収/回収済を管理するためです。 計測シーケンス 計測はデータ送信を受け付ける単位として複数の計測シーケンスを持ちます。 計測シーケンスが回収する予定の総データポイント数を管理します。 データポイントはチャンクという単位でまとめて送信されます。 チャンクには計測シーケンス内で連番が振られます。 チャンクの送信が完了するとステータスが回収済になります。 回収済のデータポイント合計が計測の回収済みデータポイント数になります。 Protocol Buffersの利用 REST APIへのデータ送信には、Googleが開発したシリアライズフォーマットである Protocol Buffers を利用できます。 1 これによって大量データの送信が安定化します。 バイナリ化によるトラフィック軽減 シリアライズ・デシリアライズの高速化 データフォーマット定義の明確化 インストール REST APIでデータ送信するため、 前回のインストール手順 に加えてProtocol Buffersエンコーダーを導入します。 Protocol Buffersエンコーダーはintdash用のプロトコル定義ファイルから生成します。 Buf CLIインストール プロトコル定義ファイルからProtocol Buffersエンコーダーを生成するツール Buf CLIをインストールします。 brew install bufbuild/buf/buf buf --version Buf CLIのインストール Protocol Buffersエンコーダーの生成 Buf CLIでプロトコル定義ファイルからエンコーダーを生成します。 プロトコル定義ファイルのダウンロード intdash API specificationページ から プロトコル定義ファイルページ に遷移し、プロトコル定義ファイル protocol.proto をダウンロードします。 2 プロトコル定義ファイルページへのリンク プロトコル定義ファイルページ エンコーダーの生成 ワークディレクトリを作成してプロトコル定義ファイルを配置します。 プロトコル定義ファイルのパッケージ名をintdashに変更します。 mkdir -p proto/intdash/v1/ cp path/to/protocol.proto proto/intdash/v1/ sed -i -e " s/package pb;/package intdash.v1;/g " proto/intdash/v1/protocol.proto Buf CLIの定義ファイルを作成し、エンコーダーを生成します。 buf.yaml :Buf CLIのプロジェクト基本設定 buf.gen.yaml :Buf CLIのコード生成設定 エンコーダーは gen ディレクトリに出力されます。 cat << EOS > ./proto/buf.yaml version: v1 breaking: use: - FILE lint: use: - DEFAULT EOS cat << EOS > ./buf.gen.yaml version: v1 managed: enabled: true plugins: - plugin: buf.build/protocolbuffers/python:v23.4 out: gen EOS buf generate proto ls -l gen Protocol Buffersエンコーダーの生成 protobufパッケージインストール 生成したエンコードの実行にはPythonの protobuf パッケージが必要です。 仮想環境を有効化してインストールします。 . ./venv/bin/activate pip install protobuf 動作確認 Pythonを起動して関連クラスを import します。 エラーが出なければ、正常に読み込めています。 3 python >>> from gen.intdash.v1.protocol_pb2 import ( StoreDataChunk, StoreDataChunks, StoreDataID, StoreDataPoint, StoreDataPointGroup ) import確認 やってみた 今回は2つのサンプルプログラムを説明します。 4 データ移行ツール GPS距離計算 データ移行ツール 計測データをJSONファイルにエクスポートし、別計測として再びインポートします。 5 インポート時にProtocol Buffersエンコーダーを利用します。 データ移行ツール 実行結果 エクスポートとインポートで処理を分けています。 エクスポート PYTHONPATH が指定されていないときは設定します。 echo $PYTHONPATH export PYTHONPATH =/path/to/your_workspace: 対象計測を指定して実行します。 python lesson2/migrate/src/meas_export.py--api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --meas_uuid < YOUR_MEAS_UUID > エクスポート実行 JSONを見てみるとデータの構造がよくわかります。 計測、基準時刻 計測の主な項目は以下の通りです。 uuid : 計測UUID basetime : もっとも優先度の高い基準時刻 status : 計測の状態 expected_data_points : 総データポイント数 received_data_points : 回収済みデータポイント数 基準時刻は計測中に複数送信できるため、リストになっています。 id : 基準時刻のID basetime : 基準時刻 priority : 優先度 データポイント データポイントの主な項目は以下の通りです。 time : 絶対時刻(UNIXエポックからの経過時間、計測の基準時刻+経過時間) data_type : データ型 data_id : データを識別する名前 data.d : データペイロード(BASE64エンコード) この計測はiPhoneアプリ intdash Motion V2 で収集した位置情報です。 同じ経過時間で複数のデータ名が送信されているのがわかります。 インポート 別環境のエッジの計測としてデータを登録します。 python lesson2/migrate/src/meas_import.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuid < YOUR_EDGE_UUID > インポート実行 別環境に登録された計測 サンプルプログラム エクスポート こちらはデータ取得のみでREST APIの使い方は前回の内容とほぼ変わりません。 計測(マーカー含む)、基準時刻、データポイントを取得しています。 計測データポイント取得で time_format=“ns” を指定しています。デフォルトではマイクロ秒のため、正確にデータ移行する際には指定が必要です。 api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client) stream = api.list_project_data_points( project_uuid=project_uuid, name=meas_uuid, time_format= "ns" ) 取得した日付項目はdatetime型であるため、JSON出力時にナノ秒精度の時刻文字列にしています。 if isinstance (obj, datetime): iso_str = obj.isoformat() if obj.microsecond: nano_str = "{:09d}" .format(obj.microsecond * 1000 ) iso_str = iso_str.replace(f ".{obj.microsecond:06d}" , f ".{nano_str}" ) return iso_str インポート エクスポートされたJSONファイルを読み込んで、新しい計測を作成します。 計測、基準時刻、マーカー情報を各APIのcreate_xxxメソッドで作成しています。 計測作成時に基準時刻が必ず1つできてしまうため、登録後に削除して、改めて作成し直しています。 current_basetimes = api.list_project_measurement_base_times( project_uuid=project_uuid, measurement_uuid=measurement_uuid, ) for bt_current in current_basetimes[ "items" ]: api.delete_project_measurement_base_time_by_id( project_uuid=project_uuid, measurement_uuid=measurement_uuid, id =bt_current.id, ) 計測シーケンスは置換メソッド replace_project_measurement_sequence が提供されています。 総データポイント数、回収済みデータポイント数を指定して計測シーケンスを登録します。 sequence_group = MeasurementSequenceGroupReplace( expected_data_points=measurement_src[ "sequences" ][ "expected_data_points" ], final_sequence_number=measurement_src[ "sequences" ][ "received_data_points" ], ) api = measurement_service_measurement_sequences_api.MeasurementServiceMeasurementSequencesApi( client ) sequence = api.replace_project_measurement_sequence( project_uuid=project_uuid, measurement_uuid=measurement_uuid, sequences_uuid=sequence_uuid if sequence_uuid else str (uuid.uuid4()), measurement_sequence_group_replace=sequence_group, ) チャンクの送信でProtocol Buffersエンコーダーを使います。 データID( type , name )とデータポイント(経過時間 elapsed_time と payload をチャンクに格納 store_data_point_group = StoreDataPointGroup( data_id=StoreDataID( type =data_point[ "data_type" ], name=data_point[ "data_name" ] ), data_points=[store_data_point], ) store_data_chunk = StoreDataChunk( sequence_number=sequence_number, data_point_groups=[store_data_point_group] ) chunks.append(store_data_chunk) sequence_number += 1 Content-type に protocolbuf を指定してデータポイントを格納したチャンクを送信 results = api.create_project_measurement_sequence_chunks( project_uuid=project_uuid, body=io.BytesIO(chunk.SerializeToString()), _content_type= "application/vnd.iscp.v2.protobuf" , ) すべてのデータポイントを送信したら、計測のステータスを完了にします。 api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client) api.complete_project_measurement( measurement_uuid=measurement_uuid, project_uuid=project_uuid ) GPS距離計算 GPSデータごとに緯度経度〜基準点からの距離を計算してREST APIで登録します。 全体構成図 GPS距離計算 元の計測は変更せず、同じ基準時刻の計測を新たに作ります。 なお、元の計測に新たなデータポイントを追加することもできます。 6 実行結果 実行します。 python lesson2/distance/src/distance.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --meas_uuid < YOUR_MEAS_UUID > 距離算出実行 intdashに登録した距離を元のGPSデータと一緒に可視化しました。 distance というValue Currentパーツにバインドしています。 基準点は品川駅 サンプルプログラム クラスを分割しました。 MeasurementReader:元計測データを取得 DistanceCalculator:原点からの距離を算出 MeasurementWriter:新規計測データを送信 DetectService:各クラスの統合 REST APIアクセス部はデータ移行ツールとほとんど同じですが、 一定件数ずつフェッチしつつ登録するように制御しています。 api = measurement_service_data_points_api.MeasurementServiceDataPointsApi( self.client ) stream = api.list_project_data_points( project_uuid=self.project_uuid, name=self.meas_uuid, data_id_filter=[ "#:1/gnss_coordinates" ], start=self.start, limit=fetch_size, time_format= "ns" , ) start に前回データポイントの時刻+1からフェッチを開始しています。 おわりに 今回はREST APIでのデータ送信をご紹介しました。 計測シーケンスやProtocol Bufferは少しとっつきづらく、ソースも複雑に見えますが、パターンがわかると応用できると思います。 次回はリアルタイムAPIを説明していきます。 Protocol Buffersを使わずに JSONで送信 もできます。送信データがProtocol BuffersでシリアライズされているかはHTTPヘッダの Content-Type でサーバーに通知します。 ↩ intdash API specificationページの参照にはユーザー登録が必要です。 ↩ 動作確認できたら、Buf CLI、ワークディレクトリ、プロトコル定義ファイルは削除しても構いません。 ↩ サンプルプログラムを GitHub で公開しています。 ↩ データポイントが数百件ほどの小さい計測を想定しています。 ↩ 計測から任意の計測データを変更・削除することはできません。当該データのみを変更・削除したい場合は、別に計測を作成して元の計測を削除します。 ↩
アバター
もっと柔軟にintdashを活用したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 近年、お客様やパートナー様が自らカスタム機能を実装する場面が増えています。 intdashではサーバーAPIにアクセスするためのSDKを提供しています。 構成を刷新したintdash SDKが2024年6月バージョンより正式リリースとなりました。 1 SDKは非常に幅広い使い方が可能です。 入門シリーズとして実践的な内容に落とし込んで解説していきます。 はじめに intdash APIとは intdash SDKとは intdash API/SDKでできること 目的別のインストール手順・必要ソフトウェア インストール 手順 OpenAPI GeneratorでSDK生成 Javaインストール npmインストール OpenAPI Generatorインストール intdash SDK for Python生成 Python実行環境設定 Pythonインストール Python仮想環境作成 依存パッケージインストール 動作確認 import確認 エッジ一覧取得 Python開発環境整備 Pythonパス設定 フォーマッター、Linterの整備 やってみた 準備 利用パッケージインストール 実行結果 サンプルプログラム説明 構成図 計測リスト取得 データポイントリスト取得 おわりに はじめに intdash APIとは 産業向けIoTデータ伝送プラットフォームであるintdashが、製品群やサーバー外にデータを受け渡すためのインタフェースです。 2つのAPIがあります。 種類 概要 通信プロトコル 用途 REST API いわゆるGETやPOSTのリクエスト・レスポンス HTTP/HTTPS ユーザー/エッジ/計測などの参照・更新 リアルタイムAPI 高速データ伝送向けの特殊なAPI aptpod独自の iSCP 計測データのストリーム 2 intdash SDKとは intdash APIを利用するためのクライアントライブラリーです。 各種プログラミング言語向け に対応しています。 intdash SDK 各種プログラミング言語の使いどころはこんな感じです。 言語 特徴・用途 Python 入門向け、プロトタイピング、AI/機械学習 Go 高速・並列サーバー処理、バックエンドシステム構築 Rust 高速・メモリ安全性、リソースが限られるエッジ処理 TypeScript ブラウザ・フロントエンド Swift iPhoneアプリ C# Windowsプラットフォーム、Unityアプリ Kotlin Androidアプリ、Java互換 この入門シリーズではintdash SDK for Pythonを使って説明します。 intdash API/SDKでできること intdash APIへのデータフローは 2つのAPI ✖️データ取得/データ送信 に大別され、これを組み合わせてカスタム処理を実装します。 入門シリーズでもこの順で説明していきます。 # データフロー 適用例 ① REST APIで計測データ取得 データ分析、レポーティング ② REST APIで計測データ送信 データ登録、保留データアップロード ③ リアルタイムAPIで計測データ取得 遠隔監視、リアルタイムデータ分析 ④ リアルタイムAPIで計測データ送信 遠隔制御、リアルタイムデータ登録 目的別のインストール手順・必要ソフトウェア 実現したいことによって対応した手順が必要です。 # SDKインストール手順 必要ソフトウェア ① OpenAPI Generator で生成 Java、npm ② OpenAPI Generator で生成 Protocol Buffersエンコーダーを生成 Java、npm、 Buf CLI ③ ④ パッケージをインストール 各言語のパッケージ管理ツール 具体的な手順は記事ごとに説明します。 3 インストール 今回はREST APIでデータ取得するためのインストール方法を説明します。 手順 少し多いため、説明しながら進めていきます。 OpenAPI Generator でSDK生成 Javaインストール npmインストール OpenAPI Generatorインストール intdash SDK for Python生成 Python実行環境設定 Pythonインストール Python仮想環境の設定 依存パッケージインストール 動作確認 import確認 エッジ一覧取得 Python開発環境整備 Pythonパス設定 フォーマッター、Linterの整備 サンプルプログラム準備 利用パッケージインストール サンプルプログラム実行 OpenAPI GeneratorでSDK生成 REST API用のSDKをOpenAPI Generatorで生成します。 Javaインストール OpenAPI Generator を実行するためにJava実行環境が必要です。 Javaが実行できるか確認します。 java --version 存在しない場合はインストールします。 brew install openjdk OpenJDKのインストールと確認 npmインストール OpenAPI Generator をインストールするために、Node.jsのパッケージ管理ツール npmが必要です。 npmはNode.jsに含まれているため、Node.jsをインストールします。 brew install node npm -v Node.jsのインストール OpenAPI Generatorインストール OpenAPI Generator は、API仕様に基づいてクライアントライブラリー、ドキュメントなどを自動生成するツールです。 インストールしてバージョンを確認します。 npm install @openapitools/openapi-generator-cli npx @openapitools/openapi-generator-cli version OpenAPI Generatorのインストール intdash SDK for Python生成 intdashのAPI仕様からSDKを生成します。 OpenAPI Specificationファイルのバージョンを指定します。 package-name で指定するパッケージ名でディレクトリが生成されます。 また、開発時に参照するドキュメントを生成しています。 VERSION =v2. 7 . 0 SRC_DIR = " . " ./node_modules/.bin/openapi-generator-cli version-manager set 6 . 1 . 0 ./node_modules/.bin/openapi-generator-cli generate \ -g python -i https://docs.intdash.jp/api/intdash-api/ ${VERSION} /openapi_public.yaml \ --package-name = intdash \ --additional-properties = generateSourceCodeOnly =true \ --global-property = modelTests =false, apiTests =false, modelDocs =true, apiDocs =true \ --http-user-agent = SDK-Sample-Python-Client/Gen-By-OASGenerator \ -o " $SRC_DIR " ls -l intdash intdash SDKの生成 Python実行環境設定 Pythonプログラムを実行する環境を構築します。 Pythonインストール Pythonが実行できるか確認します。 python --version 存在しない場合はインストールします。 brew install python python --version Pythonのインストール Python仮想環境作成 Python仮想環境を作成します。 ローカルPCのPythonバージョンアップが開発中のプログラムに影響するのを防ぐため、仮想環境は有効です。 Pythonと任意のバージョンを指定して仮想環境を作成します。 作成後に有効化して、Pythonとパッケージ管理ツール pipのバージョンを確認します。 python3.xx -m venv venv . ./venv/bin/activate python --version pip --version Python仮想環境の作成 依存パッケージインストール intdash SDKが利用するPythonパッケージをインストールします。 pydantic: データバリデーションおよびデータモデルライブラリ python-dateutil: 日付と時刻の操作ライブラリ urllib3: HTTPクライアントライブラリ pip install pydantic python-dateutil urllib3 必要パッケージのインストール これでSDKを利用できる準備が整いました。 動作確認 簡単なリクエストを試してみます。 import確認 Pythonを起動して package-name で指定したパッケージを import します。 エラーが出なければ、正常に読み込めています。 python >>> import intdash import確認 エッジ一覧取得 最も簡単な例としてエッジ一覧を取得してみます。 intdash APIの認証方法には3種類あります。 ユーザーで認証: ユーザーに紐づくAPIトークンで認証 ユーザー情報に基づいてアクセストークンを払い出して認証 エッジで認証: エッジ情報に基づいてアクセストークンを払い出して認証 今回はAPIトークンで認証します。 intdashのUIであるWeb ConsoleのMy Pageで払い出します。 APIトークン払い出し エッジサービスのAPIオブジェクトを生成してエッジ一覧の取得メソッドを呼びます。 from intdash import ApiClient, Configuration from intdash.api import authentication_service_edges_api # APIトークンで認証 client = ApiClient( Configuration( host= "https://example.intdash.jp/api" , api_key={ "IntdashToken" : "YOUR_API_TOKEN" }, ) ) # エッジサービスのAPIオブジェクトを生成 api = authentication_service_edges_api.AuthenticationServiceEdgesApi(client) # エッジ一覧の取得 edges = api.list_edges() print (edges) エッジ一覧取得 intdash SDKでREST APIにアクセスできました。 基本的にはこのように 認証 各サービスAPIのメソッドでREST APIにアクセス を実装していきます。 Python開発環境整備 開発効率を高めるために開発環境の整備は重要です。 開発者によって好みが分かれるところですが、このサンプル作成に使ったVS Code設定をご紹介します。 なお、作成した仮想環境をVS CodeのSelect Interpreterで指定しておくとターミナル起動時に楽です。 Pythonパス設定 生成したSDKを含めてPython実行時にサンプルプログラムを参照できるように環境変数 PYTHONPATH にワークスペースを設定します。 " terminal.integrated.env.osx ": { " PYTHONPATH ": " ${workspaceFolder} " } , フォーマッター、Linterの整備 VS Codeの拡張機能で以下をインストールします。 Ruff :Python向けの高速なフォーマッター兼Linter My Type Checker :Pythonの静的型チェックツール Ruffの設定です。 保存時に自動でフォーマット、Lintエラー修正、importの整列・削除をさせています。 " [python] ": { " editor.formatOnSave ": true , " editor.defaultFormatter ": " charliermarsh.ruff ", " editor.codeActionsOnSave ": { " source.fixAll.ruff ": " explicit ", " source.organizeImports.ruff ": " explicit " , }, } , My Type Checkerの設定です。 型チェック中に見つからないインポートがあってもエラーを無視します。外部ライブラリで型情報が不足している場合に有効です。 " mypy-type-checker.args ": [ " --disallow-untyped-defs ", " --ignore-missing-imports " , ] , " mypy-type-checker.showNotifications ": " onError ", やってみた 特定のエッジの計測のうち、GPSデータを全部取り出して地図上にプロットします。 サンプルプログラムをGitHubで公開しています。 github.com 対象とするエッジはこちらです。 歴戦の社用車 社用車には当社製エッジコンピューター EDGEPLANT T1 が搭載されています。 開発用(上段)と営業デモ用(下段) 準備 追加で実行前の準備があります。 利用パッケージインストール サンプルプログラムに必要なパッケージをインストールします。 pip install folium matplotlib folium: OpenStreetMap表示 matplotlib: 地図プロット時のカラー情報制御 実行結果 PYTHONPATH が指定されていないときは設定します。 echo $PYTHONPATH export PYTHONPATH =/path/to/your_workspace: 実行します。 python lesson1/src/gnss_plot.py --api_url https://example.intdash.jp --api_token < YOUR_API_TOKEN > --project_uuid < YOUR_PROJECT_UUID > --edge_uuids < YOUR_EDGE_UUID 1> < YOUR_EDGE_UUID 2> < YOUR_EDGE_UUID 3> 社用車のエッジUUIDは何回か変わっているため、複数指定できるようにしました。 実行 HTMLが出力されます。 1都10県 GPSデータの緯度経度をプロットしています。 2020年1月からの計測1,400件を取得 取り出したGPSデータの緯度経度を10分ごとにサンプリング 会社からの距離が近いほど青く遠いほど赤く描画 訪問先でエッジを起動した場合はルート途中は描画されない サンプルプログラム説明 REST APIアクセス部分を解説します。 構成図 時系列データベースに蓄積された計測データをREST APIから取り出します。 GSPデータを取得して地図描画 計測リスト取得 プロジェクトUUID、エッジUUIDを指定して計測のリストを取得しています。 api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client) measurements = api.list_project_measurements( project_uuid=project_uuid, edge_uuid=edge_uuid, limit=LIMIT ) 開発中の大量データ取得防止のため limit で取得件数を絞っています。 データポイントリスト取得 計測ごとにデータポイントを取得します。 取得対象を緯度経度に限定します。 data_id_filter で以下のいずれかを指定していします。 #:0/GNRMC T1から収集されるNMEAフォーマットの緯度経度を含む GNRMC #:1/gnss_coordinates iPhoneアプリ intdash Motion V2 で収集されるフォーマットの緯度経度 api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client) stream = api.list_project_data_points( project_uuid=project_uuid, name=meas_uuid, data_id_filter=[ "#:0/GNRMC" , # NMEA "#:1/gnss_coordinates" , # intdash Motion ], ) データポイントは複数行に分かれたJSONであるJSONLines形式で取得されます。 1行ごとにJSON化して、データを取り出します。 #:0/GNRMC JSONの data.s から取得 旧プロトコルiSCPv1で送信されたデータの格納形式 社用車のT1がiSCPv1で送信するため NMEAフォーマットのCSV形式をパースして緯度経度を取得 #:1/gnss_coordinates JSONの data.d から取得 新プロトコルiSCPv2の格納形式 BASE64文字列をデコードしてバイナリ値を取得 2次元ベクトルであるバイナリ値から64ビット浮動小数点数値を2つ取得 line_json = json.loads(line.decode()) if "data" in line_json: # iSCPv1 NMEA if "s" in line_json[ "data" ]: x, y = parse_gnrmc(line_json[ "data" ][ "s" ]) # iSCPv2 elif "d" in line_json[ "data" ]: base64_encoded = line_json[ "data" ][ "d" ] bin_data = base64.b64decode(base64_encoded) x, y = struct.unpack( ">dd" , bin_data) おわりに 今回はシリーズ初回としてSDKの概要とREST APIでのデータ取得をご紹介しました。 次回はより高度な使い方を試していきます。 今回少し触れた取得データの形式についてもより詳細に説明します。 2022年11月バージョンからPublic Beta版を利用できました。 ↩ ストリームとは ↩ 記事ではMacでの手順を説明します。 GitHub にWindowsの手順も掲載します。 ↩
アバター
intdashとタイムリーにシステム連携したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 2023年12月バージョンから待望のWebhook機能が利用できるようになったのでご紹介します。 はじめに Webhookとは やってみた Webhookリクエストの確認 計測作成イベント:Slack 計測完了イベント:地図マッチング 地図マッチングとは 全体構成 GPS補正Lambda レスポンス返却Lambda API Gateway 起動 おわりに Appendix. サンプルソース 計測作成イベント:Slack Slack通知Lambda Pythonプログラム 計測完了イベント:地図マッチング GPS補正Lambda Pythonプログラム GPS補正Lambda bootstrapシェル GPS補正Lambda コンテナDockerfile レスポンス返却Lambda Pythonプログラム はじめに Webhookとは システム内イベントをシステム外にHTTPで通知する仕組みです。 これでintdash内の変化をintdash外からリアルタイムに知ることができます。 イベントドリブンなシステム連携により、効率的なワークフローを実現できます。 intdash Webhook 以下のintdashリソースのイベントを通知することができます。 計測:作成・変更・削除・完了・終了 1 接続:接続・切断・アップストリーム開始・ダウンストリーム開始 エッジ:作成・変更・削除 ユーザー:作成・変更・削除 テナント:作成・変更・削除 プロジェクト の所属エッジ:追加・変更・除外 プロジェクト の所属メンバー:追加・変更・除外 やってみた Webhookリクエストの確認 早速やってみます。 まずは、WebhookのHTTPリクエストの内容を見てみましょう。 試しにWebhook.siteに通知 アクセスするだけでWebhook用のURLを払い出せるWebhook.siteを利用します。 webhook.site Webhook.siteで払い出されたURLと、通知するイベントをintdash APIに登録します。 計測リソースのイベント measurements_event を通知するよう登録します。 通知先URLと通知イベント種類を登録 スマートフォンアプリ intdash Motion で計測すると、Webhook.siteに通知されます。 内容を見ると measurement リソースの created アクションであること、計測のUUIDと発生時刻がわかります。 右上がヘッダー、下がボディ リクエストボディはこのようなかたちです。 通知先システムはここから必要な情報取り出して処理を行います。 共通 delivery_uuid : Webhook通知のID hook_uuid : Webhook設定のID resource_type ": リソースの種類 action : リソースに対するイベントの種類 occurred_at : 発生時刻 リソース別:計測の場合 project_uuid : プロジェクトのID measurement_uuid : 計測のID ヘッダーには以下の項目があります。 X-Intdash-Signature-256 : HMAC リクエスト改竄防止の仕組み Webhook設定にシークレット(Key)を登録すると、シークレットとリクエストボディで生成したメッセージ認証コード(MAC)を付与 通知先システムで同じシークレットを保持し、受け取ったリクエストボディとあわせてMAC値を生成して比較検証 メッセージ認証符号 - Wikipedia 計測作成イベント:Slack では、簡単なシステム連携を実装してみます。 AWS LambdaでSlackに計測開始を通知します。 Slack通知の構成 Slackチャンネルの通知用URLを払い出します。 2 Slackの カスタムインテグレーション Slackに通知するLambda関数を作成します。 3 Slack通知Lambda 処理はこんな感じです。 HMAC検証 計測作成以外のイベントは無視 Slack通知 intdash Webhookにレスポンスを返却 API Gatewayを作成して、Slack通知Lambdaを統合します。 API Gatewayに統合 作成したAPI GatewayのURLを確認 先ほど登録したintdash Webhookの設定をAPI GatewayのURLに変更します。 通知先URLを変更 計測を開始するとSlackに通知されます。 計測開始をSlack通知 通知の Data Visualizer のリンクをクリックすると自動でLIVE再生が始まるようにしてみました。 Data Visualizer のURLに、準備しておいたスクリーン Motion で自動LIVE再生するよう screenName=Motion&playMode=live というクエリストリングを付与しています。 都会の空は曇天模様 計測完了イベント:地図マッチング 少し時間のかかる分析処理を回してみます。 GPSデータを地図マッチングで補正します。 地図マッチングとは GPSデータの緯度経度を道路に沿うように補正します。 今回使うFast Map Matching (FMM) はその新し目のアルゴリズムです。 zenn.dev 全体構成 利用するライブラリがたくさんあるため、コンテナイメージでGPS補正Lambdaを構築します。 GPS補正Lambdaはコンテナをロードしたり、道路ネットワークのノードを事前計算します。 intdash Webhookは通知後に10秒以内にレスポンスを返す必要があるため 4 、先にレスポンスを返却する別のLambdaを設けます。 地図マッチングの構成 さらに時間かかかる場合はAWS Step FunctionsやAWS Batchを検討することになるでしょう。 GPS補正Lambda コンテナで実行するPythonプログラムを作成します。 処理はこのようになります。 5 intdash SDKでエッジ名、計測時刻、GPSデータを取得 GPSデータ範囲のOpenStreetMapから道路データを取得 道路データをもとに道路ネットワークモデルを構築 道路ネットワークモデルにGPSデータをマッチング マッチング前後のGPSデータを地図HTMLにプロット 地図HTMLをS3にアップロード マッチング前後の総走行距離を計算 総走行距離と地図HTMLのURLをSlack通知 コンテナ起動時に上記プログラムを実行するカスタムランタイムを作成します。 6 コンテナイメージをビルドし、Amazon ECRにプッシュします。 Dockerfileはこのようになります。 7 必要なライブラリをインストール FMMソースをビルド intdash SDKをOpenAPIで生成 Pythonプログラム、カスタムランタイムをローカルからコピー GPS補正Lambdaを作成します。 コンテナイメージから作成 タイムアウトは10分、FMMが使うのでメモリを1024MBにしました。 レスポンス返却Lambda もう1つのLambda関数を作成します。 8 処理はこれだけです。 HMAC検証 計測完了以外は無視 GPS補正Lambdaを非同期起動 intdash Webhookのレスポンスを返却 レスポンス返却Lambda API Gateway さきほどのAPI Gatewayの統合先をレスポンス返却Lambdaに変更します。 起動Lambdaを変更 起動 intdash Motionで、計測時にあげきれていなかったデータを遅延アップロードします。 GPSがずれやすいトンネルや急カーブのあるルートを選択します。 Motionから遅延アップロード サーバー上で計測データがすべて回収済みになり、計測が完了します。 計測が完了 Webhook設定により、計測完了イベントが通知されます。 2分ほど待つとSlackにGPS補正結果が通知されます。 補正結果の通知 OpenStreetMap のリンクをクリックすると、S3に配置した地図HTMLが開くようにしてみました。 来んかな福江島 ブルーの線が補正前のGPSデータ、ピンクの線が補正後のGPSデータです。 ズームすると、信号が途絶するトンネルや位置がぶれやすい海岸沿いの急カーブのデータが補正されて、道路に沿っているのがわかります。 ざぁま海と山 おわりに Webhookによりリアルタイムなシステム連携を実現でき、運用ワークフローが大幅に改善します。 他にも、稼働スケジュールと突き合わせてエッジの予定外起動を検知したり、 処理を逐次起動できるので定期バッチに比べて負荷の平準化も期待できます。 さらに以前ご紹介した永続化拡張と組み合わせて、intdash APIにアクセスする処理を実装せずにデータ分析を組むことも可能です。 tech.aptpod.co.jp intdashを使ったシステム構築はぜひaptpodにご相談ください。 Appendix. サンプルソース 計測作成イベント:Slack Slack通知Lambda Pythonプログラム import logging import json import hashlib import base64 import hmac import requests logger = logging.getLogger() logger.setLevel(logging.INFO) # 定数定義 SECRET_KEY = 'YOUR_SECRET_KEY' SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL' def verify_hmac (secret, payload, received): """ HMAC検証 Args: secret (str): シークレットキー payload (str): ペイロード received (str): 受信したHMAC Returns: bool: 検証結果(True: 成功, False: 失敗) """ hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256) computed = base64.b64encode(hmac_obj.digest()).decode() return hmac.compare_digest(computed, received) def send_notification (project_uuid, meas_uuid): """ Slack通知 @param project_uuid: プロジェクトUUID @param meas_uuid: 計測UUID @return: なし """ message = { 'attachments' : [ { 'color' : '#de82a7' , 'author_name' : 'intdash Webhook' , "author_icon" : "YOUR_ICON_URL" , 'title' : "LIVE再生中です" , "fields" : [ { "title" : "計測" , "value" : f "<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|Meas Hub>" }, { "title" : "リアルタイム再生" , "value" : f "<https://example.intdash.jp/vm2m/?projectUuid={project_uuid}&screenName=Motion&playMode=live|Data Visualizer>" } ], "footer" : "計測したら即分析!WebhookでGPS補正を起動してみた" , "footer_icon" : "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU" , 'mrkdwn_in' : [ 'text' , 'fields' ] } ] } try : response = requests.post( SLACK_WEBHOOK_URL, data=json.dumps(message), headers={ 'Content-Type' : 'application/json' } ) if response.status_code != 200 : logger.error(f "Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}" ) logger.info( "Slack notification sent successfully." ) except Exception as e: logger.error(f "Exception occurred while sending Slack notification: {e}" ) def lambda_handler (event, context): """ エントリポイント Args: event (dict): イベント context (LambdaContext): コンテキスト Returns: dict: APIレスポンス """ headers = event.get( 'headers' , {}) body = event.get( 'body' , '{}' ) logger.info(f "Received event: {event}" ) body_dict = json.loads(body) payload = json.dumps(body_dict, separators=( ',' , ':' )) # HMAC検証 signature = headers.get( 'x-intdash-signature-256' , '' ) if not verify_hmac(SECRET_KEY, payload, signature): logger.warning( "HMAC Verification Failed." ) return { 'statusCode' : 401 , 'body' : json.dumps({ 'message' : 'HMAC verification failed' }) } # リソースタイプとアクションの判定 resource_type = body_dict.get( 'resource_type' ) action = body_dict.get( 'action' ) if resource_type != 'measurement' or action != 'created' : logger.info(f "Ignored: resource_type={resource_type}, action={action}" ) return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Resource_type or action ignored' }) } # Slack通知 try : project_uuid = body_dict.get( 'project_uuid' ) meas_uuid = body_dict.get( 'measurement_uuid' ) send_notification(project_uuid, meas_uuid) except Exception as e: logger.error(f "Failed to Slack notification: {e}" ) return { 'statusCode' : 500 , 'body' : json.dumps({ 'message' : 'Failed to Slack notification' }) } return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Webhook received!' }) } 計測完了イベント:地図マッチング GPS補正Lambda Pythonプログラム import sys import os import json import logging import time import datetime import pytz import requests from apiclient import Configuration, ApiClient from apiclient.api import authentication_service_edges_api, measurement_service_measurements_api, measurement_service_data_points_api import struct import base64 import numpy as np import pandas as pd import geopandas as gpd import fmm from math import radians, sin, cos, sqrt, atan2 import osmnx as ox from shapely.geometry import Polygon import folium import boto3 logger = logging.getLogger() if not logger.hasHandlers(): handler = logging.StreamHandler() logger.addHandler(handler) logger.setLevel(logging.INFO) # 環境変数 os.environ[ 'MPLCONFIGDIR' ] = '/tmp' # ローカルストレージ data_path = "/tmp/" # intdash API設定 BASE_URL = "http://example.intdash.jp/api" API_TOKEN = "YOUR_API_TOKEN" # S3バケット設定 BUCKET_NAME = 'YOUR_BUCKET_NAME' AWS_REGION = 'YOUR_AWS_REGION' ACCESS_KEY = 'YOUR_ACCESS_KEY' SECRET_KEY = 'YOUR_SECRET_KEY' # Slack Webhook URL SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL' def get_edge (edge_uuid): """ エッジ情報取得 @param edge_id: エッジUUID @return: エッジ情報 """ configuration = Configuration(host=BASE_URL, api_key={ "IntdashToken" : API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = authentication_service_edges_api.AuthenticationServiceEdgesApi(api_client=api_client) edge = api_instance.get_edge(edge_uuid=edge_uuid) return edge def get_meas (project_uuid, meas_uuid): """ 計測情報取得 @param project_uuid: プロジェクトUUID @param meas_uuid: 計測UUID @return: 計測情報 """ configuration = Configuration(host=BASE_URL, api_key={ "IntdashToken" : API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(api_client=api_client) meas = api_instance.get_project_measurement(project_uuid=project_uuid, measurement_uuid=meas_uuid) return meas def get_gnss (project_uuid, meas_uuid): """ GNSSデータ取得 計測データからGNSS (1/gnss_coordinates) を抽出 @param project_uuid: プロジェクトUUID @param meas_id: 計測UUID @return: [(x, y)] """ configuration = Configuration(host=BASE_URL, api_key={ "IntdashToken" : API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = measurement_service_data_points_api.MeasurementServiceDataPointsApi(api_client=api_client) stream = api_instance.list_project_data_points(project_uuid=project_uuid, name=meas_uuid) coordinates = [] while True : line = stream.readline() if not line: break resp = json.loads(line.decode()) if 'data' in resp and 'd' in resp[ 'data' ]: base64_encoded = resp[ 'data' ][ 'd' ] bin_data = base64.b64decode(base64_encoded) data_id = resp.get( 'data_id' , 'N/A' ) if data_id == '1/gnss_coordinates' : x, y = struct.unpack( '>dd' , bin_data) coordinates.append((x, y)) return coordinates def find_bounds (coordinates): """ 緯度経度境界探索 緯度経度の最小値-0.01, 最大値+0.01で境界を作成 @param coordinates: GPS座標のリスト ([(lat, lon)]) @return: [(min_lat-0.01, min_lon-0.01), (max_lat+0.01, max_lon+0.01)] のタプル """ lats = [lat for lat, lon in coordinates] lons = [lon for lat, lon in coordinates] min_lat, max_lat = min (lats), max (lats) min_lon, max_lon = min (lons), max (lons) bounds = [(min_lat - 0.01 , min_lon - 0.01 ), (max_lat + 0.01 , max_lon + 0.01 )] return bounds def save_shape (G, filepath= None , encoding= "utf-8" ): """ シェープファイル保存 @param G: グラフオブジェクト @param filepath: 保存先のディレクトリ @param encoding: ファイルエンコーディング """ if filepath is None : filepath = os.path.join(ox.settings.data_folder, "graph_shapefile" ) if not os.path.exists(filepath): os.makedirs(filepath) filepath_nodes = os.path.join(filepath, "nodes.shp" ) filepath_edges = os.path.join(filepath, "edges.shp" ) gdf_nodes, gdf_edges = ox.convert.graph_to_gdfs(G) gdf_nodes = ox.io._stringify_nonnumeric_cols(gdf_nodes) gdf_edges = ox.io._stringify_nonnumeric_cols(gdf_edges) gdf_edges[ "fid" ] = np.arange( 0 , gdf_edges.shape[ 0 ], dtype= 'int' ) gdf_nodes.to_file(filepath_nodes, encoding=encoding) gdf_edges.to_file(filepath_edges, encoding=encoding) def sort_coordinates (matched_edges, cpath_ids): """ 経路並び替え cpath_idsに基づいて経路を並び替える @param matched_edges: マッチングされた経路のGeoDataFrame @param cpath_ids: FMMのマッチ結果であるcpathのIDリスト @return: 並び替えられたGPS座標のリスト ([(lat, lon)]形式) """ ordered_coordinates = [] for fid in cpath_ids: edge = matched_edges[matched_edges[ 'fid' ] == fid] if not edge.empty: linestring = edge.iloc[ 0 ][ 'geometry' ] coords = [(lat, lon) for lon, lat in list (linestring.coords)] ordered_coordinates.extend(coords) return ordered_coordinates def calculate_distance (coord1, coord2): """ GPS座標間距離計算 @param coord1: (lat, lon) 1つ目の座標 @param coord2: (lat, lon) 2つ目の座標 @return: 2点間の距離 (km) """ R = 6371 lat1, lon1 = coord1 lat2, lon2 = coord2 dlat = radians(lat2 - lat1) dlon = radians(lon2 - lon1) a = sin(dlat / 2 )** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2 )** 2 c = 2 * atan2(sqrt(a), sqrt( 1 - a)) return R * c def total_distance (coordinates): """ 総走行距離計算 @param coordinates: GPS座標のリスト ([(lat, lon)]) @return: 総走行距離 (km) """ distance = 0.0 for i in range ( 1 , len (coordinates)): distance += calculate_distance(coordinates[i - 1 ], coordinates[i]) return distance def upload_file (filepath, bucket_name, key): """ S3アップロード @param filepath: アップロードするファイルのパス @param bucket_name: S3バケット名 @param key: S3内でのファイルキー @return: アップロードされたファイルの公開URL """ s3_client = boto3.client( 's3' , region_name=AWS_REGION, aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY) try : s3_client.upload_file(filepath, bucket_name, key, ExtraArgs={ 'ContentType' : 'text/html' }) url = f "https://{bucket_name}.s3.{AWS_REGION}.amazonaws.com/{key}" return url except Exception as e: logger.error(f "Error uploading to S3: {e}" ) return None def send_notification (project_uuid, edge, meas, distance_origin, distance_matched, map_url): """ Slack通知 @param project_uuid: プロジェクトUUID @param edge: エッジ情報 @param meas: 計測情報 @param distance_origin: 補正前の総走行距離 (km) @param distance_matched: 補正後の総走行距離 (km) @param map_url: 地図のURL (OpenStreetMap) @return: なし """ edge_uuid = edge[ 'uuid' ] edge_name = edge[ 'name' ] meas_uuid = meas[ 'uuid' ] jst = pytz.timezone( 'Asia/Tokyo' ) base_time = meas[ 'basetime' ].astimezone(jst) start_time = base_time.strftime( '%Y/%m/%d %H:%M:%S' ) duration = meas[ 'duration' ] / 1_000_000 end_time = (base_time + datetime.timedelta(seconds=duration)).strftime( '%Y/%m/%d %H:%M:%S' ) message = { 'attachments' : [ { 'color' : '#00bfff' , 'author_name' : 'Fast Map Matching' , "author_icon" : "YOUR_ICON" , 'title' : "GPSデータを補正しました" , "fields" : [ { "title" : "エッジ" , "value" : f "<https://example.intdash.jp/console/admin/edges/{edge_uuid}|{edge_name}>" }, { "title" : "計測" , "value" : f "<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|{start_time} - {end_time}>" }, { "title" : "総走行距離 補正前" , "value" : f "{distance_origin:.2f} km" }, { "title" : "総走行距離 補正後" , "value" : f "{distance_matched:.2f} km" }, { "title" : "走行ルート" , "value" : f "<{map_url}|OpenStreetMap>" } ], "footer" : "計測したら即分析!WebhookでGPS補正を起動してみた" , "footer_icon" : "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU" , 'mrkdwn_in' : [ 'text' , 'fields' ] } ] } try : response = requests.post( SLACK_WEBHOOK_URL, data=json.dumps(message), headers={ 'Content-Type' : 'application/json' } ) if response.status_code != 200 : logger.error(f "Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}" ) logger.info( "Slack notification sent successfully." ) except Exception as e: logger.error(f "Exception occurred while sending Slack notification: {e}" ) def lambda_handler (event, context): logger.info(f "Received event: {event}" ) try : ## 入力 # GPSデータ読み込み project_uuid = event.get( 'project_uuid' ) meas_uuid = event.get( 'measurement_uuid' ) coordinates = get_gnss(project_uuid, meas_uuid) logger.debug(f "Coordinates: {coordinates}" ) if not coordinates: return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'No coordinates data found' }) } fmm_coordinates = [[lon, lat] for lat, lon in coordinates] # fmmは[[lon, lat]]形式 # OSMデータの取得とシェープファイルへの保存 bounds = find_bounds(coordinates) (min_lat, min_lon), (max_lat, max_lon) = bounds boundary_polygon = Polygon([(min_lon, min_lat), (max_lon, min_lat), (max_lon, max_lat), (min_lon, max_lat)]) G = ox.graph_from_polygon(boundary_polygon, network_type= 'drive' ) save_shape(G, filepath=f '{data_path}' ) # 道路ネットワークデータの読み込み edges = gpd.read_file(f "{data_path}/edges.shp" ) # 経路: 道路ネットワークの2つのノード(交差点など)を結ぶ線 network = fmm.Network(f "{data_path}/edges.shp" , "fid" , "u" , "v" ) graph = fmm.NetworkGraph(network) logger.info(f "Loaded {network.get_edge_count()} edges and {network.get_node_count()} nodes from shapefiles." ) ## 地図マッチング # UBODTの生成・読み込み ubodt_gen = fmm.UBODTGenAlgorithm(network, graph) ubodt_gen.generate_ubodt(f "{data_path}/ubodt.txt" , 0.02 , binary= False , use_omp= True ) ubodt = fmm.UBODT.read_ubodt_csv(f "{data_path}/ubodt.txt" ) # FMMモデルマッチング model = fmm.FastMapMatch(network, graph, ubodt) coordinates_str_list = [f "{lon} {lat}" for lon, lat in fmm_coordinates] wkt = f "LINESTRING({','.join(coordinates_str_list)})" fmm_config = fmm.FastMapMatchConfig( 5 , 20 , 15 ) # 候補数, 探索半径(m), GPS誤差(m) result = model.match_wkt(wkt, fmm_config) logger.info(f "Matched {len(result.cpath)} edges." ) ## 出力 # 地図作成 m = folium.Map(location=[np.mean([bounds[ 0 ][ 0 ], bounds[ 1 ][ 0 ]]), np.mean([bounds[ 0 ][ 1 ], bounds[ 1 ][ 1 ]])], zoom_start= 12 ) # 補正前GPSデータをプロット folium.PolyLine(coordinates, color= 'deepskyblue' , weight= 8 , opacity= 0.5 , popup= 'Original Route' ).add_to(m) folium.Marker(location=coordinates[ 0 ], popup= 'Start' , icon=folium.Icon(color= 'lightred' , icon= 'play' , prefix= 'fa' )).add_to(m) folium.Marker(location=coordinates[- 1 ], popup= 'Goal' , icon=folium.Icon(color= 'lightred' , icon= 'flag' , prefix= 'fa' )).add_to(m) # 補正後のルートをプロット matched_edges = edges[edges.fid.isin(result.cpath)] matched_coordinates = sort_coordinates(matched_edges, result.cpath) folium.PolyLine(matched_coordinates, color= 'deeppink' , weight= 7 , opacity= 0.6 , popup= 'Matched Route' ).add_to(m) # マップ保存・S3アップロード map_file = f "map_{meas_uuid}.html" map_path = f "{data_path}/{map_file}" m.save(map_file) map_url = upload_file(map_file, BUCKET_NAME, map_file) if not map_url: logger.error( "Map upload failed." ) logger.info(f "Map uploaded to S3: {map_url}" ) # 総走行距離を計算 total_distance_origin = total_distance(coordinates) total_distance_matched = total_distance(matched_coordinates) logger.info(f "補正前の総走行距離: {total_distance_origin:.2f} km" ) logger.info(f "補正後の総走行距離: {total_distance_matched:.2f} km" ) # Slack通知 meas = get_meas(project_uuid, meas_uuid) edge = get_edge(meas[ 'edge_uuid' ]) send_notification(project_uuid, edge, meas, total_distance_origin, total_distance_matched, map_url) return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'FMM completed and Slack notification sent' }) } except Exception as e: logger.error(f "Error processing event: {str(e)}" ) return { 'statusCode' : 500 , 'body' : json.dumps({ 'error' : str (e)}) } if __name__ == "__main__" : # ローカルテスト用 if len (sys.argv) > 1 : with open (sys.argv[ 1 ], 'r' ) as f: event = json.load(f) lambda_handler(event, None ) else : print ( "Usage: python3 app.py <event.json>" ) GPS補正Lambda bootstrapシェル #!/bin/bash # ------------------------------------------------------------------- # bootstrap # ------------------------------------------------------------------- # カスタムランタイム # # コンテナ起動時に実行 # LambdaランタイムAPIからイベントリクエスト取得 # リクエストイベントを引数にlambda_handler起動 # 結果をLambdaランタイムAPIに登録 # ------------------------------------------------------------------- set -euo pipefail # エラー発生時にスクリプトを終了し、未定義変数をエラーにする while true; do # LambdaランタイムAPIからイベントリクエスト取得 RESPONSE = $( curl -i -s " http:// ${AWS_LAMBDA_RUNTIME_API} /2018-06-01/runtime/invocation/next " ) HEADERS = $( echo " $RESPONSE " | sed -n ' /^\r$/q;p ' ) REQUEST_ID = $( echo " $HEADERS " | grep -Fi ' Lambda-Runtime-Aws-Request-Id ' | awk ' {print $2} ' | tr -d ' \r ' ) if [[ -z " $REQUEST_ID " ]] ; then echo " Error: REQUEST_ID is empty " exit 1 fi EVENT_DATA = $( echo " $RESPONSE " | sed ' 1,/^\r$/d ' ) echo " REQUEST_ID: $REQUEST_ID EVENT_DATA: $EVENT_DATA " # lambda_handler起動 RESPONSE = $( python3 -c " import json from app import lambda_handler event = json.loads(''' $EVENT_DATA ''') response = lambda_handler(event, None) print(json.dumps(response)) " ) echo " RESPONSE: $RESPONSE " if [[ -z " $RESPONSE " ]] ; then echo " Error: RESPONSE is empty. Request ID: $REQUEST_ID " exit 1 fi # 結果をLambdaランタイムAPIに登録 curl -s -X POST " http:// ${AWS_LAMBDA_RUNTIME_API} /2018-06-01/runtime/invocation/ ${REQUEST_ID} /response " \ -d " $RESPONSE " done GPS補正Lambda コンテナDockerfile # ------------------------------------------------------------------- # ベースイメージ: Ubuntu 22.04 LTS # ------------------------------------------------------------------- FROM ubuntu:22.04 # ------------------------------------------------------------------- # 基本ツールのインストール # - ビルドツール、CMake、Git、Python、npm ... # ------------------------------------------------------------------- RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ cmake \ g++ \ git \ vim \ python3 = 3 . 10 .6-1~ 22 . 04 . 1 \ python3-dev = 3 . 10 .6-1~ 22 . 04 . 1 \ python3-pip = 22 . 0 . 2 +dfsg-1ubuntu0. 4 \ wget \ curl \ libgdal-dev = 3 . 4 . 1 +dfsg-1build4 \ libboost-all-dev = 1 . 74 . 0 .3ubuntu7 \ libosmium2-dev = 2 . 18 .0-1 \ swig \ expat \ bzip2 \ default-jre = 2:1.11-72build2 \ npm \ jq \ && rm -rf /var/lib/apt/lists/* \ && apt-get clean # ------------------------------------------------------------------- # Python3 シンボリックリンク設定 # - Python3 と PIP3 をデフォルトに設定 # ------------------------------------------------------------------- RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 \ && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 # ------------------------------------------------------------------- # FMMに必要なPython パッケージインストール # - Numpy、Pandas、Osmnx、Shapely .. # ------------------------------------------------------------------- RUN pip install --upgrade pip \ && pip install --no-cache-dir \ numpy = = 1 . 26 . 4 \ pandas = = 2 . 2 . 2 \ osmnx = = 1 . 9 . 4 \ folium = = 0 . 17 . 0 \ shapely = = 2 . 0 . 6 \ polars = = 1 . 9 . 0 # ------------------------------------------------------------------- # FMM のクローンとビルド # - GitHub から FMM をクローンし、ビルドしてインストール # ------------------------------------------------------------------- RUN git clone https://github.com/cyang-kth/fmm.git /home/fmm WORKDIR /home/fmm/build RUN cmake .. \ && make -j " $( nproc ) " \ && make install # ------------------------------------------------------------------- # intdash SDK のインストール準備 # - Node.js 設定と npm パッケージのインストール # - openapi-generator-cli のインストール # ------------------------------------------------------------------- WORKDIR /home RUN npm install -g n@ 10 . 0 . 0 \ && n 20 . 18 . 0 \ && hash -r \ && npm install --save @openapitools/openapi-generator-cli # ------------------------------------------------------------------- # intdash SDK インストール # - API クライアントコードを生成、Python パッケージとしてインストール # - コンテナで起動するため、dist-packagesにコピー # ------------------------------------------------------------------- ENV VERSION=v2.7.0 RUN ./node_modules/.bin/openapi-generator-cli version-manager set 6 . 1 . 0 \ && ./node_modules/.bin/openapi-generator-cli generate \ -g python -i https://docs.intdash.jp/api/intdash-api/ ${VERSION} /openapi_public.yaml \ --package-name = apiclient RUN cp -r /home/apiclient /usr/local/lib/python3. 10 /dist-packages/ # ------------------------------------------------------------------- # アプリに必要なPython パッケージをインストール # - 地図、AWS SDK for Python、HTTP # ------------------------------------------------------------------- ENV PACKAGE_PREFIX=/var/task RUN pip install --no-cache-dir \ folium = = 0 . 17 . 0 \ boto3 = = 1 . 35 . 32 \ requests = = 2 . 32 . 3 # ------------------------------------------------------------------- # ローカルファイルのコピー # - アプリケーションファイル # - ローカルテスト用ファイル # ------------------------------------------------------------------- COPY bootstrap ${PACKAGE_PREFIX}/bootstrap RUN chmod +x /var/task/bootstrap COPY app.py ${PACKAGE_PREFIX}/app.py COPY event.json /home/event.json # ------------------------------------------------------------------- # 環境変数の設定 # - GDAL、GEOS、およびその他の必要なライブラリのパスを設定 # ------------------------------------------------------------------- ENV \ GDAL_DATA=${PACKAGE_PREFIX}/share/gdal \ PROJ_LIB=/usr/share/proj/ \ GDAL_CONFIG=${PACKAGE_PREFIX}/bin/gdal-config \ GEOS_CONFIG=${PACKAGE_PREFIX}/bin/geos-config \ PATH=${PACKAGE_PREFIX}/bin:$PATH \ PYTHONPATH=${PACKAGE_PREFIX} # ------------------------------------------------------------------- # ENTRYPOINT、CMD、WORKDIR の設定 # - FMM実行時にcacheにJSONファイル作成のため、/tmp を親ディレクトリに設定 # ------------------------------------------------------------------- WORKDIR /tmp ENTRYPOINT [ " /var/task/bootstrap " ] CMD [ " python3 ", " /var/task/app.py " ] レスポンス返却Lambda Pythonプログラム import json import hmac import hashlib import base64 import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) # 定数定義 SECRET_KEY = 'YOUR_SECRET_KEY' LAMBDA_CLIENT = boto3.client( 'lambda' ) def verify_hmac (secret, payload, received): """ HMAC検証 Args: secret (str): シークレットキー payload (str): ペイロード received (str): 受信したHMAC Returns: bool: 検証結果(True: 成功, False: 失敗) """ hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256) computed = base64.b64encode(hmac_obj.digest()).decode() return hmac.compare_digest(computed, received) def lambda_handler (event, context): """ エントリポイント Args: event (dict): イベント context (LambdaContext): コンテキスト Returns: dict: APIレスポンス """ headers = event.get( 'headers' , {}) body = event.get( 'body' , '{}' ) logger.info(f "Received event: {event}" ) body_dict = json.loads(body) payload = json.dumps(body_dict, separators=( ',' , ':' )) # HMAC検証 signature = headers.get( 'x-intdash-signature-256' , '' ) if not verify_hmac(SECRET_KEY, payload, signature): logger.warning( "HMAC Verification Failed." ) return { 'statusCode' : 401 , 'body' : json.dumps({ 'message' : 'HMAC verification failed' }) } # リソースタイプとアクションの判定 resource_type = body_dict.get( 'resource_type' ) action = body_dict.get( 'action' ) if resource_type != 'measurement' or action != 'completed' : logger.info(f "Ignored: resource_type={resource_type}, action={action}" ) return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Resource_type or action ignored' }) } # FMM Lambdaを非同期起動 try : response = LAMBDA_CLIENT.invoke( FunctionName= 'intdash-fmm' , InvocationType= 'Event' , Payload=json.dumps(body_dict) ) logger.info(f "FMM Lambda invoked successfully: {response}" ) except Exception as e: logger.error(f "Failed to invoke FMM Lambda: {e}" ) return { 'statusCode' : 500 , 'body' : json.dumps({ 'message' : 'Failed to invoke FMM Lambda' }) } return { 'statusCode' : 200 , 'body' : json.dumps({ 'message' : 'Webhook received and FMM Lambda invoked' }) } 完了はエッジ側からすべての計測データを受け取りきったときに自動で遷移します。終了はユーザー操作によりサーバー側で計測が閉じられたときの状態です。 ↩ SlackのIncoming Webhookとはリクエストのフォーマットが異なるため、intdash Webhookは直接連携できません。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩ レスポンスが返らない場合は通知が失敗したとみなされ、intdash次回起動時に再送されます。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩ サンプルシェルを本記事の最後に掲載しています。 ↩ サンプルファイルを本記事の最後に掲載しています。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩
アバター
手軽にデータストリーミングを開発したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 これまでintdashにデータをストリームするには、クライアントライブラリを使用してコーディングが必要でした。 2023年12月バージョンからREST APIでアップストリームできるようになったのでご紹介します。 はじめに ストリームとは もっと手軽にデータ送信 クライアントライブラリとの違い 使いどころ 向かない使い方 追加API 叩いてみた 計測の作成 アップストリームを作成 アップストリームチャンクを送信 アップストリームを閉じる 最小エッジでストリームしてみた おわりに Appendix. サンプルcurlコマンド Appendix. サンプルMicroPythonプログラム はじめに ストリームとは ストリームは、データをリアルタイム(または、ほぼリアルタイム)で継続的に送受信するプロセスです。 アップストリームは、デバイス側からプラットフォーム側にデータを送信する方向のストリームです。 www.aptpod.co.jp intdashでは、独自プロトコルiSCP(intdash Stream Control Protocol)で高頻度・低遅延のストリームを実現しています。 tech.aptpod.co.jp また、intdashを使ったアプリケーション開発をしやすくするため、intdashでは 各種クライアントライブラリ をご用意しています。 もっと手軽にデータ送信 今回追加されたのは、intdashにHTTPでデータ送信するためのAPIです。 クライアントライブラリとREST API クライアントライブラリとの違い クライアントライブラリとはトレードオフがあります。 特性 ライブラリ REST API 学習コスト ❌ ⭕️ 実行環境制約 ❌ ⭕️ アプリケーションの柔軟性 ⭕️ ❌ 使いどころ 低頻度データ(〜数秒に1回) ライブラリ追加が難しい動作環境 開発時の簡単な検証 向かない使い方 高頻度データ(1秒に数回〜) 処理性能要求が高めのアプリケーションへの組み込み 追加API 具体的に追加されたAPIは以下の4つです。 Create Upstream:アップストリーム作成 Close Upstream:アップストリームを閉じる Send Upstream Chunks:アップストリームチャンクを送信 Send Upstream Metadata:メタデータを送信 叩いてみた 実際に使ってみました。 Macbookからcurlコマンドを叩きながら、サーバーの計測の状態変化をMeas Hubで見ていきます。 1 計測の作成 まず、データを格納するための計測を作成します。 HTTPボディで基準時刻とエッジを指定しています。 計測作成 サーバーに、データが入っていない空の計測が作成されます。 ステータスは 計測準備中 となっています。 空の計測 アップストリームを作成 エッジとサーバー間の送受信プロセスであるアップストリームを作成します。 HTTPボディでエッジ source_node_id と計測 session_id を指定しています。 ストリーム作成 計測のステータスが 計測中 に変わります。 計測開始 アップストリームチャンクを送信 いよいよデータを送信します。 URLでストリームを指定し、HTTPボディでデータの定義と値を指定します。 データ送信 Meas Hubに送信データのデータ型とデータ名が表示されます。 回収済みデータポイント数が増加 データIDリストに送信データ アップストリームを閉じる 最後にストリームを閉じます。 URLでストリームを指定し、HTTPボディで計測完了 close_session を指定します。 ストリームを閉じる 計測のステータスは終了に変わり、計測の終了時刻が表示されます。 計測終了 最小エッジでストリームしてみた OSがない環境でもHTTPリクエストは送信できます。 M5Stackシリーズのマイコン ATOM Lite にMicroPythonで実装してみました。 2 サイズは24mm x 24mm x 10mm。ストリームするintdashエッジとしては過去最小ではないかと思います。 照度センサーの値を1Hzでアップストリームします。 www.youtube.com マイコンからHTTPでストリーム おわりに 手軽にストリームを実現できる方法としてREST APIの新機能をご紹介しました。 シェルでシステム情報を送信したり、intdashを使った開発のパターンが広がります。 データ収集プラットフォームの開発はぜひaptpodにご相談ください。 Appendix. サンプルcurlコマンド # エッジのOAuthクライアントクレデンシャル払い出し curl -X POST https://example.intdash.jp/api/auth/oauth2/token \ -d "grant_type=client_credentials&client_id={YOUR_ID}&client_secret={YOUR_SECRET}" # アクセストークン export ACCESS_TOKEN={YOUR_ACCESS_TOKEN} # 計測作成 curl -X POST https://example.intdash.jp/api/v1/projects/{YOUR_PROJECT_ID}/measurements \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "basetime": "2024-01-01T00:00:00.000000Z", "basetime_type": "ntp", "edge_uuid": "{YOUR_EDGE_ID}", "protected": false }' # ストリーム作成 curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "session_id": "8efe703c-b617-4a6d-bea9-ed9ea11c2c66", "source_node_id": "{YOUR_EDGE_ID}", "persist": true }' # チャンク送信 curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/chunks \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "data_point_groups": [ { "data_id": { "type": "float64", "name": "1/test" }, "data_points": [ { "elapsed_time": 1, "float64": 0.123 } ] } ] }' # ストリームを閉じる curl -i -X PUT https://example.intdash.jp/api/iscp/projects/${YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/close \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "close_session": true }' Appendix. サンプルMicroPythonプログラム import os import sys import io import M5 from M5 import BtnA from hardware import Pin, RGB, I2C from unit import DLightUnit import time import requests2 import network import ntptime import json import gc # 定数定義 PROJECT_ID = 'YOUR_PROJECT_ID' EDGE_ID = 'YOUR_EDGE_ID' EDGE_SECRET = 'EDGE_SECRET' WIFI_SSID = 'YOUR_SSID' WIFI_PASSWORD = 'YOUR_PASS' NTP_SERVER = 'YOUR_NTP_SERVER' TIMEZONE = 'GMT-9' API_BASE_URL = "https://example.intdash.jp/api" # グローバル変数 access_token = None stream_id = None start_time_us = None is_measuring = True wlan = network.WLAN(network.STA_IF) rgb = RGB() dlight_0 = DLightUnit(I2C(0, scl=Pin(32), sda=Pin(26), freq=100000)) def send_request(method, url, headers=None, json=None, data=None): """ HTTPリクエストを実行する共通関数 :param method: HTTPメソッド (GET/POST/PUT/DELETE) :param url: リクエスト先のURL :param headers: リクエストのヘッダー情報 (オプション) :param json: JSON形式で送信するデータ (オプション) :param data: フォームエンコード形式で送信するデータ (オプション) :return: 200番台はHTTPレスポンス、それ以外はNone """ default_headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}' } headers = headers or default_headers request_func = { 'POST': requests2.post, 'PUT': requests2.put, 'GET': requests2.get, 'DELETE': requests2.delete }.get(method) if request_func is None: raise ValueError(f"Unsupported HTTP method: {method}") http_req = request_func(url, headers=headers, json=json, data=data) if 200 <= http_req.status_code < 300: response = http_req.json() else: print(f"Failed request. Status Code: {http_req.status_code}, Response: {http_req.text}") response = None http_req.close() gc.collect() return response def get_current_time(): """ 現在時刻取得(ISO 8601形式) :return: ISO 8601形式の現在時刻文字列 """ current_time = time.gmtime() return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.000000Z".format( *current_time[:6] ) def get_elapsed_time(): """ 経過時間取得 :return: 経過時間(ナノ秒) """ return time.ticks_diff(time.ticks_us(), start_time_us) * 1000 def connect_wifi(): """ WiFi接続 """ if wlan.isconnected(): print("WiFi is already connected.") return wlan.active(False) wlan.active(True) wlan.connect(WIFI_SSID, WIFI_PASSWORD) while not wlan.isconnected(): print("Connecting to WiFi...") time.sleep(1) print("WiFi connected. IP Address:", wlan.ifconfig()) def sync_ntp(): """ NTPサーバー同期 """ ntptime.host = NTP_SERVER max_retries = 3 for attempt in range(max_retries): try: ntptime.settime() print("NTP time synchronized successfully.") print("Current Local Time (JST):", time.localtime()) break except OSError as e: print(f"NTP synchronization failed on attempt {attempt + 1}: {e}") if attempt < max_retries - 1: time.sleep(5) def authenticate(): """ 認証 """ global access_token url = f"{API_BASE_URL}/auth/oauth2/token" headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = f"grant_type=client_credentials&client_id={EDGE_ID}&client_secret={EDGE_SECRET}" response = send_request('POST', url, headers=headers, data=data) access_token = response.get("access_token") if access_token: print("Access token obtained.") else: print("Failed to obtain access token.") def setup_measurement(): """ 計測・ストリーム開始 """ global stream_id, start_time_us start_time_us = time.ticks_us() current_time = get_current_time() measurement_url = f"{API_BASE_URL}/v1/projects/{PROJECT_ID}/measurements" body = { "basetime": current_time, "basetime_type": "ntp", "edge_uuid": EDGE_ID, "protected": False } measurement_response = send_request('POST', measurement_url, json=body) measurement_id = measurement_response.get("uuid") stream_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams" body = { "session_id": measurement_id, "source_node_id": EDGE_ID, "persist": True } stream_response = send_request('POST', stream_url, json=body) stream_id = stream_response.get("stream_id") def btnA_wasPressed_event(state): """ ボタンイベント 計測中はストリームクローズ 計測停止中はストリーム開始 :param state: ボタンの状態 """ global is_measuring if is_measuring: is_measuring = False rgb.fill_color(0x000000) close_url = f'{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/close' send_request('PUT', close_url, json={"close_session": True}) else: setup_measurement() is_measuring = True def setup(): """ 初期設定 """ M5.begin() BtnA.setCallback(type=BtnA.CB_TYPE.WAS_PRESSED, cb=btnA_wasPressed_event) rgb.fill_color(0xFF0077) dlight_0.configure(dlight_0.CONTINUOUSLY, dlight_0.L_RESOLUTION_MODE) connect_wifi() sync_ntp() authenticate() setup_measurement() def loop(): """ メインループ処理 """ M5.update() if BtnA.isPressed() or not is_measuring: time.sleep(1) return rgb.fill_color(0x0000FF) lux = dlight_0.get_lux() print("Current Lux:", lux) body = { "data_point_groups": [ { "data_id": {"type": "float64", "name": "1/lux"}, "data_points": [{"elapsed_time": get_elapsed_time(), "float64": lux}] } ] } chunks_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/chunks" send_request('POST', chunks_url, json=body) rgb.fill_color(0x000077) time.sleep(1) if __name__ == '__main__': try: setup() while True: loop() except (Exception, KeyboardInterrupt) as e: print("An error occurred:", e) サンプルコマンドを本記事の最後に掲載しています。 ↩ サンプルプログラムを本記事の最後に掲載しています。 ↩
アバター
収集データを手軽に分析に利用したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 intdashは収集データをリアルタイムに可視化できるのが強みですが、 定時/逐次処理によるデータ分析やレポーティングのご要望もいただいています。 そこで今回は、2023年12月から新たに追加された永続化拡張機能をご紹介します。 1 はじめに 永続化拡張とは 今回やること 手順 S3バケットの準備 AWSアクセスキーの準備 永続化拡張設定 データ連携確認 計測 分析・レポート作成 遅延アップロード おわりに Appendix. 分析プログラム はじめに 永続化拡張とは 収集データをintdash内の時系列データベースに永続化すると同時に、分析基盤などの他プラットフォームにも連携します。 2 他プラットフォームへデータを流し込む 連携先には以下を選択できます。 Amazon S3 Amazon Timestream REST API これにより、データ連携をカスタム開発せず、他プラットフォームでの収集データ利用が可能になります。 今回やること スマートフォンアプリ intdash Motion V2 で計測・収集したGPSデータを、intdashからAmazon S3にCSVファイルとしてほぼリアルタイムに書き出します。 計測完了後、CSVファイルをまとめてPCで取得します。 全体構成 手順 S3バケットの準備 連携先となるS3のバケットを作成します。 S3バケット作成 AWSアクセスキーの準備 intdashからのアクセスに使用する、IAMでバケット所有者のAWSアクセスキーIDとシークレットアクセスキーを払い出します。 アクセスキー払い出し 永続化拡張設定 intdashに永続化拡張を設定します。 誤って大量データ書き出しが始まらないよう、設定を作成したあとに有効化します。 3 設定の作成 設定項目は以下の通りです。 設定の名前と説明 永続化タイプ:S3 S3設定 リージョン、バケット名 パーティション単位(年、月、日) AWSアクセスキーID、シークレットアクセスキー ファイルフォーマット:CSV、JSON、RAW(Bytes)、Parquet 対象/除外データリスト データタイプ データ名 iSCP 2.0の ワイルドカード を利用可能 対象/除外ノード(エッジのUUID)リスト 有効/無効 作成した設定では、 対象データをGPSの2項目に制限しています。 登録された設定を確認して有効化します。 設定の有効化 データ連携確認 intdash Motionで計測してみます。 GPSデータの各項目がEdge Finderに表示されます。 intdashサーバーにてGPSデータを受信 計測を開始すると、すぐにS3バケットにCSVファイルが現れ始めます。 data-YYYY-MM-DDThh:mm:ss.sssssssssZ.csv とナノ秒単位のファイル名が付与されます。 S3バケットにCSVファイルが出現 ファイルの中身はこのようになっています。 GPSデータのうち、先ほど設定した2つの対象データのみが出力されています。 緯度経度 1/gnss_coordinates :x, y列 高度 1/gnss_altitude :float列 一部項目は割愛 time:絶対時刻 source_node_id:送信元エッジID session_id:計測ID data_type:データタイプ data_name:データ名称 string, float, int, bool, x, y, z, w:値、データタイプに応じた各列に格納 bytes:データ生値、Base64エンコード 1ファイル中には複数のエッジ・計測・データの行が混在します。 計測 移動しながら1時間ほど計測しました。 分析・レポート作成 Jupyter NotebookでCSVファイルを取得して地図に表示します。 4 緯度経度をつないだ線を、高度のレンジごとで色分けしています。 新宿駅は武蔵野台地、品川駅は埋立地 拡大すると1Hzで収集した緯度経度を綺麗に描けているのがわかります。 五反田〜目黒は激登り 遅延アップロード 永続化拡張は、リアルタイムで送りきれなかったデータを回収する遅延アップロードにも働きます。 電波が悪い状況で試しました。 京都出張者の朝は早い Motionに送りきれなかったデータが残っています。 手動でアップロードします。 xx.x % Uplodedの計測を選択してUpload S3に大きめのCSVファイルが出現 なお、エッジデバイス用のエージェントソフトウェア intdash Edge Agent なら遅延アップロードを自動で行えます。 www.youtube.com さて、再度プロットしてみました。 回線速度が落ちる静岡〜浜松間 遅延アップロードで補完されたデータにより、高度のグラデーションがより精緻に表現されています。 おわりに 永続化拡張機能によって、分析のためのデータ連携の開発が不要になり、システムインテグレーションの期間・コスト低減が期待できます。 他にも、画像分析などのAI活用、他プラットフォームデータとの複合データ分析など、様々な応用が考えられます。 モビリティデータを分析基盤で活用したい方は、ぜひintdashをご検討ください。 Appendix. 分析プログラム import boto3 import pandas as pd import folium from io import StringIO import matplotlib.pyplot as plt import matplotlib.colors as mcolors import time # AWSアクセスキーを設定 session = boto3.Session( aws_access_key_id='YOUR_ACCESS_KEY_ID', aws_secret_access_key='YOUR_SECRET_ACCESS_KEY', region_name='ap-northeast-1' ) s3 = session.client('s3') bucket_name = 'YOUR_BUCKET' prefix = 'YYYY/MM/DD/' # S3オブジェクトキーパス # ページネーションでファイルリストを取得 paginator = s3.get_paginator('list_objects_v2') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) file_list = [] for page in page_iterator: for content in page.get('Contents', []): file_list.append(content['Key']) # バッチサイズ設定 batch_size = 100 # バッチサイズ(ファイル数) all_coordinates_data = [] # 全ての座標データを格納するリスト all_altitude_data = [] # 全ての高度データを格納するリスト # カラーマップ設定 cmap = plt.get_cmap('jet') # 'jet'カラーマップを使用 norm = mcolors.Normalize(vmin=0, vmax=30) # 地図中心設定 folium_map = folium.Map(location=[35.679889, 139.73875], zoom_start=12) # ファイルをバッチサイズごとに処理 for batch_start in range(0, len(file_list), batch_size): batch_files = file_list[batch_start:batch_start + batch_size] # バッチ処理 for file_key in sorted(batch_files): try: csv_obj = s3.get_object(Bucket=bucket_name, Key=file_key) body = csv_obj['Body'].read().decode('utf-8') df = pd.read_csv(StringIO(body), usecols=['time', 'data_name', 'x', 'y', 'float']) # 緯度経度データ抽出 coords_df = df[df['data_name'] == '1/gnss_coordinates'][['time', 'x', 'y']] # 高度データ抽出 alt_df = df[df['data_name'] == '1/gnss_altitude'][['time', 'float']] all_coordinates_data.extend(coords_df.values.tolist()) all_altitude_data.extend(alt_df.values.tolist()) except Exception as e: print(f"Error reading {file_key}: {e}") # S3 API制限回避 time.sleep(2) # スリープ # DataFrameに変換してソート coordinates_df = pd.DataFrame(all_coordinates_data, columns=['time', 'x', 'y']) altitude_df = pd.DataFrame(all_altitude_data, columns=['time', 'altitude']) # timeでソート coordinates_df.sort_values('time', inplace=True) altitude_df.sort_values('time', inplace=True) # 緯度(x)と経度(y)を地図にプロット for i in range(len(coordinates_df) - 1): # 現在のポイントと次のポイントのデータ row_current = coordinates_df.iloc[i] row_next = coordinates_df.iloc[i + 1] # 高度の行をtimeで検索 altitude_row_current = altitude_df[altitude_df['time'] == row_current['time']] altitude_current = altitude_row_current['altitude'].values[0] if not altitude_row_current.empty else None # 色の設定 if altitude_current is not None: color = mcolors.to_hex(cmap(norm(altitude_current))) else: color = 'gray' # ポリラインを描画 folium.PolyLine( locations=[(row_current['x'], row_current['y']), (row_next['x'], row_next['y'])], weight=5, opacity=0.8, color=color ).add_to(folium_map) # 地図表示 folium_map.save('map.html') folium_map 本機能はβ版であり、次バージョンよりインターフェース仕様が変更になる可能性があります。 ↩ 時系列データベースへの永続化をフックし、およそ1秒〜数秒で送信します。送信性能はサーバースペックやネットワークに依存します。 ↩ 現バージョンでは設定が反映されるまで時間がかかる場合があります。 ↩ Pythonプログラムを本記事の最後に掲載しています。 ↩
アバター
限られたネットワークを効率的に使ってデータ収集したいみなさん、 こんにちは。ソリューションアーキテクトの伊勢です。 昨今、モビリティの遠隔監視ニーズはさらに高まっていますが、取得データは多岐に渡り、データ量は増える一方です。 データサイズや優先度に応じて、取り扱いを分けたいご要望をいただいています。 GPSや死活監視データは常時取りたい。 映像など重たいデータは緊急時/分析時に取得したい。 そこで今回はintdashのオンデマンドデータ収集機能をご紹介します。 はじめに オンデマンドデータ収集とは Terminal System 2とは 今回やること 手順 準備したもの 構成準備 Raspberry Piのセットアップ カメラ映像の送信設定 保留設定 データ回収 おわりに はじめに オンデマンドデータ収集とは 必要なときに、その都度データを収集する方式です。 収集するデータを選別できるため、ストリーミング方式に比べてトラフィックを抑えられます。 蓄積データを要求・収集 これまでのintdashは主にストリーミングでデータを収集していました。 Terminal System 2の2024年6月リリース版より、オンデマンド方式も実現できるようになりました。 Terminal System 2とは intdashに接続可能なゲートウェイアプライアンスで、組込Linuxにより構築されたシステムの最新版です。 これまでも、ストリーム送信しなかったデータをバッグクラウンドでアップロードできましたが、設定によりアップロードを保留できるようになりました。 これにより、OTAアップデート機能のDevice Management Consoleとあわせて、保留/保留解除を遠隔で指示できるようになっています。 tech.aptpod.co.jp tech.aptpod.co.jp また、エッジコンピュータとして新たにRaspberry Pi 4 Model Bをサポートしましたので、今回はそちらを使ってみます。 今回やること データ種類によってストリーミングとオンデマンドにわけて収集します。 ざっくり全体構成 収集方式をTerminal System 2のストリームとして設定します。 常時見たいデータとしてデバイスのステータス情報であるDevice Inventoryをリアルタイムにストリーム収集します。 緊急時/分析時に見たいデータとしてカメラ映像をオンデマンドで収集します。 オンデマンド用ストリームに渡されたデータは、サーバーへの送信が保留され、保留解除の指示を受けると送信されます。 手順 準備したもの 準備物一式 Raspberry Pi 4 Model Bと電源アダプタ microSDカードとmicroSDカードリーダー LANケーブル USBカメラ ディスプレイデバイス(Terminal System用) こちらも利用しています。 セットアップ用PC(Macbook) モニタとmicroHDMI-HDMI変換ケーブル USBキーボード 構成準備 intdashでは各種ドキュメントを公開しています。 今回はTerminal System 2のマニュアルに従っています。 Raspberry Piのセットアップ まずはセットアップ担当者向けの手順を実施します。 microSDカードリーダーをセットアップ用PCに接続 microSDにTerminal System OS 2イメージを書き込む Raspberry PiをLANケーブルに接続して起動 Device Management Consoleで認証要求を承認 Device Management Consoleで計測開始 初期設定で計測中 サーバー側で計測データを確認 Device Inventoryを10秒ごとに受信 カメラ映像の送信設定 ここにデータ送信設定を追加してみます。 カメラ映像をストリーミング収集 遠隔でカメラの送信設定を追加 USBカメラをRaspberry Piに接続 Device Management Consoleで設定追加してデプロイ Device Management Consoleから計測開始 左中央が映像/他はDevice Inventory ここまではマニュアル通りです。 保留設定 カメラ映像だけ送信が保留されるよう設定します。 カメラ映像はエッジに留まる Device Management Consoleでストリーム設定を追加 保留ストリームを追加 以下の3項目を指定しています。 skip_realtime: true リアルタイム送信を試みることなく遅延アップロード用データとして保存 deferred_upload: true 帯域不足や回線切断によりリアルタイム送信できなかったデータは遅延アップロードによりサーバーに送信 suspend_deferred_upload: true 遅延アップロード用データとして保存したのち、遅延アップロードを保留 カメラ映像が保留ストリームに流れるように設定変更 Device Management Consoleから計測開始 Device Inventoryのみが表示される データ回収 保留されたカメラ映像を回収します。 サーバーにすべての計測データがそろう Device Management Consoleのインベントリタブで保留中のデータを確認します。 約13MBが保留中 保留されている計測を指定して遅延アップロードの保留を解除します。 保留解除を指定してデプロイ 映像データが回収され、過去計測の再生では、リアルタイムで表示されなかったカメラ映像を含むすべてのデータを可視化できます。 一番いいところでカメラがズレてしまった おわりに Terminal System 2の新機能により任意タイミングでデータ収集できました。 常時収集するデータの例としては温度湿度/位置情報/ガス漏れ検知/ハートビート/CANなど多岐に渡ると想定されます。 異常を検知したら、カメラ映像/LiDAR点群/ROSトピックなど大容量データを回収して状況を把握します。 あるいは、通常時はモバイルで計測、分析用のデータはあとから有線LANで回収というユースケースも考えられます。 実現には充実した製品ドキュメント・チュートリアルが大いに参考になりました。 閲覧を希望される方は お問い合わせフォーム よりご連絡ください。 豊富で丁寧なドキュメント群
アバター
はじめに こんにちは、開発本部Visual M2Mグループの遠藤です。フロントエンドエンジニアとして働いています。 この度、株式会社アプトポッドは2024年7月24日(水)~7月26日(金)に東京ビッグサイトにて開催された「メンテナンス・レジリエンス TOKYO 2024」にNTTコミュニケーションズ株式会社様と共同で出展いたしました。連日、多くの方々にご来場いただき、感謝申し上げます。 mente.jma.or.jp 今回、私も弊社の展示を見学してきましたので、その内容の一部をご紹介したいと思います。 はじめに 会場の様子 展示内容 デジタルツインコーナー 〜現場の効率化・安全確保〜 IoT×AIコーナー 〜作業の自動化〜 IoT×BIM/CIMコーナー ~工程管理の自動化~ セミナー 感想 おわりに 会場の様子 私が展示会を見学した時間帯は、晴れており日差しも強く非常に暑い気候でした。 そのような暑さにも関わらず、展示会場は多くの来場者で賑わい、技術交流が盛んに行われていました。アプトポッドのブースも多数の方々に訪れていただき、弊社の技術やサービスに対して強い関心を示していただきました。 展示内容 アプトポッドのブースでは、Gamepad操作による「 Unitree Go1 / Go2 」のデモを交えつつ、主に3つの展示コーナーを設けました。また、展示会の合間に各技術やサービスに関するセミナーも開催しました。 デジタルツインコーナー 〜現場の効率化・安全確保〜 「デジタルツインコーナー」では、日立建機株式会社様協力のもと、弊社で開発を行ったリアルタイムデジタルツイン基盤のデモを展示しました。 昨今、建設業界では建設業就労者の人口減少と高齢化が切迫した課題となっており、その解決策の1つとしてデジタルツイン技術が注目されています。 このデモでは、現場に設置された複数のLiDARやカメラからリアルタイムでデータを収集し、そのデータをもとに地形情報と建設機械の状態をデジタル環境上に再現しました。また、作業者が持つスマートデバイスのGPS情報も反映することで、作業者の安全確保や作業効率の向上を期待することができます。 ブースでは映像だけではなく、Unityで開発したiPadアプリのデモも展示しており、デジタルツインアプリケーションを実際に触って体感することができました。 弊社が開発したデジタルツイン基盤の詳細については「 アプトポッド、intdashをベースに日立建機の建設施工リアルタイムデジタルツイン基盤を開発 」をご参照ください。 また、NTTコミュニケーションズ株式会社様の協力により、作業員のバイタルデータを取得し、熱中症の判断などの安全対策を行う「防爆対応 Worker Care 」のデモの展示も行いました。 これまで暑熱対策ソリューションとして、リストバンド型センサーは普及していたものの、防爆エリアでの使用が難しいという課題がありました。Worker Careは、防爆対応のリストバンド型センサーであり、防爆エリアでのバイタルデータの取得を可能にします。 Worker Careと弊社のサービスである intdash を連携することで可視化ダッシュボード( Visual M2M Data Visualizer )上で、各種バイタルデータやアラートをリアルタイムで表示することができます。 IoT×AIコーナー 〜作業の自動化〜 「IoT×AIコーナー」では、Unitree Go2を使用して、AIを活用した作業の自動化のデモを展示しました。 このソリューションではGo2の頭部に搭載されたカメラで映像を撮影し、リアルタイムで弊社サービスであるintdashに送信します。intdashとAI画像解析サービスを連携することで、画像解析作業を自動化することができます。 実際の会場のデモでは、可視化ダッシュボード上で物体検知の様子を見ることができ、Go2が撮影した無加工の映像と、AIが物体検知を行った映像の比較を行うことができました。これにより、たとえばロボットが自動運転で現場を巡回したり、メータをカメラで読み取って結果を送信するなどの作業が可能となります。 さらに、Unitree Go2に搭載したLiDARセンサー( MID-360 )で取得したROS Topicをintdashサーバーに送信し、弊社のダッシュボードで点群データを可視化する様子も展示しました。 これらの技術を組み合わせることで、物体の正確な位置や形状をリアルタイムで把握でき、作業の効率化や自動化に貢献することが期待されます。 IoT×BIM/CIMコーナー ~工程管理の自動化~ 「IoT×BIM/CIMコーナー」では、Unitree Go1を使用した、工程管理の自動化のソリューションを紹介しました。 今回の展示では360度カメラ( THETA X )を搭載したGo1で撮影を行い、リアルタイムで映像を可視化ダッシュボードに表示しました。 Go1によって撮影された映像はintdashサーバーに送信され、intdashと Liberaware社 の提供するクラウドサービス「LAPIS」を連携することで、撮影データの3D化を行うことができます。 「LAPIS」は独自の画像処理技術を用いて、撮影データから3D化を可能にするクラウドサービスです。これにより建設現場での工程管理の自動化が可能となり、BIM/CIMでの活用が期待されます。 また、intdashとの連携により、収集されたデータを一元管理することが可能です。現場の状況をリモートで把握し、迅速な意思決定を支援します。 セミナー 展示会の合間で、今回の展示に関連するセミナーも開催しました。 こちらのセミナーでは、弊社の技術やサービスに限らず、今回協力していただいたNTTコミュニケーションズ株式会社様、Liberaware株式会社様、TechShare株式会社様の技術やサービスについても紹介し、多くの来場者が興味を持って耳を傾けていました。 感想 私の所属するVisual M2Mグループでは、様々なデータに対応し、高度かつハイパフォーマンスな可視化アプリケーションを開発することをミッションとしています。 普段、私は主に可視化ダッシュボードの開発に取り組んでいますが、今回の展示会を通じて、データ収集プロセスやその可視化結果が現場でどのような価値を生むかを確認することができ、自分の仕事の意義を再認識する貴重な機会となりました。 この経験から得た新たな発見や知見を今後の開発にも反映し、より優れたプロダクトを提供できるよう努めていきたいと思います。 おわりに 「メンテナンス・レジリエンス TOKYO 2024」は、弊社としても多くの学びがあり良い機会となりました。これからも世の中の課題を解決できるように精進してまいります。ご来場いただいた皆様、改めてありがとうございました。 人手で行っていた業務をロボットで行いたい ロボットの遠隔操作を行いたい 大量データを扱えるハイパフォーマンスな可視化アプリケーションを開発したい などなど、アプトポッドではIoTに関する豊富な実績と高い技術力で、お客様の課題解決にご協力いたします! お気軽にお問合せください。 お問合せフォームはこちら。 www.aptpod.co.jp
アバター
Visual M2Mグループの白金です。 普段は Visual M2M Data Visualizerの製品開発を担当しています。 先日、 intdashの新しいバージョンがリリース され、 建設、物流、自動車開発のDX化の促進を推進するため、以下のROSデータと連携する新機能がVisual M2M Data Visualizerに追加されました: デジタルツイン対応の強化による現場環境の見える化と最適化 遠隔操縦機能とリアルタイム伝送機能の強化 本記事では、新機能の詳細と物理シミュレーターを使用した可視化および遠隔操作の実例とあわせて紹介したいと思います。 物理シミュレーターをVisual M2M Data Visualizerで可視化 Visual M2M Data Visualizerとは ROSデータと連携を強化する新機能 3D点群表示機能の追加 遠隔操作機能の追加 ROSデータの画像表示機能を追加 物理シミュレーターのデータをリアルタイムで可視化し、遠隔操作する その他の新機能 Command Buttonパーツ Edge Switcher おわりに Visual M2M Data Visualizerとは Visual M2M Data Visualizerは、intdashのデータパイプライン上を流れるリアルタイムストリーミングや、プラットフォーム上に蓄積された過去の計測データなど、様々な時系列データを多彩に可視化するWebベースのダッシュボードアプリケーションです。 計測器が出力する秒間数百〜数千といった高頻度な時系列データについても、ブラウザとインターネットさえあればいつでも再生できます。 www.aptpod.co.jp ROSデータと連携を強化する新機能 ROSデータと連携を強化するため、以下3つの機能をVisual M2M Data Visualizerに追加しました。 3D点群表示機能の追加 遠隔操作機能の追加 ROSデータの画像表示機能を追加 これにより、ROSデータをJSONやJPEGに変換したり、遠隔操作用の別デバイスを準備する必要がなくなります。 新機能のリリースとMessagePackの併用 *1 で、対応コスト削減と伝送効率の向上が期待できます。 では、もう少し各機能について詳しく見ていきましょう。 3D点群表示機能の追加 LiDARから取得した3D点群データを表示するPoint Cloudパーツを追加しました。 これにより、映像、センサーデータやロボットの姿勢データに加え、3D点群データを同期再生できるようになりました。LiDARを活用したリアルタイムなデジタルツインアプリケーションを、カスタマイズ開発なしで迅速に実現できます。 Point Cloudパーツ 上図の3D点群表示は、原点からの距離で色を変えていますが、その他にも、原点からの高さや各点に設定されている色で表現することも可能です。 youtu.be 3D点群の表示は、以下のROSデータに対応しています。 sensor_msgs/msg/LaserScan sensor_msgs/msg/PointCloud sensor_msgs/msg/PointCloud2 遠隔操作機能の追加 市販されているGamepad *2 を使用して、Visual M2M Data Visualizerから遠隔操作のデータを送信するGamepadパーツを追加しました。 Gamepadは、Visual M2M Data Visualizerを表示しているWindowsなどのOSがUSB、またはBluetoothで認識済みであれば、すぐにご利用いただけます。 *3 Gamepadパーツ 遠隔送信元のエッジは、画面左下のSender Edgeから設定できます。 Sensor Edgeの設定 Gamepadがお手元に準備ができない場合は、キーボードやマウスでの操作も可能です。詳細は以下の動画を参照ください。 youtu.be 遠隔操作データは、以下のROSデータで送信します。 sensor_msgs/msg/Joy ROSデータの画像表示機能を追加 Image Viewerパーツの機能を強化しました。 これにより、これまでのJPEG表示機能に加え、ROSデータの画像もカスタム開発なしで表示可能になりました。 下図は、ROSデータに含まれるJPEG画像を表示したものです。 Image Viewerパーツ Image Viewerパーツは、以下のROSデータの表示に対応しています。 sensor_msgs/msg/CompressedImage 物理シミュレーターのデータをリアルタイムで可視化し、遠隔操作する 新しくリリースされたVisual M2M Data Visualizerの機能を使用して、物理シミュレーターから送信されるデータをリアルタイムで可視化し、遠隔で操作するデモです。 デモの構成は下図の通りです。 デモの構成図 Visual M2M Data Visualizerの画面構成は、リアルタイムの可視化と遠隔操作を行うために、下図の通りに表現しました。 画面右下のセンサーデータグラフにはバックホーの一部センサーのみ表示されていますが、用途に応じてバックホーやトラックの他のセンサーデータも可視化できます。 Visual M2M Data Visualizer の画面構成 以下の動画はデモの様子です。Visual M2M Data Visualizerを使用して遠隔操作データを送信し、バックホーで土砂を掬う操作やトラックの移動ができました。 youtu.be 物理シミュレーターを実際のLiDAR、または重機の環境に変更し、intdash のサービスと組み合わせることで、シミュレーターと同様VM2M Data Visualizerを使用して、可視化、及び遠隔操作をご利用いただくことも可能です。 その他の新機能 その他にも、追加された遠隔操作および可視化機能を以下に紹介します。 Command Buttonパーツ Command Buttonパーツは、パーツ内に表示されているボタンを押すと設定されたエッジに指示データを送信します。 送信するデータは、各Command Buttonごとに任意のデータを設定できます。 Command Buttonパーツ Edge Switcher 画面右下に、Edge Switcherのボタンを追加し、ビジュアルパーツで表示したいエッジをシームレスに切り替えることが可能になりました。 これにより、同種別の複数の計測器を可視化する際に、事前準備としてスクリーンを複製、かつ表示するエッジを都度変更するといった運用コストを削減することが可能になります。 Edge Switcher おわりに 新しくリリースされたVisual M2M Data Visualizerを使用し、デジタルツイン対応の強化、及び遠隔操作の機能について紹介しました。 これにより、デジタルツインにリアルタイムで反映されることで、状況把握の時間が短縮され、迅速な意思決定に役立つことができれば幸いです。 今後のアップデートにもご期待ください。 *1 : intdash上でROSデータの送受信に MessagePack を使用しています。 MessagePackはシンプルな構造で、多くのプログラム言語をサポートし、異なるプラットフォーム間でのデータ交換を容易にします。 これにより、ROSデータの伝送効率が向上し、ネットワーク帯域の節約やリアルタイム性の向上が期待できます。 *2 : Gamepadは Amazon などから購入できます。 *3 : OSとGamepadの組み合わせでは、サポートしていなかったりドライバのインストールが必要になる場合があります。
アバター
intdashグループの野本です。 先日、intdashの新しいバージョンがリリース *1 され、ブラウザからインターネット経由でデバイスの設定変更やOTAアップデートができる「 Device Management Console 」が新しく追加されました。 Device Management Console この記事では、Device Management Consoleについて簡単にご紹介したいと思います。 Device Management Consoleとは Device Management Consoleは、Terminal System 2が搭載されたデバイスを対象とした、OTA (Over the Air)デプロイメントを管理するためのWebコンソールです。 ユーザーは、ブラウザ上で操作することにより、OTAサーバー *2 を経由して、Terminal System 2に対して各種操作やOTAを行うことができます。 従来、Terminal System 2の操作を行うためには、Menderアカウントを作成し、Menderが提供している汎用的なWeb UIを使用する必要がありました。 新しいバージョンでは、今回リリースされたDevice Management Consoleを使用することで、intdashアカウントのみを利用して、最適化されたインターフェースでTerminal System 2を操作をすることができるようになりました。 Device Management Consoleでできること Device Management Consoleでは、以下の操作が可能です。 デバイス管理 デバイスの一覧表示 デバイスの詳細情報表示 デバイスグループの作成 デバイスの認証管理 OTA/デプロイメント デバイス設定の変更 デバイスの各種操作(計測の開始/終了、再起動など) ソフトウェアの更新 周辺機器のファームウェアの更新 デプロイメント履歴の一覧表示 実際に使ってみる 実際にDevice Management Consoleを使ってみましょう。 例として、新しく構築したintdashサーバーとTerminal System 2を利用し、遠隔で計測開始する手順を説明します。 事前準備 あらかじめ、セットアップ(プロビジョニング)が完了したTerminal System 2を起動しておき、Ethernetケーブルを接続してインターネットに接続した状態にしてください。 Terminal System 2は自動的にOTAサーバーに対して認証要求を送信します。 Device Management Consoleにアクセスする Project Consoleを開き、右上の ボタンをクリックし、Device Management Consoleをクリックします。 Project Consoleのメニュー おっと、エラー画面が表示されました。 OTAサービス設定がされていない Device Management Consoleを使用するためには、OTAサービス設定の有効化が必要です。 OTAサービス設定はデフォルトで有効化されていませんので、設定を有効化しましょう。 OTAサービス設定 Admin Consoleを表示し、新しく追加されたOTAサービス設定画面を開きます。 OTAサービス設定画面 使用するMenderサーバーの接続情報を入力し、有効化します。 これでDevice Management Consoleを使用する準備ができました。再度、Device Mnaagement Consoleにアクセスします。 デバイス認証要求の承諾 Device Management Consoleのフロントページは、デバイス一覧画面を表示します。 デバイス一覧画面 Device Management Consoleでデバイスを扱えるようにするには、デバイスからMenderサーバーに送信された認証要求を承認する必要があります。 認証ステータスがPendingとなっているデバイスの[詳細]をクリックして、デバイス詳細画面を開きます。 デバイス認証要求の承認 デバイス詳細画面の[認証]タブの[承諾]ボタンをクリックして、デバイスの認証要求を承諾します。 これで、Device Management Consoleでデバイスを扱えるようになりました。 次は、計測を開始するためにデバイスの設定を変更します。 デバイス設定の変更 デバイス詳細画面の[設定]タブをクリックすると、現在のデバイスの設定を表示します。 デバイスの設定変更 [設定を変更する]をクリックして、テキストボックス内で設定を変更します。 intdashサーバー接続情報を設定する場合は、 agent.connection に値を設定します。 " agent.connection ": { " server_url ": " https://example.intdash.jp ", " project_uuid ": " 00000000-0000-0000-0000-000000000000 ", " edge_uuid ": " f90f2b42........468c36.EXAMPLE ", " client_secret ": " sEh9ZHPoKX8QYU...iBn5.EXAMPLE " } [デプロイメントを作成]をクリックすると、デバイスの設定を変更することができます。 デプロイメントを作成 計測を開始する デバイス一覧画面で、計測を開始するデバイスにチェックマークを入れ、[デプロイメントを作成]をクリックします。 デプロイメントを作成 [リリースを選択]をクリックし、 measurement-start_<Terminal System OSのバージョン> (計測を開始するスクリプト)を選択し、デプロイメントを強制を有効化します。 計測開始をデプロイ [デプロイメントを作成]をクリックすると、デプロイメント詳細画面に遷移します。 デプロイメント結果の確認 デプロイメント詳細画面では、デプロイメントのステータスやデバイスごとの進捗状況を確認することができます。 デプロイ結果 画面下部の[デバイス]セクションで、デバイスへのデプロイ結果が success になっていることを確認します。 今回は計測を開始するリリースをデプロイしたため、計測が正常に開始されたことが確認できました。 Edge Finderを開くと、intdashサーバーにリアルタイムデータが送信されています。 Edge Finderで確認 まとめ 新しくリリースされたDevice Management Consoleを使用し、遠隔で計測開始する手順をご紹介いたしました。 今回ご紹介した例は、計測を開始するスクリプトを遠隔からデプロイし、デバイス上で実行しています。 デプロイするリリースを変更することで、計測の停止、ソフトウェア/ファームウェアの更新なども同様の手順で行うことができます。 また、独自のアーティファクトを作成することで、幅広いユースケースのアップデート機能も実現可能です。 *3 今後のリリースでは、より簡易にデバイス設定変更ができるようにする改善などを予定しております。今後のアップデートにもご期待ください。 *1 : https://www.aptpod.co.jp/news/news/20240423_intdash *2 : OTAは、Northern.tech社の製品である Mender によって行われます。 *3 : https://docs.mender.io/artifact-creation/state-scripts#example-use-cases
アバター