さくらのクラウド高火力プランを使ってImageFlux Live StreamingにAIライブ配信してみた

こんにちは、テリーです。2023年は画像生成AI、音声AI、LLMなど、映像配信に使用できるたくさんの技術とサービスが公開されて、10年に一度級の進歩を感じました。AIの知能はすでに大学生の平均を超えているという説もあります。疲れ知らずのAIを活用し、2024年も去年と同じような速さで、次から次へと新しい技術、進化した技術が公開されていくのだろうとワクワクが止まりません。

さて、前回の記事では、P2Pの機能を使用してクラウド上のGPUサーバでAIフィルタを適用し、その映像を送り返すサンプルを紹介しました。回線とCPUに余裕がある場合はよいのですが、スマートフォンからのモバイル配信のような限られたリソースの場合には問題が発生します。映像を配信元に送り返さずに、クラウドから直接ビデオ会議サービスに映像を送りたいです。

そこで今回は、さくらのクラウド高火力プラン上で動作するPythonプログラムで映像を受け取り、その映像を元にAIフィルタで生成した画像フレームをImageFlux Live Streamingにライブ配信するサンプルをご紹介します。

対象読者

  • Pythonを使ってリアルタイムのAI画像解析を行い、その出力を元にImageFlux Live Streamingで大規模ライブ配信をしたい人
  • さくらのクラウド高火力プランを使ってみたい人

本記事はImageFlux Live StreamingおよびWebRTC SFU Soraについて、実装経験のある方、アカウントをお持ちの方向けに書いています。ImageFlux Live Streamingの詳細および契約についてはこちらをご参照ください。

動作確認環境

クライアントPC

  • macOS 14.2.1
  • Chrome 120.0.6099.234

サーバ

  • Ubuntu 22.04.2 LTS
  • aiortc 1.6.0
  • aiohttp 3.9.1
  • sora-python-sdk 2023.3.1
  • nvidia-driver 545.29.06

前回の記事との違い

前回の記事で紹介したAIフィルタ用のPythonプログラムは、接続元のブラウザから映像を受け取り、加工して送り返し、さらに別のビデオ会議サイトに送信しています。図にすると下記のようになります。配信元のブラウザが接続している回線を2倍消費していることがわかります。

今回ご紹介する方法では、AIフィルタ用のPythonプログラムから直接ImageFlux Live Streamingに接続し配信するため、配信元の回線は1映像分のみです。

この方式では配信元のパソコンやマイコンへの負荷が低く、また回線がそれほど高速でなくても使用できるというメリットがある一方で、ImageFlux Live Streamingとの接続、切断イベントなどをPythonで記述する必要があります。

さくらのクラウド高火力プランを起動

さくらのクラウド高火力プランは「石狩第1ゾーン」でサーバを追加するときのみ、選択肢に現れます。はじめて使用するときには混乱するので特に要注意です。

値段を見ると、最小プランでも1時間483円、1ヶ月23.1万円という料金です。とても高額なので、しっかりと使用時間の計画を立てて準備をしてから起動しましょう。

OSは「Ubuntu 22.04.2 LTS」を選択します。ディスクは使用するAIが必要とするモデルのファイルサイズで決めます。まずは100GBで試して、足りなければ増やすのがよいでしょう。20GB,40GBでは空き容量に余裕がなく、作り直しになることが多いです。

さくらの高火力サーバには高速なバックボーン回線も付いてきます。他社のGPUクラウドサービスでは、UDPポートを使用できない、回線が細い、ストリーミングが利用規約違反、などの制限があり、ライブ映像解析・ライブ配信用途には使えないところが多いですが、さくらの高火力サーバでは安心して配信することができます。1時間483円は非常に高額ですが、高速な回線費用が含まれていると考えれば納得できるユースケースもあるでしょう。サーバ管理画面には下図のように「100Mbpsベストエフォート」と書かれていますが、実測ではそれよりも速い速度が出ることもあります。

サーバを起動し、SSHで接続ができたら、下記のコマンドを順に実行し、NVIDIAの最新のドライバをインストールします。インストールには5分から10分かかります。インストールが終わったら念のため再起動します。

wget https://us.download.nvidia.com/XFree86/Linux-x86_64/545.29.06/NVIDIA-Linux-x86_64-545.29.06.run
sudo ./NVIDIA-Linux-x86_64-545.29.06.run

sudo apt install -y software-properties-common pciutils gcc make dkms
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 100 --slave /usr/bin/g++ g++ /usr/bin/g++-12 --slave /usr/bin/gcov gcov /usr/bin/gcov-12

sudo ./NVIDIA-Linux-x86_64-545.29.06.run

nvidia-smi
nvcc -V
sudo reboot

次にsora-python-sdkのインストールをします。

pip install sora_sdk

その他のインストールは、前回の記事を参照してください。

以上でセットアップは終了です。

PythonプログラムからImageFlux Live Streamingにライブ配信

まずはsora_sdkを使ってImageFlux Live Streamingに映像を配信できることを確認します。

下記のプログラムを作成します。(行頭の数字は行番号です)

  1 import cv2      
  2 from PIL import ImageFont, ImageDraw, Image
  3 import numpy as np
  4 import time     
  5 from sora_sdk import Sora 
  6                     
  7                 
  8 class App:
  9     def __init__(self):
 10         self.init_colors()
 11         self.init_sora()
 12         self.run()
 13 
 14     def init_colors(self):
 15         font_name = "MPLUSRounded1c-Bold"
 16         font_size = 72
 17         self.font = ImageFont.truetype(font_name, font_size, encoding="utf-8    ")
 18         hsv_colors = np.zeros((180, 1, 3), dtype=np.uint8)
 19         for i in range(180):
 20             hsv_colors[i] = [i, 255, 255]
 21         rgb_colors = cv2.cvtColor(hsv_colors, cv2.COLOR_HSV2RGB)
 22         self.rgb_array = np.squeeze(rgb_colors)
 23 
 24     def init_sora(self):
 25         signaling_url = SIGNALING_URL
 26         channel_id = CHANNEL_ID
 27         self.sora = Sora()
 28         self.video_source = self.sora.create_video_source()
 29         self.connection = self.sora.create_connection(
 30             signaling_urls=[signaling_url],
 31             role="sendonly",
 32             channel_id=channel_id,
 33             video_source=self.video_source,
 34             video_bit_rate=1000,
 35         )
 36         self.connection.on_disconnect = self.on_disconnect
 37 
 38     def on_disconnect(self, error_code, message):
 39         print(f"on_disconnect: error_code='{error_code}' message='{message}'    ")
 40         self.running = False
 41 
 42     def run(self):
 43         img_pil = Image.new("RGB", (320, 240), (255, 255, 255))
 44         draw = ImageDraw.Draw(img_pil)
 45         position = (img_pil.width // 2, img_pil.height // 2)
 46 
 47         self.connection.connect()
 48 
 49         try:
 50             start = time.time()
 51             i = 0
 52             while True:
 53                 draw.rectangle([(0, 0), img_pil.size], fill=tuple(self.rgb_a    rray[i % 180]))
 54                 now = time.time() - start
 55                 formatted_time = "{:.3f}".format(now)
 56                 draw.text(
 57                     position,
 58                     formatted_time,
 59                     font=self.font,
 60                    fill="black",
 61                     stroke_fill="white",
 62                     stroke_width=6,
 63                     align="center",
 64                     anchor="mm",
 65                 )
 66                 img = np.array(img_pil)
 67                 # cv2.imshow("Image", img)
 68                 self.video_source.on_captured(img)
 69                 delay = max(int(((i + 1) / 30 - now) * 1000), 1)
 70                 # print(delay)
 71                 ret = cv2.waitKey(delay)
 72                 if ret >= 0:
 73                     break
 74                 i += 1
 75         finally:
 76             self.connection.disconnect()
 77             cv2.destroyAllWindows()
 78 
 79 
 80 if __name__ == "__main__":
 81     App()

25,26行目のSIGNALING_URL, CHANNEL_IDには、ImageFlux Live StreamingのCreateMultistreamChannel APIまたはCreateMultistreamChannelWithHLS APIを呼び出して取得した値を指定します。sora-python-sdkはmultistream:trueでの配信になるため、CreateChannel APIで作成したチャンネルには配信できません。

上記のPythonプログラムを実行すると、下図のように経過時間を表示する映像が配信されます。Sora DevToolsを使用するなどして同チャンネルを受信すると、経過時間のカウントが増えていくことが確認できます。

ブラウザのWebカメラ映像をPythonプログラムを経由してImageFlux Live Streamingにライブ配信

次に、前回の記事を参考に、ブラウザからPythonプログラムにP2Pで接続してWebカメラの映像を送信し、Pythonプログラムで受け取った画像フレームを加工せずにImageFlux Live Streamingに送ります。前回の記事との違いは、映像の流れが一方通行で、Pythonプログラムからブラウザには送り返しません。

  1 import asyncio, json, logging, ssl, uuid
  2 import traceback
  3 from aiohttp import web
  4 from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescriptio    n
  5 from av import VideoFrame
  6 import numpy as np
  7 from queue import Empty
  8 from torch.multiprocessing import Process, Queue, RawValue, set_start_method
  9 from aiortc.mediastreams import MediaStreamError
 10 from sora_sdk import Sora
 11 
 12 
 13 try:
 14     set_start_method("spawn")
 15 except RuntimeError:
 16     pass
 17 
 18 sora_process = None
 19 
 20 
 21 logger = logging.getLogger("pc")
 22 pcs = set()
 23 
 24 
 25 class SoraSink:
 26     def __init__(self):
 27         self.__tracks = {}
 28         self.disconnected = False
 29 
 30     def addTrack(self, track):
 31         if track not in self.__tracks:
 32             self.__tracks[track] = None
 33 
 34     async def start(self):
 35         for track, task in self.__tracks.items():
 36             if task is None:
 37                 self.__tracks[track] = asyncio.ensure_future(self.consume(track))
 38 
 39     async def stop(self):
 40         for task in self.__tracks.values():
 41             if task is not None:
 42                 task.cancel()
 43         self.__tracks = {}
 44 
 45     def on_disconnect(self, error_code, message: str):
 46         print(f"on_disconnect: error_code: {error_code}, message: {message}")
 47         self.disconnected = True
 48 
 49     async def consume(self, track):
 50         signaling_url = SIGNALING_URL
 51         channel_id = CHANNEL_ID
 52         sora = Sora()
 53         video_source = sora.create_video_source()
 54         conn = sora.create_connection(
 55             signaling_urls=[signaling_url],
 56             role="sendonly",
 57             channel_id=channel_id,
 58             video_source=video_source,
 59             video_bit_rate=1000,
 60         )
 61         conn.on_disconnect = self.on_disconnect
 62         conn.connect()
 63         try:
 64             while not self.disconnected:
 65                 try:
 66                     frame = await track.recv()
 67                     img = frame.to_ndarray(format="bgr24")
 68                     if img is None:
 69                         continue
 70                     video_source.on_captured(img)
 71                 except MediaStreamError:
 72                     try:
 73                         await asyncio.sleep(0.1)
 74                     except:
 75                         break
 76         except:
 77             traceback.print_exc()
 78         finally:
 79             conn.disconnect()
 80 
 81 
 82 async def offer(request):
 83     params = await request.json()
 84     offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
 85 
 86     pc = RTCPeerConnection()
 87     pc_id = "PeerConnection(%s)" % uuid.uuid4()
 88     pcs.add(pc)
 89 
 90     sorasink = SoraSink()
 91 
 92     def log_info(msg, *args):
 93         logger.info(pc_id + " " + msg, *args)
 94 
 95     log_info("Created for %s", request.remote)
 96 
 97     @pc.on("datachannel")
 98     def on_datachannel(channel):
 99         @channel.on("message")
100         def on_message(message):
101             if isinstance(message, str) and message.startswith("ping"):
102                 channel.send("pong" + message[4:])
103 
104     @pc.on("connectionstatechange")
105     async def on_connectionstatechange():
106         log_info("Connection state is %s", pc.connectionState)
107         if pc.connectionState == "failed":
108             await pc.close()
109             await sorasink.stop()
110             pcs.discard(pc)
111 
112     @pc.on("track")
113     def on_track(track):
114         log_info("Track %s received", track.kind)
115 
116         if track.kind == "video":
117             sorasink.addTrack(track)
118 
119         @track.on("ended")
120         async def on_ended():
121             log_info("Track %s ended", track.kind)
122             await sorasink.stop()
123 
124     await pc.setRemoteDescription(offer)
125     answer = await pc.createAnswer()
126     await pc.setLocalDescription(answer)
127     await sorasink.start()
128 
129     return web.Response(
130         content_type="application/json",
131         text=json.dumps(
132             {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
133         ),
134     )
135 
136 
137 async def index(request):
138     return web.HTTPFound("/index.html")
139 
140 
141 async def on_shutdown(app):
142     coros = [pc.close() for pc in pcs]
143     await asyncio.gather(*coros)
144     pcs.clear()
145 
146 
147 @web.middleware
148 async def cors_middleware(request, handler):
149     headers = {
150         "Access-Control-Allow-Headers": "*",
151         "Access-Control-Allow-Methods": "*",
152         "Access-Control-Allow-Origin": "*",
153     }
154     if request.method == "OPTIONS":
155         return web.Response(headers=headers)
156     try:
157         response = await handler(request)
158         for key, value in headers.items():
159             response.headers[key] = value
160         return response
161     except web.HTTPException as e:
162         for key, value in headers.items():
163             e.headers[key] = value
164         raise e
165 
166 
167 def main():
168     cert_file = "fullchain.pem"
169     key_file = "privkey.pem"
170     ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
171     ssl_context.load_cert_chain(cert_file, key_file)
172 
173     app = web.Application(middlewares=[cors_middleware])
174     app.on_shutdown.append(on_shutdown)
175     app.router.add_get("/", index)
176     app.router.add_post("/offer", offer)
177     app.add_routes([web.static("/", "static", show_index=True)])
178 
179     web.run_app(
180         app, access_log=None, host="0.0.0.0", port=7860, ssl_context=ssl_context
181     )
182 
183 
184 if __name__ == "__main__":
185     main()

上記のプログラムを実行し、ブラウザで https://p2pcamera.hopto.org:7860/index.html (前回の記事参照)にアクセスして配信を開始すると、Webカメラの映像が上記Pythonプログラムを経由してImageFlux Live Streamingにリレー配信されていることが確認できます。

AIフィルタの実装と確認

ここまで動作することが確認できれば、あとはPythonのプログラムにAIの処理を追加するだけです。ブラウザからP2Pで接続し、Pythonプログラムで受け取った映像をAI画像解析し、その結果を元に新たな画像フレームを生成してImageFlux Live Streamingに送ります。AIフィルタは前回の記事で使用したDensePoseを使用しています。

  1 import asyncio, json, logging, ssl, uuid
  2 import traceback
  3 from aiohttp import web
  4 from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
  5 from av import VideoFrame
  6 import numpy as np
  7 from queue import Empty
  8 from torch.multiprocessing import Process, Queue, RawValue, set_start_method
  9 from aiortc.mediastreams import MediaStreamError
 10 from sora_sdk import Sora
 11 
 12 
 13 try:
 14     set_start_method("spawn")
 15 except RuntimeError:
 16     pass
 17 
 18 ai_process = None
 19 sora_process = None
 20 input_queue = Queue()
 21 output_queue = Queue()
 22 processed_count = RawValue("i", 0)
 23 
 24 
 25 def push_pop(frame):
 26     try:
 27         while not input_queue.empty():
 28             input_queue.get_nowait()
 29     except Empty:
 30         pass
 31     input_queue.put(frame.to_image())
 32     try:
 33         return output_queue.get_nowait()
 34     except Empty:
 35         return None
 36 
 37 
 38 logger = logging.getLogger("pc")
 39 pcs = set()
 40 
 41 
 42 class SoraSink:
 43     def __init__(self):
 44         self.__tracks = {}
 45         self.disconnected = False
 46 
 47     def addTrack(self, track):
 48         if track not in self.__tracks:
 49             self.__tracks[track] = None
 50 
 51     async def start(self):
 52         for track, task in self.__tracks.items():
 53             if task is None:
 54                 self.__tracks[track] = asyncio.ensure_future(self.consume(track))
 55 
 56     async def stop(self):
 57         for task in self.__tracks.values():
 58             if task is not None:
 59                 task.cancel()
 60         self.__tracks = {}
 61 
 62     def on_disconnect(self, error_code, message: str):
 63         print(f"on_disconnect: error_code: {error_code}, message: {message}")
 64         self.disconnected = True
 65 
 66     async def consume(self, track):
 67         signaling_url = SIGNALING_URL
 68         channel_id = CHANNEL_ID
 69         sora = Sora()
 70         video_source = sora.create_video_source()
 71         conn = sora.create_connection(
 72             signaling_urls=[signaling_url],
 73             role="sendonly",
 74             channel_id=channel_id,
 75             video_source=video_source,
 76             video_bit_rate=1000,
 77         )
 78         conn.on_disconnect = self.on_disconnect
 79         conn.connect()
 80         try:
 81             while not self.disconnected:
 82                 try:
 83                     frame = await track.recv()
 84                     img = push_pop(frame)
 85                     if img is None:
 86                         continue
 87                     video_source.on_captured(img)
 88                 except MediaStreamError:
 89                     try:
 90                         await asyncio.sleep(0.1)
 91                     except:
 92                         break
 93         except:
 94             traceback.print_exc()
 95         finally:
 96             conn.disconnect()
 97 
 98 
 99 async def offer(request):
100     params = await request.json()
101     offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
102 
103     pc = RTCPeerConnection()
104     pc_id = "PeerConnection(%s)" % uuid.uuid4()
105     pcs.add(pc)
106 
107     sorasink = SoraSink()
108 
109     def log_info(msg, *args):
110         logger.info(pc_id + " " + msg, *args)
111 
112     log_info("Created for %s", request.remote)
113 
114     @pc.on("datachannel")
115     def on_datachannel(channel):
116         @channel.on("message")
117         def on_message(message):
118             if isinstance(message, str) and message.startswith("ping"):
119                 channel.send("pong" + message[4:])
120 
121     @pc.on("connectionstatechange")
122     async def on_connectionstatechange():
123         log_info("Connection state is %s", pc.connectionState)
124         if pc.connectionState == "failed":
125             await pc.close()
126             await sorasink.stop()
127             pcs.discard(pc)
128 
129     @pc.on("track")
130     def on_track(track):
131         log_info("Track %s received", track.kind)
132 
133         if track.kind == "video":
134             sorasink.addTrack(track)
135 
136         @track.on("ended")
137         async def on_ended():
138             log_info("Track %s ended", track.kind)
139             await sorasink.stop()
140 
141     await pc.setRemoteDescription(offer)
142     answer = await pc.createAnswer()
143     await pc.setLocalDescription(answer)
144     await sorasink.start()
145 
146     return web.Response(
147         content_type="application/json",
148         text=json.dumps(
149             {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
150         ),
151     )
152 
153 
154 async def index(request):
155     return web.HTTPFound("/index.html")
156 
157 
158 async def on_shutdown(app):
159     coros = [pc.close() for pc in pcs]
160     await asyncio.gather(*coros)
161     pcs.clear()
162 
163 
164 @web.middleware
165 async def cors_middleware(request, handler):
166     headers = {
167         "Access-Control-Allow-Headers": "*",
168         "Access-Control-Allow-Methods": "*",
169         "Access-Control-Allow-Origin": "*",
170     }
171     if request.method == "OPTIONS":
172         return web.Response(headers=headers)
173     try:
174         response = await handler(request)
175         for key, value in headers.items():
176             response.headers[key] = value
177         return response
178     except web.HTTPException as e:
179         for key, value in headers.items():
180             e.headers[key] = value
181         raise e
182 
183 
184 def main():
185     from sub_densepose import sub_main
186 
187     global ai_process
188     ai_process = Process(
189         target=sub_main,
190         args=(input_queue, output_queue, processed_count),
191         daemon=True,
192     )
193     ai_process.start()
194     logging.basicConfig(level=logging.INFO)
195 
196     cert_file = "fullchain.pem"
197     key_file = "privkey.pem"
198     ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
199     ssl_context.load_cert_chain(cert_file, key_file)
200 
201     app = web.Application(middlewares=[cors_middleware])
202     app.on_shutdown.append(on_shutdown)
203     app.router.add_get("/", index)
204     app.router.add_post("/offer", offer)
205     app.add_routes([web.static("/", "static", show_index=True)])
206 
207     web.run_app(
208         app, access_log=None, host="0.0.0.0", port=7860, ssl_context=ssl_context
209     )
210 
211 
212 if __name__ == "__main__":
213     main()

プログラムを実行したらImageFlux Live StreamingのHLS視聴用URLにアクセスし、視聴できることを確認します。

まとめ

ブラウザからP2Pでさくらのクラウド高火力サーバに映像を転送し、AIフィルタを通した映像をImageFlux Live Streaming経由で配信するサンプルを紹介しました。セグメンテーションや物体検出、顔検出、その他AIが得意とする処理をクラウド上で実現できます。データチャンネルを使用してImageFlux Live Streamingの接続情報を送るとより汎用的に配信することができるでしょう。また、さくらの高火力サーバはWebAPIを使用して起動・停止ができるようなので、ワンクリックでサーバを起動・停止できると使い勝手もよくなります。ぜひ挑戦してください。ご質問、ご感想もお待ちしています。