pekko-cqrs-es-implementation

Apache Pekko + Scala 3によるCQRS/Event Sourcing実装ガイド。 PersistenceEffectorを用いた集約アクター、ドメインモデルとアクターの分離、 状態遷移の型安全な表現、イベント設計、Protocol Buffersシリアライズ、 ZIOベースのユースケース層、リードモデルアップデータの実装パターンを提供する。 対象言語: Scala 3限定。CQRS/Event Sourcingアーキテクチャが前提の場合のみ使用。 トリガー条件: 「Scala」かつ「CQRS」または「Event Sourcing」または「Pekko」が リクエストに含まれる場合のみ起動。Scala以外の言語やCQRS/ES以外のアーキテクチャでは このスキルを使用してはならない。 トリガー:「PekkoでCQRS/ESを実装したい」「Scalaで集約アクターを書きたい」 「PersistenceEffectorの使い方」「Pekkoのイベントソーシング」 「ScalaでCQRSのコマンド側を実装」「Pekkoで状態遷移を管理」 といったPekko + Scala + CQRS/ES実装リクエストで起動。 非トリガー:「CQRSのトレードオフ」「イベントソーシングとは」「JavaでCQRS」 「GoでEvent Sourcing」「CQRSの概念を教えて」など、Scala/Pekko以外や概念的な質問では 起動してはならない。

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "pekko-cqrs-es-implementation" with this command: npx skills add j5ik2o/okite-ai/j5ik2o-okite-ai-pekko-cqrs-es-implementation

Pekko CQRS/ES 実装ガイド

Apache Pekko + Scala 3でCQRS/Event Sourcingを実装する際の具体的なパターン集。

適用条件

このスキルは以下のすべてを満たす場合にのみ使用する:

  1. プログラミング言語がScala(3.x推奨)であること
  2. CQRS/Event Sourcingアーキテクチャを採用していること
  3. Apache Pekko(またはAkka)をアクターフレームワークとして使用していること

上記を満たさない場合は、cqrs-tradeoffscqrs-to-event-sourcingcqrs-aggregate-modelingなど 言語非依存のスキルを使用すること。

アーキテクチャ概要

システム全体のデータフロー

クライアント
    │
    ▼
コマンドAPI(GraphQL Mutation)
    │
    ▼
ユースケース層(ZIO)
    │
    ▼
集約アクター(Pekko Typed)
    │
    ▼
ドメインモデル(純粋Scala)→ イベント生成
    │
    ▼
イベントストア(DynamoDB)
    │
    ▼
DynamoDB Streams
    │
    ▼
リードモデルアップデータ(AWS Lambda)
    │
    ▼
リードモデル(PostgreSQL)
    │
    ▼
クエリAPI(GraphQL Query)

コマンド側とクエリ側の分離

コマンド側クエリ側
目的ビジネスルール実行効率的なデータ取得
データストアDynamoDB(イベントストア)PostgreSQL(リードモデル)
APIGraphQL MutationGraphQL Query
レイヤードメイン → ユースケース → インターフェースアダプタインターフェースアダプタのみ
整合性強い整合性(集約内)結果整合性

クエリ側にユースケース層はない。 GraphQL自体がユースケースに相当する。

モジュール構成

modules/
├── command/
│   ├── domain/                          # ドメイン層(純粋Scala)
│   ├── use-case/                        # ユースケース層(ZIO)
│   ├── interface-adapter/               # アクター、GraphQL、シリアライザ
│   ├── interface-adapter-contract/      # コマンド/リプライのプロトコル定義
│   └── interface-adapter-event-serializer/  # Protocol Buffersシリアライザ
├── query/
│   ├── interface-adapter/               # DAO(Slick)、GraphQL
│   └── flyway-migration/               # DBマイグレーション
└── infrastructure/                      # 共有ユーティリティ

apps/
├── command-api/                         # コマンド側HTTPサーバー
├── query-api/                           # クエリ側HTTPサーバー
└── read-model-updater/                  # AWS Lambda

依存方向

domain ← use-case ← interface-adapter ← apps
  │                       ↑
  │               interface-adapter-contract
  │               interface-adapter-event-serializer
  └── Scalaのみに依存。フレームワーク非依存。

コマンド側実装パターン

1. ドメインモデル

原則: ドメインモデルは純粋なScalaコードで、Pekkoに一切依存しない。

集約(trait + private case class)

trait UserAccount extends Entity {
  override type IdType = UserAccountId
  def id: UserAccountId
  def name: UserAccountName
  def emailAddress: EmailAddress
  def createdAt: DateTime
  def updatedAt: DateTime

  def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
  def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}

object UserAccount {
  // ファクトリ: 新しい状態とイベントのペアを返す
  def apply(
    id: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime = DateTime.now(),
    updatedAt: DateTime = DateTime.now()
  ): (UserAccount, UserAccountEvent) =
    (
      UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
      UserAccountEvent.Created_V1(
        id = DomainEventId.generate(),
        entityId = id,
        name = name,
        emailAddress = emailAddress,
        occurredAt = DateTime.now()
      ))

  // 実装はprivateで隠蔽
  private final case class UserAccountImpl(
    id: UserAccountId,
    deleted: Boolean,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime,
    updatedAt: DateTime
  ) extends UserAccount {

    override def rename(
      newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
      if (name == newName) {
        Left(RenameError.FamilyNameSame)
      } else {
        val updated = this.copy(name = newName, updatedAt = DateTime.now())
        val event = UserAccountEvent.Renamed_V1(
          id = DomainEventId.generate(),
          entityId = id,
          oldName = name,
          newName = newName,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }

    override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
      if (deleted) {
        Left(DeleteError.AlreadyDeleted)
      } else {
        val updated = copy(deleted = true, updatedAt = DateTime.now())
        val event = UserAccountEvent.Deleted_V1(
          id = DomainEventId.generate(),
          entityId = id,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }
  }
}

重要なパターン:

  • 状態変更メソッドは Either[Error, (NewState, Event)] を返す
  • ファクトリも (State, Event) のペアを返す
  • 実装クラスは private で外部から直接構築できない
  • ドメインモデル内でPekkoのimportは一切ない

イベント定義(enum + バージョニング)

enum UserAccountEvent extends DomainEvent {
  override type EntityIdType = UserAccountId

  case Created_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    occurredAt: DateTime
  )

  case Renamed_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    oldName: UserAccountName,
    newName: UserAccountName,
    occurredAt: DateTime
  )

  case Deleted_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    occurredAt: DateTime
  )
}

イベント設計ルール:

ルール説明
過去形で命名「何が起きたか」を表すCreated, Renamed, Deleted
_V1サフィックススキーマ進化に対応Renamed_V1, Renamed_V2
不変case classで自動的に保証
自己完結変更前後の値を含むoldName, newName
必須フィールドid, entityId, occurredAtすべてのイベントに共通

2. 集約の状態(enum)

enum UserAccountAggregateState {
  case NotCreated(id: UserAccountId)
  case Created(user: UserAccount)
  case Deleted(user: UserAccount)

  def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
    case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
        if id == entityId =>
      Created(UserAccount(entityId, name, emailAddress)._1)

    case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
        if user.id == entityId =>
      Created(user.rename(newName) match {
        case Right((u, _)) => u
        case Left(error) =>
          throw new IllegalStateException(s"Failed to rename user: $error")
      })

    case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
        if user.id == entityId =>
      Deleted(user.delete match {
        case Right((deletedUser, _)) => deletedUser
        case Left(error) =>
          throw new IllegalStateException(s"Failed to delete user: $error")
      })

    case _ =>
      throw new IllegalStateException(s"Cannot apply event $event to state $this")
  }
}

状態遷移の型安全性:

  • NotCreatedCreated(Created_V1イベントのみ)
  • CreatedCreated(Renamed_V1)/ Deleted(Deleted_V1)
  • Deleted → 遷移なし(どのイベントも受け付けない)

3. プロトコル定義(コマンド/リプライ)

object UserAccountProtocol {
  // コマンド: すべてidを持つ
  sealed trait Command { def id: UserAccountId }
  final case class Create(
    id: UserAccountId, name: UserAccountName,
    emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
  final case class Rename(
    id: UserAccountId, newName: UserAccountName,
    replyTo: ActorRef[RenameReply]) extends Command
  final case class Delete(
    id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
  final case class Get(
    id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command

  // リプライ: コマンドごとに専用の型
  sealed trait CreateReply
  final case class CreateSucceeded(id: UserAccountId) extends CreateReply

  sealed trait RenameReply
  final case class RenameSucceeded(id: UserAccountId) extends RenameReply
  final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply

  sealed trait DeleteReply
  final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
  final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply

  sealed trait GetReply
  final case class GetSucceeded(value: UserAccount) extends GetReply
  final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}

プロトコル設計ルール:

  • コマンドはすべて sealed trait Command を継承し、id を持つ
  • リプライはコマンドごとに専用の sealed trait を定義する(CreateReply, RenameReply等)
  • replyTo: ActorRef[XxxReply] で型安全な応答を保証
  • 成功/失敗をcase classで表現し、パターンマッチで網羅性チェック

4. 集約アクター(PersistenceEffector)

object UserAccountAggregate {

  // 状態ごとにハンドラ関数を分離
  private def handleNotCreated(
    state: UserAccountAggregateState.NotCreated,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Create(id, name, emailAddress, replyTo) if state.id == id =>
      val (newState, event) = UserAccount(id, name, emailAddress)
      effector.persistEvent(event) { _ =>
        replyTo ! CreateSucceeded(id)
        handleCreated(UserAccountAggregateState.Created(newState), effector)
      }
    case Get(id, replyTo) if state.id == id =>
      replyTo ! GetNotFoundFailed(id)
      Behaviors.same
  }

  private def handleCreated(
    state: UserAccountAggregateState.Created,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Rename(id, newName, replyTo) if state.user.id == id =>
      // ドメインモデルに委譲
      state.user.rename(newName) match {
        case Left(reason) =>
          replyTo ! RenameFailed(id, reason)
          Behaviors.same
        case Right((newUser, event)) =>
          effector.persistEvent(event) { _ =>
            replyTo ! RenameSucceeded(id)
            handleCreated(state.copy(user = newUser), effector)
          }
      }
    // ... Delete, Get も同様
  }

  // エントリポイント
  def apply(id: UserAccountId): Behavior[Command] = {
    val config = PersistenceEffectorConfig
      .create[UserAccountAggregateState, UserAccountEvent, Command](
        persistenceId = s"${id.entityTypeName}-${id.asString}",
        initialState = UserAccountAggregateState.NotCreated(id),
        applyEvent = (state, event) => state.applyEvent(event)
      )
      .withPersistenceMode(PersistenceMode.Persisted)
      .withSnapshotCriteria(SnapshotCriteria.every(1000))
      .withRetentionCriteria(RetentionCriteria.snapshotEvery(2))

    Behaviors.setup[Command] { implicit ctx =>
      Behaviors
        .supervise(
          PersistenceEffector.fromConfig(config) {
            case (state: UserAccountAggregateState.NotCreated, effector) =>
              handleNotCreated(state, effector)
            case (state: UserAccountAggregateState.Created, effector) =>
              handleCreated(state, effector)
            case (state: UserAccountAggregateState.Deleted, effector) =>
              handleDeleted(state, effector)
          })
        .onFailure[IllegalArgumentException](SupervisorStrategy.restart)
    }
  }
}

アクター実装ルール:

  • ドメインロジックをアクターに書かない。 アクターは永続化とライフサイクル管理に徹する
  • 状態ごとにハンドラ関数を分離し、受け付けるコマンドを限定する
  • effector.persistEvent(event) { _ => ... } でイベント永続化後にリプライ
  • ドメインモデルが Left を返したらリプライのみ(永続化しない)
  • ドメインモデルが Right を返したらイベントを永続化してからリプライ

5. ユースケース層(ZIO)

private[users] final class UserAccountUseCaseImpl(
  userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
  timeout: Timeout,
  scheduler: Scheduler,
  ec: ExecutionContext
) extends UserAccountUseCase {

  override def createUserAccount(
    userAccountName: UserAccountName,
    emailAddress: EmailAddress
  ): IO[UserAccountUseCaseError, UserAccountId] =
    for {
      userAccountId <- ZIO.succeed(UserAccountId.generate())
      reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
        UserAccountProtocol.Create(
          id = userAccountId,
          name = userAccountName,
          emailAddress = emailAddress,
          replyTo = replyTo
        )
      }.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
      result <- reply match {
        case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
      }
    } yield result

  private def askActor[R](
    createMessage: ActorRef[R] => UserAccountProtocol.Command
  ): Task[R] =
    PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}

ユースケース層のルール:

  • ビジネスロジックを書く場所ではない。 処理ステップの調整役に徹する
  • ZIOのfor式でアクターとの通信を型安全に記述
  • askActor ヘルパーでPekko Ask → ZIO Task変換を共通化
  • エラーは UserAccountUseCaseError にマッピング

シリアライズ

Protocol Buffersによるイベントシリアライズ

// event.proto
syntax = "proto3";

message UserAccountCreatedV1 {
  string id = 1;
  string entity_id = 2;
  string name = 3;
  string email_address = 4;
  string occurred_at = 5;
}

message UserAccountRenamedV1 {
  string id = 1;
  string entity_id = 2;
  string old_name = 3;
  string new_name = 4;
  string occurred_at = 5;
}

シリアライズ方針:

  • イベントとスナップショットの両方をProtocol Buffersで定義
  • ScalaPBでScalaコードを自動生成
  • カスタムシリアライザでドメインイベント ↔ Protobufメッセージを変換
  • バージョニングはProtobufのフィールド番号で後方互換性を確保

スナップショット戦略

設定理由
保存頻度1000イベントごと起動時間とストレージのバランス
保持数最新2つリカバリ安全性の確保

テスト戦略

ドメインモデルテスト(Pekko非依存)

test("ユーザー名を変更できる") {
  val (user, _) = UserAccount(id, name, email)
  val result = user.rename(newName)
  result match {
    case Right((updated, event)) =>
      updated.name shouldBe newName
      event shouldBe a[UserAccountEvent.Renamed_V1]
    case Left(error) => fail(s"Unexpected error: $error")
  }
}

アクターテスト(ActorTestKit)

// PersistenceEffectorのテスト
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))

テストの分離

レベル対象ツールPekko依存
ドメインビジネスロジックScalaTestなし
アクターメッセージング、永続化ActorTestKitあり
シリアライザイベント/スナップショットの変換ScalaTestなし
統合エンドツーエンドLocalStackあり

関連スキルとの使い分け

スキルフォーカス使うタイミング
本スキルPekko + Scala 3での具体的実装CQRS/ESをScalaで実装するとき
cqrs-tradeoffsCQRS採用判断のトレードオフ分析CQRS導入の是非を検討するとき
cqrs-to-event-sourcingなぜESが必要になるかの論理的説明ESの必然性を理解したいとき
cqrs-aggregate-modelingCQRS導入時の集約境界再定義集約の粒度を見直すとき
aggregate-design集約設計ルール全般集約の新規設計やレビューのとき
domain-building-blocksVO/Entity/Aggregate等の設計ドメインモデル全体を設計するとき

参考文献

関連スキル(併読推奨)

このスキルを使用する際は、以下のスキルも併せて参照すること:

  • cqrs-tradeoffs: CQRS/ES採用判断のトレードオフ分析
  • cqrs-to-event-sourcing: CQRSからイベントソーシングへの必然性
  • aggregate-design: ドメインモデルとしての集約設計ルール

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

domain-model-extractor

No summary provided by upstream source.

Repository SourceNeeds Review
General

cross-aggregate-constraints

No summary provided by upstream source.

Repository SourceNeeds Review
General

aggregate-design

No summary provided by upstream source.

Repository SourceNeeds Review
General

tell-dont-ask

No summary provided by upstream source.

Repository SourceNeeds Review