こんにちは。 ファインディ株式会社でFindy AI+の開発をしているdanです。 Findy AI+ではLLMを活用した分析機能を提供しています。 分析対象は個人・チーム・組織と幅広く、データ量に応じて分析に時間がかかることがあります。分析が完了するまで画面に何も表示されないと、ユーザーは処理が進んでいるのか分からず、待ち時間が長く感じられてしまいます。 この課題を解消するため、LLM分析結果の表示にストリーミング出力を導入しました。 今回は、実装内容とどの程度待ち時間が改善されたのかについてお話しします。 Findy AI+とは ストリーミング対応前は何が問題だった? 当初の設計 分析結果を見るのにどのくらいの時間を要していたのか どのように対応を進めたか ストリーミング対応の設計 肝となる実装の全体像 LangChainの astream() メソッド FastAPIでのストリーミング実装 ストリーミングレスポンスの生成 SSE(Server-Sent Events)の仕組み フロントエンドでの受信処理 ストリーミング対応後に得られた効果 おわりに Findy AI+とは Findy AI+ は、GitHub連携やプロンプト指示を通じて生成AIアクティビティを可視化し、生成AIの利活用向上を支援するサービスです。 人と生成AIの協働を後押しし、開発組織の変革をサポートします。 Claude Code、GitHub Copilot、Codex、Devinなど様々なAIツールの利活用を横断的に分析しており、分析基盤にはLangChainを採用しています。また、日報やチーム分析などの機能でもLLMを活用しています。 LLM分析に使用するプロンプト調整についても記事を公開していますので、よかったらご覧ください。 tech.findy.co.jp ストリーミング対応前は何が問題だった? 当初の設計 分析結果を見るのにどのくらいの時間を要していたのか 冒頭で述べた通り、Findy AI+では個人・チーム・組織と幅広いデータを分析対象としています。 分析に必要なデータ作成のAPIを例にあげます。 @ router.post ( "/api/v1/hoge" ) def create_hoge ( request: CreateHogeRequest, db: Session = Depends(get_db), ): # 1. データ取得 hoge = get_hoge(db, request.hoge_id) # 2. LLM呼び出し(全レスポンス待ち) client = Anthropic(api_key= "xxx" ) message = client.messages.create( model= "claude-sonnet-4-20250514" , max_tokens= 4096 , messages=[{ "role" : "user" , "content" : f "分析して: {hoge.content}" }], ) analysis_result = message.content[ 0 ].text # 3. 分析結果保存 save_analysis(db, hoge.id, analysis_result) # 4. レスポンス返却 return { "id" : hoge.id, "analysis" : analysis_result} やっていることは次の2つです。 LLM分析を行い完了後に分析結果を保存 フロントへデータを返す APIを呼び出してみると平均して30~40秒ほど時間がかかっていることが分かりました。 % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 7341 100 6673 100 668 193 19 0:00:35 0:00:34 0:00:01 2059 実行してから分析結果を見るのに30~40秒程度かかるほど、遅すぎて使いものにならない状態でした。 どのように対応を進めたか ストリーミング対応の設計 新たに作成したストリーミングのAPIは下記の通りです。 肝となる実装の全体像 ストリーミング対応したサンプルコードは下記になります。 @ router.post ( "/api/v1/hoge/analysis/streaming" ) async def create_hoge_analysis_streaming ( request: AnalysisRequest, db: Session = Depends(get_db), ): # 1. データ取得 hoge = get_hoge(db, request.hoge_id) # 2. LangChainエージェント作成 agent = ChatAnthropic( model= "claude-sonnet-4-20250514" , anthropic_api_key= "xxx" , max_tokens= 4096 , ) # 3. StreamingResponse返却 return StreamingResponse( generate_streaming_response(agent, hoge, db), media_type= "text/event-stream" , ) async def generate_streaming_response ( agent: ChatAnthropic, hoge: Hoge, db: Session, ) -> AsyncGenerator[ str , None ]: """SSE形式でストリーミングレスポンスを生成""" accumulated_content = "" messages = [HumanMessage(content=f "分析して: {hoge.content}" )] # チャンクごとに送信 async for chunk in agent.astream(messages): if chunk.content: accumulated_content += chunk.content data = json.dumps({ "type" : "content" , "content" : chunk.content}) yield f "data: {data} \n\n " # 完了後にDB保存 save_analysis(db, hoge.id, accumulated_content) # 完了イベント送信 data = json.dumps({ "type" : "complete" , "content" : accumulated_content}) yield f "data: {data} \n\n " やっていることは次の2つです。 分析結果がチャンクで返ってくるので、そのままフロントに渡す 分析完了後に分析結果を保存・フロントに返す どのように分析結果をチャンク(断片したテキスト)形式で受け取るのかについて説明します。 これは、LangChainのストリーミングメソッドを使用することで実現可能です。 LangChainの astream() メソッド async for chunk in agent.astream(messages): print (chunk.content) # "Find" → "y AI+" → "AI利活用" のように少しずつ届く astream() は非同期ジェネレータを返し、LLM APIからのレスポンスをチャンク単位で受信できます。 通常の invoke() との違いは全文が返らないことです。 # invoke(): 全文が返るまで待つ result = agent.invoke(messages) # 数秒〜数十秒ブロック # astream(): チャンクごとに即座に処理できる async for chunk in agent.astream(messages): # 即座に開始 process(chunk.content) FastAPIでのストリーミング実装 @ router.post ( "/api/v1/hoge/analysis/streaming" ) async def create_hoge_analysis_streaming (request: AnalysisRequest): hoge = get_hoge(request.hoge_id) agent = ChatAnthropic(model= "claude-sonnet-4-20250514" , ...) return StreamingResponse( generate_streaming_response(agent, hoge), media_type= "text/event-stream" , ) StreamingResponse に非同期ジェネレータを渡すことで、yield するたびにクライアントへデータが送信されます。 ストリーミングレスポンスの生成 async def generate_streaming_response (agent, hoge) -> AsyncGenerator[ str , None ]: accumulated_content = "" messages = [HumanMessage(content=f "分析して: {hoge.content}" )] # チャンクごとにフロントへ送信 async for chunk in agent.astream(messages): if chunk.content: accumulated_content += chunk.content data = json.dumps({ "type" : "content" , "content" : chunk.content}) yield f "data: {data} \n\n " # 完了後にDB保存 & 完了イベント送信 save_analysis(hoge.id, accumulated_content) data = json.dumps({ "type" : "complete" , "content" : accumulated_content}) yield f "data: {data} \n\n " accumulated_content で全文を蓄積しDB保存の準備を行います yield f"data: {data}\n\n" でSSE形式でフロントへ送信することでストリーミング対応を行います SSE(Server-Sent Events)の仕組み StreamingResponse で media_type="text/event-stream" を指定することで、SSE形式でデータを送信できます。 SSEでは data: {JSON}\n\n という形式でイベントを送信します。クライアントはこのイベントを受信するたびに画面を更新できるため、LLMの出力をリアルタイムで表示できます。 data: {"type": "content", "content": "Findy "}\n\n data: {"type": "content", "content": "AI+では"}\n\n data: {"type": "content", "content": "..."}\n\n data: {"type": "complete", "content": "Findy AI+では..."}\n\n フロントエンドでの受信処理 フロントエンドでは fetch の ReadableStream を使ってストリーミングデータを受信します。 const response = await fetch ( '/api/v1/hoge/analysis/streaming' , { method : 'POST' , body : JSON . stringify ( { hoge_id : 123 } ), } ); const reader = response. body .getReader(); const decoder = new TextDecoder (); while ( true ) { const { done , value } = await reader.read(); if (done) break ; const chunk = decoder. decode (value); // "data: {...}\n\n" 形式のイベントをパース const events = chunk. split ( ' \n\n ' ). filter ( e => e. startsWith ( 'data: ' )); for ( const event of events) { const data = JSON . parse (event. replace ( 'data: ' , '' )); if (data. type === 'content' ) { // チャンクを画面に追加表示 appendText(data.content); } else if (data. type === 'complete' ) { // 完了処理 } } } このように、サーバー側でチャンクを yield するたびにフロントエンドで受信・表示することで、ユーザーは分析結果をリアルタイムで確認できます。 ストリーミング対応後に得られた効果 画面遷移から3秒程度で読み始めることができるので非常に使い勝手が良くなりました。 おわりに ストリーミング対応をしたことで分析に使用するデータ量が多くても逐一分析結果が表示されるようになりました。 LangChainを使用したストリーミング対応について少しでも参考になれば幸いです。 現在、ファインディでは一緒に働くメンバーを募集中です。 興味がある方はこちらからご応募ください。 herp.careers