@bratsos/workflow-engine Skill

Install skill "workflow-engine" with this command: npx skills add bratsos/workflow-engine/bratsos-workflow-engine-workflow-engine

Type-safe workflow engine for building AI-powered, multi-stage pipelines with persistence and batch processing support. Uses a command kernel architecture with environment-agnostic design.

Architecture Overview

The engine follows a kernel + host pattern:

  • Core library (@bratsos/workflow-engine) - Command kernel, stage/workflow definitions, persistence adapters

  • Node Host (@bratsos/workflow-engine-host-node) - Long-running worker with polling loops and signal handling

  • Serverless Host (@bratsos/workflow-engine-host-serverless) - Stateless single-invocation for edge/lambda/workers

The kernel is a pure command dispatcher. All workflow operations are expressed as typed commands dispatched via kernel.dispatch(). Hosts wrap the kernel with environment-specific process management.
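
To make "typed commands through a single entry point" concrete, here is a minimal sketch of the pattern in plain TypeScript. It does not use the engine's code: MiniKernel, the run statuses, and the result shapes are hypothetical stand-ins, and only the command names run.create and run.cancel come from this document.

```typescript
// Hypothetical stand-in for the command-dispatch pattern, not the engine's real types.
type CreateRun = { type: "run.create"; idempotencyKey: string; workflowId: string; input: unknown };
type CancelRun = { type: "run.cancel"; workflowRunId: string };
type MiniCommand = CreateRun | CancelRun;

type MiniRunStatus = "PENDING" | "CANCELLED";

class MiniKernel {
  private runs = new Map<string, { workflowId: string; status: MiniRunStatus }>();
  private nextId = 1;

  // Overloads tie each command type to its own result type.
  dispatch(command: CreateRun): { workflowRunId: string };
  dispatch(command: CancelRun): { cancelled: boolean };
  dispatch(command: MiniCommand): { workflowRunId: string } | { cancelled: boolean } {
    // The `type` discriminant narrows the command inside each case.
    switch (command.type) {
      case "run.create": {
        const workflowRunId = `run-${this.nextId++}`;
        this.runs.set(workflowRunId, { workflowId: command.workflowId, status: "PENDING" });
        return { workflowRunId };
      }
      case "run.cancel": {
        const run = this.runs.get(command.workflowRunId);
        if (!run) return { cancelled: false };
        run.status = "CANCELLED";
        return { cancelled: true };
      }
    }
  }

  statusOf(workflowRunId: string): MiniRunStatus | undefined {
    return this.runs.get(workflowRunId)?.status;
  }
}

const miniKernel = new MiniKernel();
const createdRun = miniKernel.dispatch({
  type: "run.create",
  idempotencyKey: "k1",
  workflowId: "my-workflow",
  input: { data: "hello" },
});
const cancelOutcome = miniKernel.dispatch({ type: "run.cancel", workflowRunId: createdRun.workflowRunId });
```

Because every operation is a plain data object, the dispatcher itself needs no timers or process hooks, which is what lets different hosts wrap the same kernel.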

When to Apply

  • User wants to create workflow stages or pipelines

  • User mentions defineStage, defineAsyncBatchStage, or WorkflowBuilder

  • User is implementing workflow persistence with Prisma

  • User needs AI integration (generateText, generateObject, embeddings, batch)

  • User is building multi-stage data processing pipelines

  • User mentions kernel, command dispatch, or job execution

  • User wants to set up a Node.js worker or serverless worker

  • User wants to rerun a workflow from a specific stage

  • User needs to test workflows with in-memory adapters

Quick Start

import { defineStage, WorkflowBuilder } from "@bratsos/workflow-engine";
import { createKernel } from "@bratsos/workflow-engine/kernel";
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
import {
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { z } from "zod";

// 1. Define a stage
const processStage = defineStage({
  id: "process",
  name: "Process Data",
  schemas: {
    input: z.object({ data: z.string() }),
    output: z.object({ result: z.string() }),
    config: z.object({ verbose: z.boolean().default(false) }),
  },
  async execute(ctx) {
    return { output: { result: ctx.input.data.toUpperCase() } };
  },
});

// 2. Build a workflow
const workflow = new WorkflowBuilder(
  "my-workflow",
  "My Workflow",
  "Processes data",
  z.object({ data: z.string() }),
  z.object({ result: z.string() })
)
  .pipe(processStage)
  .build();

// 3. Create kernel
const kernel = createKernel({
  persistence: createPrismaWorkflowPersistence(prisma),
  blobStore: myBlobStore,
  jobTransport: createPrismaJobQueue(prisma),
  eventSink: myEventSink,
  scheduler: myScheduler,
  clock: { now: () => new Date() },
  registry: { getWorkflow: (id) => (id === "my-workflow" ? workflow : undefined) },
});

// 4. Start a Node host
const host = createNodeHost({
  kernel,
  jobTransport: createPrismaJobQueue(prisma),
  workerId: "worker-1",
});
await host.start();

// 5. Dispatch a command
await kernel.dispatch({
  type: "run.create",
  idempotencyKey: crypto.randomUUID(),
  workflowId: "my-workflow",
  input: { data: "hello" },
});

Core Exports Reference

| Export | Type | Import Path | Purpose |
| --- | --- | --- | --- |
| defineStage | Function | @bratsos/workflow-engine | Create sync stages |
| defineAsyncBatchStage | Function | @bratsos/workflow-engine | Create async/batch stages |
| WorkflowBuilder | Class | @bratsos/workflow-engine | Chain stages into workflows |
| createKernel | Function | @bratsos/workflow-engine/kernel | Create command kernel |
| createNodeHost | Function | @bratsos/workflow-engine-host-node | Create Node.js host |
| createServerlessHost | Function | @bratsos/workflow-engine-host-serverless | Create serverless host |
| createAIHelper | Function | @bratsos/workflow-engine | AI operations (text, object, embed, batch) |
| registerEmbeddingProvider | Function | @bratsos/workflow-engine | Register custom embedding providers (Voyage, Cohere, etc.) |
| createStageIds | Function | @bratsos/workflow-engine | Create stage ID constants from a workflow |
| defineStageIds | Function | @bratsos/workflow-engine | Define stage ID constants from a tuple |
| isValidStageId | Function | @bratsos/workflow-engine | Runtime stage ID validation |
| assertValidStageId | Function | @bratsos/workflow-engine | Assert stage ID validity (throws) |
| definePlugin | Function | @bratsos/workflow-engine/kernel | Define kernel plugins |
| createPluginRunner | Function | @bratsos/workflow-engine/kernel | Create plugin event processor |

Kernel Commands

All operations go through kernel.dispatch(command):

| Command | Description |
| --- | --- |
| run.create | Create a new workflow run |
| run.claimPending | Claim pending runs, enqueue first-stage jobs |
| run.transition | Advance to next stage group or complete |
| run.cancel | Cancel a running workflow (authoritative: cascades to stages + jobs) |
| run.rerunFrom | Rerun from a specific stage (cleans up blob artifacts by prefix) |
| job.execute | Execute a single stage (uses multi-phase transactions; see 08-common-patterns.md) |
| stage.pollSuspended | Poll suspended stages for readiness (skips cancelled runs; per-stage transactions) |
| lease.reapStale | Release stale job leases |
| run.reapStuck | Detect and fail RUNNING runs with no recent activity |
| outbox.flush | Publish pending outbox events |
| plugin.replayDLQ | Replay dead-letter queue events |
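
The outbox.flush command implies an outbox pattern: events are recorded first and published later in bounded batches. The sketch below shows that idea with an in-memory list. SketchOutbox and its methods are hypothetical and say nothing about how the engine stores or publishes OutboxEvent rows; the per-flush limit is an assumption modeled on the maxOutboxFlushPerTick option.

```typescript
// Hypothetical in-memory outbox, for illustration only.
interface SketchOutboxEvent {
  id: number;
  type: string;
  payload: unknown;
  publishedAt: Date | null; // null means "still pending"
}

class SketchOutbox {
  private events: SketchOutboxEvent[] = [];
  private nextId = 1;

  // In a real system this write would share a transaction with the state change it describes.
  append(type: string, payload: unknown): void {
    this.events.push({ id: this.nextId++, type, payload, publishedAt: null });
  }

  // Publish up to `limit` pending events in insertion order; return how many were sent.
  flush(publish: (event: SketchOutboxEvent) => void, now: Date, limit: number): number {
    let published = 0;
    for (const event of this.events) {
      if (published >= limit) break;
      if (event.publishedAt !== null) continue;
      publish(event); // if this throws, the event stays pending for the next flush
      event.publishedAt = now;
      published++;
    }
    return published;
  }

  pendingCount(): number {
    return this.events.filter((event) => event.publishedAt === null).length;
  }
}

const sketchOutbox = new SketchOutbox();
sketchOutbox.append("workflow:completed", { workflowRunId: "run-1" });
sketchOutbox.append("workflow:completed", { workflowRunId: "run-2" });
sketchOutbox.append("workflow:completed", { workflowRunId: "run-3" });

const deliveredIds: number[] = [];
const firstFlushCount = sketchOutbox.flush((event) => deliveredIds.push(event.id), new Date(), 2);
const secondFlushCount = sketchOutbox.flush((event) => deliveredIds.push(event.id), new Date(), 2);
```

Marking an event as published only after the publish call returns means a failed publish is retried on the next flush, at the cost of possible duplicate delivery.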

Stage Definition

Sync Stage

const myStage = defineStage({
  id: "my-stage",
  name: "My Stage",
  description: "Optional",
  dependencies: ["prev"],

  schemas: {
    input: InputSchema, // Zod schema or "none"
    output: OutputSchema,
    config: ConfigSchema,
  },

  async execute(ctx) {
    const { input, config, workflowContext } = ctx;
    const prevOutput = ctx.require("prev");
    const optOutput = ctx.optional("other");

    await ctx.log("INFO", "Processing...");

    return {
      output: { ... },
      customMetrics: { itemsProcessed: 10 },
    };
  },
});
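
The stage above reads earlier outputs through ctx.require() and ctx.optional(). The sketch below shows one plausible reading of that split, assuming require fails loudly when a stage output is missing and optional returns undefined. SketchStageContext is hypothetical and is not the engine's context class.

```typescript
// Hypothetical stand-in for the stage context's output lookup.
class SketchStageContext {
  private readonly outputs: Map<string, unknown>;

  constructor(outputs: Map<string, unknown>) {
    this.outputs = outputs;
  }

  // Assumed behavior: a missing dependency is an error.
  require<T>(stageId: string): T {
    if (!this.outputs.has(stageId)) {
      throw new Error(`Missing output from required stage "${stageId}"`);
    }
    return this.outputs.get(stageId) as T;
  }

  // Assumed behavior: a missing stage yields undefined instead of throwing.
  optional<T>(stageId: string): T | undefined {
    return this.outputs.get(stageId) as T | undefined;
  }
}

const sketchCtx = new SketchStageContext(
  new Map<string, unknown>([["prev", { result: "HELLO" }]]),
);

const prevResult = sketchCtx.require<{ result: string }>("prev").result;
const otherOutput = sketchCtx.optional<{ count: number }>("other");

let missingMessage = "";
try {
  sketchCtx.require("missing");
} catch (err) {
  missingMessage = (err as Error).message;
}
```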

Async Batch Stage

const batchStage = defineAsyncBatchStage({
  id: "batch-process",
  name: "Batch Process",
  mode: "async-batch",
  schemas: { input: "none", output: OutputSchema, config: ConfigSchema },

  async execute(ctx) {
    // Resumed after suspension: return the cached result
    if (ctx.resumeState) {
      return { output: ctx.resumeState.cachedResult };
    }

    // submitBatchJob is a placeholder for your own batch submission call
    const batchId = await submitBatchJob(ctx.input);
    return {
      suspended: true,
      state: {
        batchId,
        submittedAt: new Date().toISOString(),
        pollInterval: 60000,
        maxWaitTime: 3600000,
      },
      pollConfig: {
        pollInterval: 60000,
        maxWaitTime: 3600000,
        nextPollAt: new Date(Date.now() + 60000),
      },
    };
  },

  async checkCompletion(suspendedState, ctx) {
    // checkBatchStatus is a placeholder for your own status lookup;
    // `results` stands for the batch output fetched once the batch completes
    const status = await checkBatchStatus(suspendedState.batchId);
    if (status === "completed") return { ready: true, output: { results } };
    if (status === "failed") return { ready: false, error: "Batch failed" };
    return { ready: false, nextCheckIn: 60000 };
  },
});
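
The three checkCompletion return shapes above (ready, failed, check again later) suggest a small decision step on each poll. The sketch below is one way such a step could work, not the engine's implementation: decidePoll and PollDecision are hypothetical, and treating maxWaitTime as a hard timeout measured from submittedAt is an assumption.

```typescript
// Field names mirror the suspended state shown above.
interface SketchSuspendedState {
  batchId: string;
  submittedAt: string; // ISO timestamp
  pollInterval: number; // ms
  maxWaitTime: number; // ms
}

type SketchCompletionCheck =
  | { ready: true; output: unknown }
  | { ready: false; error?: string; nextCheckIn?: number };

// Hypothetical decision type for this sketch.
type PollDecision =
  | { action: "complete"; output: unknown }
  | { action: "fail"; reason: string }
  | { action: "reschedule"; nextPollAt: Date };

function decidePoll(state: SketchSuspendedState, check: SketchCompletionCheck, now: Date): PollDecision {
  if (check.ready) return { action: "complete", output: check.output };
  if (check.error !== undefined) return { action: "fail", reason: check.error };

  // Assumption: give up once the stage has waited maxWaitTime since submission.
  const waitedMs = now.getTime() - new Date(state.submittedAt).getTime();
  if (waitedMs >= state.maxWaitTime) {
    return { action: "fail", reason: `timed out after ${waitedMs} ms` };
  }

  // Prefer the delay requested by the check, else fall back to the stored interval.
  const delayMs = check.nextCheckIn ?? state.pollInterval;
  return { action: "reschedule", nextPollAt: new Date(now.getTime() + delayMs) };
}

const pollState: SketchSuspendedState = {
  batchId: "b-1",
  submittedAt: "2024-01-01T00:00:00.000Z",
  pollInterval: 60000,
  maxWaitTime: 3600000,
};

const stillRunning = decidePoll(pollState, { ready: false, nextCheckIn: 30000 }, new Date("2024-01-01T00:05:00.000Z"));
const timedOut = decidePoll(pollState, { ready: false }, new Date("2024-01-01T01:00:00.000Z"));
const finished = decidePoll(pollState, { ready: true, output: { results: [1, 2] } }, new Date("2024-01-01T00:10:00.000Z"));
```

Passing `now` in as a parameter keeps the function pure, so it can be tested without waiting for real time to pass.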

WorkflowBuilder

Workflows are linear pipelines of execution groups. .pipe() creates single-stage groups; .parallel() creates multi-stage groups. Parallel group outputs are keyed by stage ID in the workflow context.

const workflow = new WorkflowBuilder(
  "workflow-id",
  "Workflow Name",
  "Description",
  InputSchema,
  OutputSchema
)
  .pipe(stage1)                  // Group 0
  .pipe(stage2)                  // Group 1
  .parallel([stage3a, stage3b])  // Group 2 (concurrent, output: { "stage3a-id": ..., "stage3b-id": ... })
  .pipe(stage4)                  // Group 3
  .build();

// In stage4, access parallel outputs by stage ID:
ctx.require("stage3a-id"); // output of stage3a
ctx.require("stage3b-id"); // output of stage3b

workflow.getStageIds();
workflow.getExecutionPlan();
workflow.getDefaultConfig();
workflow.validateConfig(config);

When a workflow completes, the final execution group's output is persisted in WorkflowRun.output and included in the workflow:completed event.
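
The execution-group model described in this section can be sketched without the engine: groups run one after another, stages within a group run concurrently, and each output is stored under its stage ID. SketchStage and runGroups are hypothetical names, and the engine's real scheduling goes through jobs and run.transition rather than a single in-process loop.

```typescript
// Hypothetical stage shape for this sketch only.
type SketchOutputs = Record<string, unknown>;

interface SketchStage {
  id: string;
  run(outputs: SketchOutputs, input: unknown): Promise<unknown>;
}

// Run groups in order; stages inside one group run concurrently.
async function runGroups(groups: SketchStage[][], input: unknown): Promise<SketchOutputs> {
  const context: SketchOutputs = {};
  for (const group of groups) {
    // Each stage in the group sees only outputs from earlier groups.
    const snapshot = { ...context };
    const groupOutputs = await Promise.all(group.map((stage) => stage.run(snapshot, input)));
    group.forEach((stage, index) => {
      context[stage.id] = groupOutputs[index]; // keyed by stage ID
    });
  }
  return context;
}

const upperStage: SketchStage = { id: "upper", run: async (_outputs, input) => String(input).toUpperCase() };
const lengthStage: SketchStage = { id: "length", run: async (outputs) => (outputs["upper"] as string).length };
const reverseStage: SketchStage = {
  id: "reversed",
  run: async (outputs) => (outputs["upper"] as string).split("").reverse().join(""),
};
const summaryStage: SketchStage = {
  id: "summary",
  run: async (outputs) => `${outputs["reversed"]}:${outputs["length"]}`,
};

// Group 0: upper, Group 1: length + reversed (parallel), Group 2: summary
const sketchRun = runGroups([[upperStage], [lengthStage, reverseStage], [summaryStage]], "abc");
```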

Kernel Setup

import { createKernel } from "@bratsos/workflow-engine/kernel";
import type {
  Kernel,
  KernelConfig,
  Persistence,
  BlobStore,
  JobTransport,
  EventSink,
  Scheduler,
  Clock,
} from "@bratsos/workflow-engine/kernel";

const kernel = createKernel({
  persistence,   // Persistence port - runs, stages, logs, outbox, idempotency
  blobStore,     // BlobStore port - large payload storage
  jobTransport,  // JobTransport port - job queue
  eventSink,     // EventSink port - async event publishing
  scheduler,     // Scheduler port - deferred command triggers
  clock,         // Clock port - injectable time source
  registry,      // WorkflowRegistry - { getWorkflow(id) }
});

// Dispatch typed commands
const { workflowRunId } = await kernel.dispatch({
  type: "run.create",
  idempotencyKey: "unique-key",
  workflowId: "my-workflow",
  input: { data: "hello" },
});

Node Host

import { createNodeHost } from "@bratsos/workflow-engine-host-node";

const host = createNodeHost({
  kernel,
  jobTransport,
  workerId: "worker-1",
  orchestrationIntervalMs: 10_000,
  jobPollIntervalMs: 1_000,
  staleLeaseThresholdMs: 60_000,
});

await host.start(); // Starts polling loops + signal handlers
await host.stop();  // Graceful shutdown
host.getStats();    // { workerId, jobsProcessed, orchestrationTicks, isRunning, uptimeMs }

Serverless Host

import {
  createServerlessHost,
  type ServerlessHost,
  type ServerlessHostConfig,
  type JobMessage,
  type JobResult,
  type ProcessJobsResult,
  type MaintenanceTickResult,
} from "@bratsos/workflow-engine-host-serverless";

const host = createServerlessHost({
  kernel,
  jobTransport,
  workerId: "my-worker",
  // Optional tuning (same defaults as Node host)
  staleLeaseThresholdMs: 60_000,
  maxClaimsPerTick: 10,
  maxSuspendedChecksPerTick: 10,
  maxOutboxFlushPerTick: 100,
});

handleJob(msg: JobMessage): Promise<JobResult>

Execute a single pre-dequeued job. Consumers wire platform-specific ack/retry around the result.

// JobMessage shape (matches queue message body)
interface JobMessage {
  jobId: string;
  workflowRunId: string;
  workflowId: string;
  stageId: string;
  attempt: number;
  maxAttempts?: number;
  payload: Record<string, unknown>;
}

// JobResult
interface JobResult {
  outcome: "completed" | "suspended" | "failed";
  error?: string;
}

const result = await host.handleJob(msg);
if (result.outcome === "completed") msg.ack();
else if (result.outcome === "suspended") msg.ack();
else msg.retry();
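
The ack/retry wiring above can be factored into a small helper that works with any queue platform. This is a sketch under assumptions: SketchQueueMessage is a hypothetical platform message with ack() and retry(), settleMessage is not part of the engine, and treating a thrown error as a failed outcome is a design choice of this example rather than documented handleJob behavior.

```typescript
// Mirrors the JobResult shape shown above.
interface SketchJobResult {
  outcome: "completed" | "suspended" | "failed";
  error?: string;
}

// Hypothetical platform message: a body plus ack/retry controls.
interface SketchQueueMessage<T> {
  body: T;
  ack(): void;
  retry(): void;
}

async function settleMessage<T>(
  message: SketchQueueMessage<T>,
  handleJob: (body: T) => Promise<SketchJobResult>,
): Promise<SketchJobResult> {
  let jobResult: SketchJobResult;
  try {
    jobResult = await handleJob(message.body);
  } catch (err) {
    // Assumption: an unexpected throw is treated like a failed job.
    jobResult = { outcome: "failed", error: err instanceof Error ? err.message : String(err) };
  }
  // "completed" and "suspended" are both acknowledged; only failures are retried.
  if (jobResult.outcome === "failed") message.retry();
  else message.ack();
  return jobResult;
}

// Usage with a fake message that records which control was called.
const settleCalls: string[] = [];
const fakeQueueMessage: SketchQueueMessage<{ jobId: string }> = {
  body: { jobId: "job-1" },
  ack: () => {
    settleCalls.push("ack");
  },
  retry: () => {
    settleCalls.push("retry");
  },
};

const settledResults = Promise.all([
  settleMessage(fakeQueueMessage, async () => ({ outcome: "suspended" })),
  settleMessage(fakeQueueMessage, async () => {
    throw new Error("boom");
  }),
]);
```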

processAvailableJobs(opts?): Promise<ProcessJobsResult>

Dequeue and process jobs from the job transport. Defaults to 1 job (safe for edge runtimes with CPU limits).

const result = await host.processAvailableJobs({ maxJobs: 5 });
// { processed: number, succeeded: number, failed: number }

runMaintenanceTick(): Promise<MaintenanceTickResult>

Run one bounded maintenance cycle: claim pending, poll suspended, reap stale, flush outbox, reap stuck runs.

const tick = await host.runMaintenanceTick();
// { claimed, suspendedChecked, staleReleased, eventsFlushed, stuckReaped }
// Note: resumed suspended stages are automatically followed by run.transition.

AI Integration & Cost Tracking

const ai = createAIHelper(
  `workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
  aiCallLogger,
);

const { text, cost } = await ai.generateText("gemini-2.5-flash", prompt);
const { object } = await ai.generateObject("gemini-2.5-flash", prompt, schema);
const { embedding } = await ai.embed("text-embedding-004", ["text1"], { dimensions: 768 });

// OpenRouter embedding models (OpenAI, Cohere, etc.)
const { embedding: openRouterEmbedding } = await ai.embed("openai/text-embedding-3-small", ["text1"]);

// Provider-specific options passthrough (Voyage, Cohere, etc.)
const { embedding: voyageEmbedding } = await ai.embed("voyage-4-large", ["text1"], {
  providerOptions: { voyage: { outputDimension: 512, inputType: "document" } },
});

// Custom embedding providers (Voyage, Cohere, Jina, etc.)
import { registerEmbeddingProvider } from "@bratsos/workflow-engine";
import { voyage } from "voyage-ai-provider";

registerEmbeddingProvider("voyage", (modelId) => voyage.embeddingModel(modelId));
// Then register models with provider: "voyage" and use ai.embed() as usual

Persistence Setup

Required Prisma Models (ALL are required)

Copy the complete schema from the package README. This includes: WorkflowRun, WorkflowStage, WorkflowLog, WorkflowArtifact, AICall, JobQueue, OutboxEvent, IdempotencyKey.

Create Persistence

import {
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
  createPrismaAICallLogger,
} from "@bratsos/workflow-engine/persistence/prisma";

const persistence = createPrismaWorkflowPersistence(prisma);
const jobQueue = createPrismaJobQueue(prisma);
const aiCallLogger = createPrismaAICallLogger(prisma);

// SQLite - MUST pass databaseType option (use these in place of the two declarations above)
const persistence = createPrismaWorkflowPersistence(prisma, { databaseType: "sqlite" });
const jobQueue = createPrismaJobQueue(prisma, { databaseType: "sqlite" });

Testing

// In-memory persistence and job queue
import {
  InMemoryWorkflowPersistence,
  InMemoryJobQueue,
  InMemoryAICallLogger,
} from "@bratsos/workflow-engine/testing";

// Kernel-specific test adapters
import {
  FakeClock,
  InMemoryBlobStore,
  CollectingEventSink,
  NoopScheduler,
} from "@bratsos/workflow-engine/kernel/testing";

// Create kernel with all in-memory adapters
const persistence = new InMemoryWorkflowPersistence();
const jobQueue = new InMemoryJobQueue();
const kernel = createKernel({
  persistence,
  blobStore: new InMemoryBlobStore(),
  jobTransport: jobQueue,
  eventSink: new CollectingEventSink(),
  scheduler: new NoopScheduler(),
  clock: new FakeClock(),
  registry: { getWorkflow: (id) => workflows.get(id) },
});

// Test a full workflow lifecycle
await kernel.dispatch({ type: "run.create", idempotencyKey: "test", workflowId: "my-wf", input: {} });
await kernel.dispatch({ type: "run.claimPending", workerId: "test-worker" });
const job = await jobQueue.dequeue();
await kernel.dispatch({
  type: "job.execute",
  workflowRunId: job.workflowRunId,
  workflowId: job.workflowId,
  stageId: job.stageId,
  config: {},
});
await kernel.dispatch({ type: "run.transition", workflowRunId: job.workflowRunId });
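
An injectable clock is what makes time-dependent behavior such as stale-lease detection testable without real waiting. The sketch below illustrates the idea with a hand-rolled fake. SketchFakeClock and isLeaseStale are hypothetical; the library's own FakeClock API is not shown in this document and may differ. Only the now(): Date shape is taken from the Quick Start.

```typescript
// Clock shape as used in the Quick Start: { now: () => new Date() }.
interface SketchClock {
  now(): Date;
}

// Hypothetical fake clock whose time only moves when advance() is called.
class SketchFakeClock implements SketchClock {
  private currentMs: number;

  constructor(start: Date) {
    this.currentMs = start.getTime();
  }

  now(): Date {
    return new Date(this.currentMs);
  }

  advance(ms: number): void {
    this.currentMs += ms;
  }
}

// Assumption for this sketch: a lease is stale once it is older than the threshold.
function isLeaseStale(leasedAt: Date, clock: SketchClock, thresholdMs: number): boolean {
  return clock.now().getTime() - leasedAt.getTime() > thresholdMs;
}

const fakeClock = new SketchFakeClock(new Date("2024-01-01T00:00:00.000Z"));
const leasedAt = fakeClock.now();

fakeClock.advance(59_000);
const staleBeforeThreshold = isLeaseStale(leasedAt, fakeClock, 60_000);

fakeClock.advance(2_000);
const staleAfterThreshold = isLeaseStale(leasedAt, fakeClock, 60_000);
```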

Reference Files

  • 01-stage-definitions.md - Complete stage API

  • 02-workflow-builder.md - WorkflowBuilder patterns

  • 03-kernel-host-setup.md - Kernel & host configuration

  • 04-ai-integration.md - AI helper methods

  • 05-persistence-setup.md - Database setup

  • 06-async-batch-stages.md - Async operations

  • 07-testing-patterns.md - Testing with kernel

  • 08-common-patterns.md - Kernel patterns & best practices

  • 09-troubleshooting.md - Debugging stuck runs, P2002 errors, ghost jobs

Key Principles

  • Type Safety: All schemas are Zod - types flow through the entire pipeline

  • Command Kernel: All operations are typed commands dispatched through kernel.dispatch()

  • Environment-Agnostic: Kernel has no timers, no signals, no global state

  • Context Access: Use ctx.require() and ctx.optional() for type-safe stage output access

  • Transactional Outbox: Events written to outbox, published via outbox.flush command. job.execute and stage.pollSuspended use multi-phase transactions to avoid holding connections during external I/O

  • Idempotency: run.create and job.execute replay cached results by key; concurrent same-key dispatch throws IdempotencyInProgressError

  • Authoritative Cancellation: run.cancel cascades to stages + jobs. Ghost jobs (running against non-RUNNING runs) are detected via ghost: true flag and not retried

  • Self-Healing: Stage creation is idempotent (upsert), orchestration steps are isolated, stuck runs are automatically reaped

  • Cost Tracking: All AI calls automatically track tokens and costs

  • BlobStore-Only Artifacts: All artifact storage goes through the BlobStore port. run.rerunFrom cleans up artifacts by key prefix
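
The idempotency principle above (replay a cached result for a known key, reject a second dispatch while the first is still in progress) can be sketched with an in-memory map. SketchIdempotencyStore and SketchInProgressError are hypothetical names and not the engine's implementation. Releasing the key when the work throws is also an assumption.

```typescript
// Hypothetical error type; the engine's own error is named IdempotencyInProgressError.
class SketchInProgressError extends Error {
  constructor(key: string) {
    super(`Idempotency key "${key}" is already in progress`);
    this.name = "SketchInProgressError";
  }
}

type SketchEntry<T> = { state: "in-progress" } | { state: "done"; result: T };

class SketchIdempotencyStore<T> {
  private entries = new Map<string, SketchEntry<T>>();

  run(key: string, work: () => T): T {
    const existing = this.entries.get(key);
    if (existing !== undefined) {
      // Known key: replay the cached result, or reject if still running.
      if (existing.state === "done") return existing.result;
      throw new SketchInProgressError(key);
    }

    this.entries.set(key, { state: "in-progress" });
    try {
      const result = work();
      this.entries.set(key, { state: "done", result });
      return result;
    } catch (err) {
      // Assumption: release the key on failure so a retry can proceed.
      this.entries.delete(key);
      throw err;
    }
  }
}

const idempotencyStore = new SketchIdempotencyStore<string>();
let executions = 0;

const firstResult = idempotencyStore.run("key-1", () => {
  executions++;
  return "run-1";
});
// Same key again: the work function is not executed a second time.
const replayedResult = idempotencyStore.run("key-1", () => {
  executions++;
  return "run-2";
});

// A nested call with the same key simulates a concurrent dispatch.
let reentrantErrorName = "";
idempotencyStore.run("key-2", () => {
  try {
    idempotencyStore.run("key-2", () => "inner");
  } catch (err) {
    reentrantErrorName = (err as Error).name;
  }
  return "outer";
});
```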
