every Tech Blog

株式会社エブリーのTech Blogです。

Next.js + Go + AWS API Gateway で WebSocket API を使って API サーバーからフロントエンドに通知を送る

Next.js + Go + AWS API Gateway で WebSocket API を使って API サーバーからフロントエンドに通知を送る

はじめに

こんにちは トモニテ でバックエンド周りの開発を行っている rymiyamoto です。

エブリーとして初の試みとなる Tech Blog Advent Calendar 2023 の 1日目の記事として参加させていただきました。
毎日他の記事も公開されるので、ぜひチェックしてみてください!

tech.every.tv

今回ですが Next.js + Go + AWS API Gateway で WebSocket API を使ってみたのでその内容を紹介していきます。

経緯

トモニテ では現在、バックエンドは Go でフロントエンドは React(Next.js) で開発を行っています。

フロントエンドとバックエンドの通信は REST API で行っていましたが、エンドユーザーの行動に対してダッシュボードを利用しているユーザーに即時性のある通知機能を実装する必要が出てきたため、WebSocket API を使ってみることにしました。

現状 API サーバーは ECS 上で動いており、API サーバー側で WebSocket API を実装するのは少し手間がかかるため、AWS API Gateway で WebSocket API を実装することにしました。

WebSocket API とは

ユーザーのブラウザーとサーバー間で対話的な通信セッションを開くことができるものです。 サーバーにメッセージを送信したり、応答をサーバーにポーリングすることなく、イベント駆動型のレスポンスを受信することができます。

developer.mozilla.org

今回の実装

元々ある ECS 環境(API サーバー・dashboard・web)から API Gateway で WebSocket API を利用できるように各種 Lambda を作成しました。

また裏側では Lambda から RDS への接続を行いたいため、RDS Proxy を利用しています。

構成図

流れとしては以下のようなフローです。

  1. dashboard(FE)で WebSocket API を利用するためのクライアントを作成してコネクション確立
  2. web(FE)で API サーバーに対してリクエストを送った際に、通知を行う Lambda を呼び出し
  3. API Gateway を通して dashboard に通知が飛ぶ

API Gateway の設定

ともかく WebSocket API を利用できるように API Gateway を作成します。

API Gateway において、どのリクエストに対してどの操作を行うかを決定するルート式を指定します。
今回は特別に指定もいらないので $request.body.action としておきます。

WebSocket API を使うための API GateWay の作成

以降の部分は特に指定がなければデフォルトのまま作成していきます。
このとき、ルートに $connect$disconnect が追加されますが、これらは接続と切断時のルートとなります。

IAM Role の作成

実行用の Lambda の Role

API Gateway や SecretManager(RDS Proxy 周りの機密情報の管理) を利用するために Role を作成します
(以降 web-socket-lambda-role とします)

その時に必要となるポリシーは以下です。

{
  "Statement": [
    {
      "Action": "secretsmanager:GetSecretValue",
      "Effect": "Allow",
      "Resource": "*",
      "Sid": "GetSecretValue"
    },
    {
      "Action": [
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:CreateNetworkInterface"
      ],
      "Effect": "Allow",
      "Resource": "*",
      "Sid": "ManageNetworkInterface"
    },
    {
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogStream",
        "logs:CreateLogGroup"
      ],
      "Effect": "Allow",
      "Resource": "*",
      "Sid": "ManageLogGroup"
    },
    {
      "Action": "execute-api:*",
      "Effect": "Allow",
      "Resource": "*",
      "Sid": "ExecuteAPI"
    }
  ],
  "Version": "2012-10-17"
}

API サーバーから Lambda を呼び出すための Policy 追加

Lambda を呼び出すための Policy を API サーバーの Role に付与します。

その時に必要となるポリシーは以下です。
(websocket-notification が後に作成される通知用の Lambda の名前です)

{
  "Statement": [
    {
      "Action": "lambda:InvokeFunction",
      "Effect": "Allow",
      "Resource": "arn:aws:lambda:ap-northeast-1:111111111111:function:websocket-notification",
      "Sid": ""
    }
  ],
  "Version": "2012-10-17"
}

Lambda の設定

今回 WebSocket API を利用するための Lambda は以下の 3 つとなります。

  • connect: 接続時に API Gateway から呼び出される
  • disconnect: 切断時に API Gateway から呼び出される
  • notification: API サーバーから呼び出されて通知を行う

connect

やること

  • API Gateway の $connect ルートをイベントトリガーとして設定する
  • クエリにユーザーが特定できるような情報を渡しておく
  • WebSocket の接続 ID を取得して DB に書き込みを行う

内部の処理のイメージは以下です。

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

type Response events.APIGatewayProxyResponse

func Handler(_ context.Context, request events.APIGatewayWebSocketProxyRequest) (Response, error) {
    log.Println("Begin WebSocket connect")

    log.Println("ユーザー特定")
    // リクエストのクエリからユーザーを一意に特定できる情報を取得します
    // 今回はユーザーの識別となるトークンを取得しています(複数タブを識別するため)
    token := request.QueryStringParameters["token"]
    // 以下にトークンからユーザーIDを取得

    log.Println("DSNの取得開始")
    // 以下にDNS情報をsecret managerから取得する処理

    log.Println("DBの接続")
    // 以下にDBの接続処理

    log.Println("コネクションIDの保存")
    // リクエストからコネクションIDがとれます
    connectionID := request.RequestContext.ConnectionID
    // 以下にコネクションIDの保存処理

    log.Println("End WebSocket connect")
    return Response{StatusCode: http.StatusOK}, nil
}

func main() {
    lambda.Start(Handler)
}

disconnect

やること

  • API Gateway の $disconnect ルートをイベントトリガーとして設定する
  • WebSocket の接続 ID を取得して DB から削除を行う

内部の処理のイメージは以下です。

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

type Response events.APIGatewayProxyResponse

func Handler(_ context.Context, request events.APIGatewayWebsocketProxyRequest) (Response, error) {
    log.Println("Begin WebSocket disconnect")

    log.Println("DSNの取得開始")
    // 以下にDNS情報をsecret managerから取得する処理

    log.Println("DBの接続")
    // 以下にDBの接続処理

    log.Println("コネクションIDの削除")
    // リクエストからコネクションIDがとれます
    connectionID := request.RequestContext.ConnectionID
    // 以下にコネクションIDの削除処理

    log.Println("End WebSocket disconnect")
    return Response{StatusCode: http.StatusOK}, nil
}

func main() {
    lambda.Start(Handler)
}

notification

やること

  • 呼び出し時の payload には通知を行うユーザーの ID を含めておく
  • そのユーザー ID からコネクション ID を取得して通知を行う

内部の処理のイメージは以下です。

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
)

type Response events.APIGatewayProxyResponse

func sendMessage(ctx context.Context, endpoint, connectionID, message string) error {
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return err
    }

    client := apigatewaymanagementapi.NewFromConfig(cfg, func(o *apigatewaymanagementapi.Options) {
        o.BaseEndpoint = aws.String(endpoint)
    })
    input := &apigatewaymanagementapi.PostToConnectionInput{
        ConnectionId: aws.String(connectionID),
        Data:         []byte(message),
    }
    _, err = client.PostToConnection(ctx, input)

    return err
}

type Event struct {
    UserID uint64 `json:"user_id"`
    JsonString  string `json:"json"`
}

func Handler(ctx context.Context, event Event) (Response, error) {
    log.Println("Begin WebSocket notification")

    log.Println("DSNの取得開始")
    // 以下にDNS情報をsecret managerから取得する処理

    log.Println("DBの接続")
    // 以下にDBの接続処理

    log.Println("対象ユーザーのコネクションIDリスト(複数端末や複数タブの都合上)取得")
    // lambda呼び出し時のpayloadには特定させるためのユーザーID、渡したいメッセージのjson入れておきます
    userID := event.UserID
    // 以下に対象ユーザーのコネクションIDのリスト取得処理

    log.Println("API Gatewayを経由してWeb Socketのメッセージを送信")
    endpoint := os.Getenv("API_GATEWAY_ENDPOINT")
    for _, connectionID := range connectionIDs {
        err = sendMessage(ctx, endpoint, connectionID, event.JsonString)
    }
    if err != nil {
        return Response{StatusCode: http.StatusInternalServerError}, err
    }

    log.Println("End WebSocket notification")
    return Response{StatusCode: http.StatusOK}, nil
}

func main() {
    lambda.Start(Handler)
}

API サーバーで通知 Lambda(notification)を呼び出し

API サーバー側で以下の関数を作成し通知を行う Lambda を呼び出すようにします。

package aws

import (
    "context"
    "fmt"
    "os"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/lambda"
)

func createClient() (*lambda.Client, error) {
    cfg, err := config.LoadDefaultConfig(context.Background())
    if err != nil {
        return nil, fmt.Errorf("load default config on background context failed. err: %w", err)
    }

    return lambda.NewFromConfig(cfg), nil
}

func Invoke(payload []byte) error {
    funcName := os.Getenv("WEBSOCKET_NOTIFICATION_FUNCTION")
    client, err := createClient()
    if err != nil {
        return fmt.Errorf("create lambda client failed. lambda function: %s, err: %w", funcName, err)
    }

    input := &lambda.InvokeInput{
        FunctionName: aws.String(funcName),
        Payload:      payload,
    }
    _, err = client.Invoke(context.Background(), input)
    if err != nil {
        return fmt.Errorf("call %s failed. err: %w", funcName, err)
    }

    return nil
}

WebSocket API を使うためのクライアントの作成

WebSocket API を利用するためのクライアントを作成します。

フロントエンドは Next.js で作成しているので、useEffect でコネクション確立を行うようにしています。

"use client";

import { useEffect, useState } from "react";

type ApplicationEvent = {
  message: string;
};

export default function EventReceiver({ token }: { token: string }) {
  const [applicationEvent, setApplicationEvent] = useState<ApplicationEvent>();

  // WebSocketのコネクションを張る
  useEffect(() => {
    const connectWebSocket = () => {
      const webSocketURL = process.env.NEXT_PUBLIC_WEB_SOCKET_URL;
      if (!webSocketURL) {
        throw new Error("Web SocketのURLが設定されていません。");
      }

      const ws = new WebSocket(`${webSocketURL}?token=${token}`);
      ws.onmessage = (event) => {
        const e = JSON.parse(event.data);
        setApplicationEvent(e);
      };
      // 勝手に接続切れたときの再接続
      ws.onclose = () => {
        setTimeout(connectWebSocket, 1000);
      };

      return ws;
    };

    const ws = connectWebSocket();
    return () => {
      ws.close();
    };
  }, [token]);

  if (!applicationEvent) {
    return null;
  }

  return <div>{e.message}</div>;
}

(Next.js ver.13 想定のため、12 以下の場合は "use client"; は不要です。)

まとめ

WebSocket API を利用して API サーバーからフロントエンドに通知を送る方法を紹介しました。

自身で WebSocket API を実装するのは少し手間がかかりますが、API Gateway を使うことで簡単に実装することができます。
今回は複数ブラウザのケースを考慮してユーザーにコネクションを紐付けるようにしましたが、そうでない場合は通知部分にコネクション ID を渡すことでもう少しスッキリかけたと思います。

ただクライアント側で WebSocket のコネクションを張るときに再接続処理を書く必要があるので、プレーンで書くと少し面倒です。

WebSocket API は今回のような通知機能だけでなく、リアルタイムなチャットなどにも利用できるので、今後も様々な場面で利用可能です。
同じような実装してみたい方の参考になれば幸いです。