ããã«ã¡ã¯ããµã€ãªã¹ãã¯ãããžãŒã®åç°ã§ããåå㯠ãã¡ã ã§ã¢ãŠãããã¯ã¹ãã¿ãŒã³ãšããèšèšãã¿ãŒã³ã玹ä»ããŸããããä»åã¯ãã®èšèšãã¿ãŒã³ãå®çŸã§ãã River ãšããã©ã€ãã©ãªã䜿ã£ãŠã¿ãã®ã§ç޹ä»ããããšæããŸããããã§ã¯ãããŸãããã River ãšã¯ River 㯠Go èšèªã§æžããã PostgreSQL å°çšã®ãžã§ããã¥ãŒã©ã€ãã©ãªã§ããPostgreSQL ãããã¯ãšã³ããšããŠäœ¿çšããããšã§ãã¢ãŠãããã¯ã¹ãã¿ãŒã³ãç°¡åã«å®è£
ã§ããŸããäž»ãªç¹åŸŽãšããŠãRedis ã RabbitMQ ãªã©ã®å€éšãžã§ããã¥ãŒãå¿
èŠãšãã River åäœã§ãžã§ããã¥ãŒã管çããããšãã§ããŸãã ãã£ã¬ã¯ããªæ§æ ä»åäœããµã³ãã«ã¢ããªã¯ river_examples é
äžã«ã以äžã®ãããªæ§æã§é
眮ããŸãã river_examples âââ db â âââ init.sql âââ docker-compose.yml âââ go.mod âââ go.sum âââ handlers â âââ user.go âââ jobs â âââ send_email.go âââ main.go å®è£
ãŸããå¿
èŠãªããã±ãŒãžã å
¬åŒ ã®æé ã«åŸã£ãŠã€ã³ã¹ããŒã«ããŸãã go get github.com/riverqueue/river go get github.com/riverqueue/river/riverdriver/riverpgxv5 go mod tidy River ãåããããã«ã¯ PostgreSQL ãå¿
é ãªã®ã§ãDocker Compose ã§ DB ãäœã£ãŠãããŸãã ããã§ã¯ãRiver æ¬äœã䜿ãããŒãã«ã®ãã€ã°ã¬ãŒã·ã§ã³ãšãä»åãµã³ãã«ã§äœæããã¢ããªã±ãŒã·ã§ã³ã§äœ¿ãããŒãã«ã®ãã€ã°ã¬ãŒã·ã§ã³ãè¡ããŸãã ãŸãæåã«ã¢ããªã±ãŒã·ã§ã³ã§äœ¿ããŠãŒã¶ãŒããŒãã«ãäœæããããã® SQL ãäœããŸãã -- filepath: db/init.sql CREATE TABLE IF NOT EXISTS users ( id BIGSERIAL PRIMARY KEY, email TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); 次ã«ãpostgres æ¬äœãšãã€ã°ã¬ãŒã·ã§ã³çšã®ã³ã³ããã Docker Compose ã§äœããŸãã River ã«ã¯ãã€ã°ã¬ãŒã·ã§ã³çšã® go ã³ãã³ããããã®ã§ããã¡ããå©çšããŠãã€ã°ã¬ãŒã·ã§ã³ãè¡ã£ãŠããŸãã # filepath: docker-compose.yml services: db: image: postgres:16-alpine environment: POSTGRES_USER: user POSTGRES_PASSWORD: password POSTGRES_DB: mydb ports: - "5432:5432" volumes: - pgdata:/var/lib/postgresql/data - ./db/init.sql:/docker-entrypoint-initdb.d/001-init.sql:ro healthcheck: test: ["CMD-SHELL", "pg_isready -U user -d mydb"] interval: 2s timeout: 5s retries: 30 river-migrate: image: golang:1.24 depends_on: db: condition: service_healthy environment: DATABASE_URL: postgres://user:password@db:5432/mydb?sslmode=disable command: - bash - -c - | set -euo pipefail go install github.com/riverqueue/river/cmd/river@v0.28.0 /go/bin/river migrate-up --line main --database-url "$$DATABASE_URL" restart: "no" volumes: pgdata: 以äžã®ã³ãã³ãã§äœæããŸãã cd river_examples docker compose up -d db docker compose run --rm river-migrate 1. ãžã§ãã®å®çŸ© ç¶ããŠãã¡ãŒã«éä¿¡ãžã§ããå®çŸ©ããŸã // filepath: jobs/send_email.go package jobs import ( "context" "fmt" "log" "github.com/riverqueue/river" ) // SendEmailArgs ã¯ã¡ãŒã«éä¿¡ãžã§ãã®åŒæ° type SendEmailArgs struct { UserID int64 `json:"user_id"` Email string `json:"email"` Subject string `json:"subject"` Body string `json:"body"` } // Kind ã¯ãžã§ãã®çš®é¡ãèå¥ããååãè¿ã func (SendEmailArgs) Kind() string { return "send_email" } // SendEmailWorker ã¯ã¡ãŒã«éä¿¡ãå®è¡ããã¯ãŒã«ãŒ type SendEmailWorker struct { river.WorkerDefaults[SendEmailArgs] } // Work ã¯å®éã®ã¡ãŒã«éä¿¡åŠçãå®è¡ func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error { log.Printf("Sending email to %s (UserID: %d)", job.Args.Email, job.Args.UserID) log.Printf("Subject: %s", job.Args.Subject) // ããã§å®éã®ã¡ãŒã«éä¿¡åŠçãè¡ã // äŸïŒå€éšã®ã¡ãŒã«é
ä¿¡ãµãŒãã¹APIãåŒã³åºã err := sendEmailViaSMTP(job.Args.Email, job.Args.Subject, job.Args.Body) if err != nil { return fmt.Errorf("failed to send email: %w", err) } log.Printf("Email sent successfully to %s", job.Args.Email) return nil } func sendEmailViaSMTP(email, subject, body string) error { // å®éã®ã¡ãŒã«éä¿¡ããžã㯠// ããã§ã¯ç°¡ç¥åã®ããçç¥ return nil } 2. ã¡ã€ã³åŠç ã¡ã€ã³åŠçã§ã¯äž»ã«ä»¥äžãè¡ããŸãã River ã¯ã©ã€ã¢ã³ãã®åæå River ã¯ãŒã«ãŒã®ç»é² ãŠãŒã¶ãŒç»é²åŠç // filepath: main.go package main import ( "context" "log" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "river_examples/handlers" "river_examples/jobs" ) func setupRiver(ctx context.Context, dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) { workers := river.NewWorkers() // ã¡ãŒã«éä¿¡ã¯ãŒã«ãŒãç»é² river.AddWorker(workers, &jobs.SendEmailWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { return nil, err } return riverClient, nil } func main() { ctx := context.Background() // PostgreSQLæ¥ç¶ããŒã«ã®äœæ dbPool, err := pgxpool.New(ctx, "postgres://user:password@localhost:5432/mydb?sslmode=disable") if err != nil { log.Fatal(err) } defer dbPool.Close() // Riverã¯ã©ã€ã¢ã³ãã®ã»ããã¢ãã riverClient, err := setupRiver(ctx, dbPool) if err != nil { log.Fatal(err) } // Riverã¯ãŒã«ãŒãèµ·å if err := riverClient.Start(ctx); err != nil { log.Fatal(err) } defer riverClient.Stop(ctx) log.Println("River worker started") // ã¢ããªã±ãŒã·ã§ã³ã®ã¡ã€ã³åŠç userHandler := handlers.NewUserHandler(dbPool, riverClient) if err := userHandler.RegisterUser(ctx, "test@example.com", "password"); err != nil { log.Fatal(err) } // åäœç¢ºèªçšïŒãžã§ããåŠçããããŸã§å°ãåŸ
ã€ïŒ time.Sleep(2 * time.Second) } 3. ãŠãŒã¶ãŒç»é²åŠç ãŠãŒã¶ãŒç»é²æã«ãDB ãžã®æžã蟌ã¿ãšãžã§ãã®æå
¥ãåããã©ã³ã¶ã¯ã·ã§ã³ã§è¡ããŸã // filepath: handlers/user.go package handlers import ( "context" "fmt" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "river_examples/jobs" ) type UserHandler struct { dbPool *pgxpool.Pool riverClient *river.Client[pgx.Tx] } func NewUserHandler(dbPool *pgxpool.Pool, riverClient *river.Client[pgx.Tx]) *UserHandler { return &UserHandler{ dbPool: dbPool, riverClient: riverClient, } } func (h *UserHandler) RegisterUser(ctx context.Context, email, password string) error { // ãã©ã³ã¶ã¯ã·ã§ã³éå§ tx, err := h.dbPool.Begin(ctx) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback(ctx) // 1. ãŠãŒã¶ãŒããŒãã«ã«æ¿å
¥ var userID int64 err = tx.QueryRow(ctx, ` INSERT INTO users (email, password_hash, created_at) VALUES ($1, $2, NOW()) RETURNING id `, email, hashPassword(password)).Scan(&userID) if err != nil { return fmt.Errorf("failed to insert user: %w", err) } // 2. ã¡ãŒã«éä¿¡ãžã§ãããšã³ãã¥ãŒïŒåããã©ã³ã¶ã¯ã·ã§ã³å
ïŒ _, err = h.riverClient.InsertTx(ctx, tx, jobs.SendEmailArgs{ UserID: userID, Email: email, Subject: "Welcome to Our Service!", Body: "Thank you for registering. Please verify your email...", }, nil) if err != nil { return fmt.Errorf("failed to enqueue email job: %w", err) } // 3. ãã©ã³ã¶ã¯ã·ã§ã³ãã³ããã if err := tx.Commit(ctx); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } return nil } func hashPassword(password string) string { // ãã¹ã¯ãŒãããã·ã¥åïŒä»åã¯ãã®ãŸãŸç»é²ïŒ return password } ãµã³ãã«ã§ã¯ UserHandler ã main ããåŒã³åºã圢ã«ããŠããŠãå®è¡ãããš users ãžã® INSERT ãš River ã®ãžã§ãããŒãã«ãžã® INSERT ãåäžãã©ã³ã¶ã¯ã·ã§ã³ã§è¡ãããã¯ãŒã«ãŒããžã§ããåŠçããŸãã åäœã®æµã å®éã®åŠçã®æµãã¯ä»¥äžã®ããã«ãªããŸãã ãŠãŒã¶ãŒç»é²ãªã¯ãšã¹ããæ¥ã ãã©ã³ã¶ã¯ã·ã§ã³éå§ ãŠãŒã¶ãŒããŒãã«ã«æ¿å
¥ïŒINSERTïŒ River ã®ãžã§ãããŒãã«ã«æ¿å
¥ïŒInsertTxïŒ ãã©ã³ã¶ã¯ã·ã§ã³ã³ããã River ã¯ãŒã«ãŒããžã§ãããŒãã«ãç£èŠ ã¡ãŒã«éä¿¡ãžã§ããå®è¡ ããã«ããããŠãŒã¶ãŒç»é²ãæåããå Žåã®ã¿ã¡ãŒã«éä¿¡ãžã§ãã確å®ã«å®è¡ãããåå説æããã¢ãŠãããã¯ã¹ãã¿ãŒã³ãå®çŸã§ããŸãã ãŸãšã ä»åã¯ãRiver ã䜿ã£ããµã³ãã«ã¢ããªãäœã£ãŠã¿ãŸãããRiver ã䜿ãããšã§ã PostgreSQL ã®ãã©ã³ã¶ã¯ã·ã§ã³æ©èœã掻çšããã¢ãŠãããã¯ã¹ãã¿ãŒã³ãç°¡åã«å®è£
ã§ããŸããããŒã¿ããŒã¹ãžã®æžã蟌ã¿ãšãžã§ãã®æå
¥ãåäžã®ãã©ã³ã¶ã¯ã·ã§ã³ã§è¡ãããããããŒã¿ã®æŽåæ§ãä¿ã¡ãªããéåæåŠçãå®çŸã§ããŸãã ã芧ããã ãããããšãããããŸãïŒ ãã®æçš¿ã¯ã圹ã«ç«ã¡ãŸãããïŒ åœ¹ã«ç«ã£ã 圹ã«ç«ããªãã£ã 0人ããã®æçš¿ã¯åœ¹ã«ç«ã£ããšèšã£ãŠããŸãã The post Goã®ãžã§ããã¥ãŒã©ã€ãã©ãª River 䜿ã£ãŠã¿ã first appeared on SIOS Tech Lab .