こんにちは。サイオステクノロジーの和田です。前回は こちら でアウトボックスパターンという設計パターンを紹介しましたが、今回はその設計パターンを実現できる River というライブラリを使ってみたので紹介したいと思います。それではいきましょう。 River とは River は Go 言語で書かれた PostgreSQL 専用のジョブキューライブラリです。PostgreSQL をバックエンドとして使用することで、アウトボックスパターンを簡単に実装できます。主な特徴として、Redis や RabbitMQ などの外部ジョブキューを必要とせず River 単体でジョブキューを管理することができます。 ディレクトリ構成 今回作るサンプルアプリは river_examples 配下に、以下のような構成で配置します。 river_examples ├── db │ └── init.sql ├── docker-compose.yml ├── go.mod ├── go.sum ├── handlers │ └── user.go ├── jobs │ └── send_email.go └── main.go 実装 まず、必要なパッケージを 公式 の手順に従ってインストールします。 go get github.com/riverqueue/river go get github.com/riverqueue/river/riverdriver/riverpgxv5 go mod tidy River を動かすためには PostgreSQL が必須なので、Docker Compose で DB を作っていきます。 ここでは、River 本体が使うテーブルのマイグレーションと、今回サンプルで作成するアプリケーションで使うテーブルのマイグレーションを行います。 まず最初にアプリケーションで使うユーザーテーブルを作成するための SQL を作ります。 -- filepath: db/init.sql CREATE TABLE IF NOT EXISTS users ( id BIGSERIAL PRIMARY KEY, email TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); 次に、postgres 本体とマイグレーション用のコンテナを Docker Compose で作ります。 River にはマイグレーション用の go コマンドがあるので、そちらを利用してマイグレーションを行っています。 # filepath: docker-compose.yml services: db: image: postgres:16-alpine environment: POSTGRES_USER: user POSTGRES_PASSWORD: password POSTGRES_DB: mydb ports: - "5432:5432" volumes: - pgdata:/var/lib/postgresql/data - ./db/init.sql:/docker-entrypoint-initdb.d/001-init.sql:ro healthcheck: test: ["CMD-SHELL", "pg_isready -U user -d mydb"] interval: 2s timeout: 5s retries: 30 river-migrate: image: golang:1.24 depends_on: db: condition: service_healthy environment: DATABASE_URL: postgres://user:password@db:5432/mydb?sslmode=disable command: - bash - -c - | set -euo pipefail go install github.com/riverqueue/river/cmd/river@v0.28.0 /go/bin/river migrate-up --line main --database-url "$$DATABASE_URL" restart: "no" volumes: pgdata: 以下のコマンドで作成します。 cd river_examples docker compose up -d db docker compose run --rm river-migrate 1. ジョブの定義 続いて、メール送信ジョブを定義します // filepath: jobs/send_email.go package jobs import ( "context" "fmt" "log" "github.com/riverqueue/river" ) // SendEmailArgs はメール送信ジョブの引数 type SendEmailArgs struct { UserID int64 `json:"user_id"` Email string `json:"email"` Subject string `json:"subject"` Body string `json:"body"` } // Kind はジョブの種類を識別する名前を返す func (SendEmailArgs) Kind() string { return "send_email" } // SendEmailWorker はメール送信を実行するワーカー type SendEmailWorker struct { river.WorkerDefaults[SendEmailArgs] } // Work は実際のメール送信処理を実行 func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error { log.Printf("Sending email to %s (UserID: %d)", job.Args.Email, job.Args.UserID) log.Printf("Subject: %s", job.Args.Subject) // ここで実際のメール送信処理を行う // 例:外部のメール配信サービスAPIを呼び出す err := sendEmailViaSMTP(job.Args.Email, job.Args.Subject, job.Args.Body) if err != nil { return fmt.Errorf("failed to send email: %w", err) } log.Printf("Email sent successfully to %s", job.Args.Email) return nil } func sendEmailViaSMTP(email, subject, body string) error { // 実際のメール送信ロジック // ここでは簡略化のため省略 return nil } 2. メイン処理 メイン処理では主に以下を行います。 River クライアントの初期化 River ワーカーの登録 ユーザー登録処理 // filepath: main.go package main import ( "context" "log" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "river_examples/handlers" "river_examples/jobs" ) func setupRiver(ctx context.Context, dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) { workers := river.NewWorkers() // メール送信ワーカーを登録 river.AddWorker(workers, &jobs.SendEmailWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { return nil, err } return riverClient, nil } func main() { ctx := context.Background() // PostgreSQL接続プールの作成 dbPool, err := pgxpool.New(ctx, "postgres://user:password@localhost:5432/mydb?sslmode=disable") if err != nil { log.Fatal(err) } defer dbPool.Close() // Riverクライアントのセットアップ riverClient, err := setupRiver(ctx, dbPool) if err != nil { log.Fatal(err) } // Riverワーカーを起動 if err := riverClient.Start(ctx); err != nil { log.Fatal(err) } defer riverClient.Stop(ctx) log.Println("River worker started") // アプリケーションのメイン処理 userHandler := handlers.NewUserHandler(dbPool, riverClient) if err := userHandler.RegisterUser(ctx, "test@example.com", "password"); err != nil { log.Fatal(err) } // 動作確認用(ジョブが処理されるまで少し待つ) time.Sleep(2 * time.Second) } 3. ユーザー登録処理 ユーザー登録時に、DB への書き込みとジョブの投入を同じトランザクションで行います // filepath: handlers/user.go package handlers import ( "context" "fmt" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "river_examples/jobs" ) type UserHandler struct { dbPool *pgxpool.Pool riverClient *river.Client[pgx.Tx] } func NewUserHandler(dbPool *pgxpool.Pool, riverClient *river.Client[pgx.Tx]) *UserHandler { return &UserHandler{ dbPool: dbPool, riverClient: riverClient, } } func (h *UserHandler) RegisterUser(ctx context.Context, email, password string) error { // トランザクション開始 tx, err := h.dbPool.Begin(ctx) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback(ctx) // 1. ユーザーテーブルに挿入 var userID int64 err = tx.QueryRow(ctx, ` INSERT INTO users (email, password_hash, created_at) VALUES ($1, $2, NOW()) RETURNING id `, email, hashPassword(password)).Scan(&userID) if err != nil { return fmt.Errorf("failed to insert user: %w", err) } // 2. メール送信ジョブをエンキュー(同じトランザクション内) _, err = h.riverClient.InsertTx(ctx, tx, jobs.SendEmailArgs{ UserID: userID, Email: email, Subject: "Welcome to Our Service!", Body: "Thank you for registering. Please verify your email...", }, nil) if err != nil { return fmt.Errorf("failed to enqueue email job: %w", err) } // 3. トランザクションをコミット if err := tx.Commit(ctx); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } return nil } func hashPassword(password string) string { // パスワードハッシュ化(今回はそのまま登録) return password } サンプルでは UserHandler を main から呼び出す形にしていて、実行すると users への INSERT と River のジョブテーブルへの INSERT が同一トランザクションで行われ、ワーカーがジョブを処理します。 動作の流れ 実際の処理の流れは以下のようになります。 ユーザー登録リクエストが来る トランザクション開始 ユーザーテーブルに挿入(INSERT) River のジョブテーブルに挿入(InsertTx) トランザクションコミット River ワーカーがジョブテーブルを監視 メール送信ジョブを実行 これにより、ユーザー登録が成功した場合のみメール送信ジョブが確実に実行され、前回説明したアウトボックスパターンが実現できます。 まとめ 今回は、River を使ったサンプルアプリを作ってみました。River を使うことで、 PostgreSQL のトランザクション機能を活用し、アウトボックスパターンを簡単に実装できます。データベースへの書き込みとジョブの投入を単一のトランザクションで行えるため、データの整合性を保ちながら非同期処理を実現できます。 ご覧いただきありがとうございます! この投稿はお役に立ちましたか? 役に立った 役に立たなかった 0人がこの投稿は役に立ったと言っています。 The post Goのジョブキューライブラリ River 使ってみた first appeared on SIOS Tech Lab .