flow-coordinator

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "flow-coordinator" with this command: npx skills add catlog22/claude-code-workflow/catlog22-claude-code-workflow-flow-coordinator

Flow Coordinator

Lightweight workflow coordinator supporting two workflow formats:

  • Legacy Templates: Command chains with slash-command execution

  • Unified Workflows: DAG-based PromptTemplate nodes (spec: spec/unified-workflow-spec.md)

Specification Reference

  • Unified Workflow Spec: @spec/unified-workflow-spec.md

  • Demo Workflow: ccw/data/flows/demo-unified-workflow.json

Architecture

User Task → Detect Format → Select Workflow → Init Status → Execute → Complete
     │              │                                                    │
     │              ├─ Legacy Template                                   │
     │              │   └─ Sequential cmd execution                      │
     │              │                                                    │
     │              └─ Unified Workflow                                  │
     │                  └─ DAG traversal with contextRefs                │
     │                                                                   │
     └──────────────── Resume (from status.json) ────────────────────────┘

Execution Modes:
├─ analysis     → Read-only, CLI --mode analysis
├─ write        → File changes, CLI --mode write
├─ mainprocess  → Blocking, synchronous
└─ async        → Background, ccw cli

Core Concepts

Dual Format Support:

  • Legacy: templates/*.json with cmd, args, execution

  • Unified: ccw/data/flows/*.json with nodes, edges, contextRefs

Unified PromptTemplate Model: All workflow steps are natural language instructions with:

  • instruction: What to execute (natural language)

  • slashCommand: Optional slash command name (e.g., "workflow:plan")

  • slashArgs: Optional arguments for slash command (supports {{variable}})

  • outputName: Name for output reference

  • contextRefs: References to previous step outputs

  • tool: Optional CLI tool (gemini/qwen/codex/claude)

  • mode: Execution mode (analysis/write/mainprocess/async)

DAG Execution: Unified workflows execute as directed acyclic graphs with parallel branches and conditional edges.

Dynamic Discovery: Both formats discovered at runtime via Glob.

Execution Flow

async function execute(task) {
  // 1. Discover and select template
  const templates = await discoverTemplates();
  const template = await selectTemplate(templates);

  // 2. Init status
  const sessionId = `fc-${timestamp()}`;
  const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
  const status = initStatus(template, task);
  write(statusPath, JSON.stringify(status, null, 2));

  // 3. Execute steps based on execution config
  await executeSteps(status, statusPath);
}

async function executeSteps(status, statusPath) {
  for (let i = status.current; i < status.steps.length; i++) {
    const step = status.steps[i];
    status.current = i;

    // Execute based on step mode (all steps use slash-command type)
    const execConfig = step.execution || { type: 'slash-command', mode: 'mainprocess' };

    if (execConfig.mode === 'async') {
      // Async execution - stop and wait for hook callback
      await executeSlashCommandAsync(step, status, statusPath);
      break;
    } else {
      // Mainprocess execution - continue immediately
      await executeSlashCommandSync(step, status);
      step.status = 'done';
      write(statusPath, JSON.stringify(status, null, 2));
    }
  }

  // All steps complete. status.current never reaches steps.length inside the
  // loop, so completion is checked against the step statuses instead.
  if (status.steps.every(s => s.status === 'done')) {
    status.complete = true;
    write(statusPath, JSON.stringify(status, null, 2));
  }
}

Unified Workflow Execution

For workflows using the unified PromptTemplate format (ccw/data/flows/*.json):

async function executeUnifiedWorkflow(workflow, task) {
  // 1. Initialize execution state
  const sessionId = `ufc-${timestamp()}`;
  const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
  const state = {
    id: sessionId,
    workflow: workflow.id,
    goal: task,
    nodeStates: {},  // nodeId -> { status, result, error }
    outputs: {},     // outputName -> result
    complete: false
  };

  // 2. Topological sort for execution order
  const executionOrder = topologicalSort(workflow.nodes, workflow.edges);

  // 3. Execute nodes respecting DAG dependencies
  await executeDAG(workflow, executionOrder, state, statusPath);
}

async function executeDAG(workflow, order, state, statusPath) {
  for (const nodeId of order) {
    const node = workflow.nodes.find(n => n.id === nodeId);
    const data = node.data;

    // Check if all dependencies are satisfied
    if (!areDependenciesSatisfied(nodeId, workflow.edges, state)) {
      continue; // Will be executed when dependencies complete
    }

    // Build instruction from slashCommand or raw instruction
    let instruction = buildNodeInstruction(data, state.outputs);

    // Execute based on mode
    state.nodeStates[nodeId] = { status: 'running' };
    write(statusPath, JSON.stringify(state, null, 2));

    const result = await executeNode(instruction, data.tool, data.mode);

    // Store output for downstream nodes
    state.nodeStates[nodeId] = { status: 'completed', result };
    if (data.outputName) {
      state.outputs[data.outputName] = result;
    }
    write(statusPath, JSON.stringify(state, null, 2));
  }

  state.complete = true;
  write(statusPath, JSON.stringify(state, null, 2));
}
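
The code above calls topologicalSort and areDependenciesSatisfied without defining them. The following is a minimal sketch of how they might look, not the project's actual implementation. It assumes nodes are objects with an id and edges are { source, target } pairs of node ids (the real edge shape is defined in the unified workflow spec and may differ), and it uses Kahn's algorithm as one reasonable choice for the sort.

```javascript
// Hypothetical sketches of the helpers referenced above.
// Assumption: nodes are [{ id }] and edges are [{ source, target }].
function topologicalSort(nodes, edges) {
  const inDegree = new Map(nodes.map(n => [n.id, 0]));
  const downstream = new Map(nodes.map(n => [n.id, []]));
  for (const { source, target } of edges) {
    inDegree.set(target, inDegree.get(target) + 1);
    downstream.get(source).push(target);
  }

  // Kahn's algorithm: repeatedly emit nodes with no unprocessed dependencies
  const queue = nodes.filter(n => inDegree.get(n.id) === 0).map(n => n.id);
  const order = [];
  while (queue.length > 0) {
    const id = queue.shift();
    order.push(id);
    for (const next of downstream.get(id)) {
      inDegree.set(next, inDegree.get(next) - 1);
      if (inDegree.get(next) === 0) queue.push(next);
    }
  }
  if (order.length !== nodes.length) throw new Error('Workflow contains a cycle');
  return order;
}

// A node is ready when every upstream node has status 'completed'
function areDependenciesSatisfied(nodeId, edges, state) {
  return edges
    .filter(e => e.target === nodeId)
    .every(e => state.nodeStates[e.source]?.status === 'completed');
}

const demoNodes = [{ id: 'plan' }, { id: 'analyze' }, { id: 'implement' }];
const demoEdges = [
  { source: 'analyze', target: 'plan' },
  { source: 'plan', target: 'implement' }
];
console.log(topologicalSort(demoNodes, demoEdges)); // [ 'analyze', 'plan', 'implement' ]
```

Because the sort places every dependency before its dependents, the readiness check only fails when an upstream node did not reach 'completed', for example after an error.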

/**
 * Build node instruction from slashCommand or raw instruction
 * Handles slashCommand/slashArgs fields from frontend orchestrator
 */
function buildNodeInstruction(data, outputs) {
  const refs = data.contextRefs || [];

  // If slashCommand is set, construct instruction from it
  if (data.slashCommand) {
    // Resolve variables in slashArgs
    const args = data.slashArgs
      ? resolveContextRefs(data.slashArgs, refs, outputs)
      : '';

    // Build slash command instruction
    let instruction = `/${data.slashCommand}${args ? ' ' + args : ''}`;

    // Append additional instruction if provided
    if (data.instruction) {
      const additionalInstruction = resolveContextRefs(data.instruction, refs, outputs);
      instruction = `${instruction}\n\n${additionalInstruction}`;
    }

    return instruction;
  }

  // Fallback: use raw instruction with context refs resolved
  return resolveContextRefs(data.instruction || '', refs, outputs);
}

function resolveContextRefs(instruction, refs, outputs) {
  let resolved = instruction;
  for (const ref of refs) {
    const value = outputs[ref];
    const placeholder = `{{${ref}}}`;
    const text = typeof value === 'object' ? JSON.stringify(value) : String(value);
    // split/join replaces every occurrence literally, so neither regex
    // metacharacters in ref nor "$" sequences in the value are interpreted
    resolved = resolved.split(placeholder).join(text);
  }
  return resolved;
}
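
A short worked example of placeholder resolution may help here. The resolver is restated in compact form so the block runs on its own; the output names (plan, summary) and the sample instruction are made up for illustration.

```javascript
// Compact restatement of the resolver; placeholder syntax is {{outputName}}.
function resolveContextRefs(instruction, refs, outputs) {
  let resolved = instruction;
  for (const ref of refs) {
    const value = outputs[ref];
    const text = typeof value === 'object' ? JSON.stringify(value) : String(value);
    // Replace every literal occurrence of the placeholder
    resolved = resolved.split(`{{${ref}}}`).join(text);
  }
  return resolved;
}

// Illustrative outputs from two earlier nodes (hypothetical names)
const sampleOutputs = { plan: { steps: 2 }, summary: 'auth module' };
const resolvedText = resolveContextRefs(
  'Review {{summary}} using {{plan}}; repeat: {{summary}}',
  ['plan', 'summary'],
  sampleOutputs
);
console.log(resolvedText);
// Review auth module using {"steps":2}; repeat: auth module
```

Object outputs are serialized as JSON, while strings and numbers are inserted as plain text. A ref with no matching output is rendered as the text "undefined", so callers may want to check that every ref has an output before resolving.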

async function executeNode(instruction, tool, mode) {
  // Build CLI command based on tool and mode
  const cliTool = tool || 'gemini';
  const cliMode = mode === 'write' ? 'write' : 'analysis';

  if (mode === 'async') {
    // Background execution
    return Bash(
      `ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`,
      { run_in_background: true }
    );
  } else {
    // Synchronous execution
    return Bash(
      `ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`
    );
  }
}
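
escapePrompt is used above but not shown. Since the prompt is placed inside a double-quoted shell argument, one plausible sketch escapes the four characters that stay special inside double quotes in POSIX shells. This is an assumption about what the helper does, not the project's actual code.

```javascript
// Hypothetical helper: escape text for use inside a double-quoted shell argument.
// Inside "..." a POSIX shell still interprets backslash, double quote,
// dollar sign and backtick, so each of them gets a leading backslash.
function escapePrompt(text) {
  return text.replace(/[\\"$`]/g, ch => '\\' + ch);
}

const sampleInstruction = 'Explain "auth" flow and $HOME usage';
const sampleCommand =
  `ccw cli -p "${escapePrompt(sampleInstruction)}" --tool gemini --mode analysis`;
console.log(sampleCommand);
// ccw cli -p "Explain \"auth\" flow and \$HOME usage" --tool gemini --mode analysis
```

Passing the prompt as a separate argument to a process-spawning API, rather than interpolating it into a shell string, would avoid the need for escaping altogether where the tooling allows it.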

Unified Workflow Discovery

async function discoverUnifiedWorkflows() {
  const files = Glob('*.json', { path: 'ccw/data/flows/' });

  const workflows = [];
  for (const file of files) {
    const content = JSON.parse(Read(file));
    // Detect unified format by checking for 'nodes' array
    if (content.nodes && Array.isArray(content.nodes)) {
      workflows.push({
        id: content.id,
        name: content.name,
        description: content.description,
        nodeCount: content.nodes.length,
        format: 'unified',
        file: file
      });
    }
  }
  return workflows;
}

Format Detection

function detectWorkflowFormat(content) {
  if (content.nodes && content.edges) {
    return 'unified';  // PromptTemplate DAG format
  }
  if (content.steps && content.steps[0]?.cmd) {
    return 'legacy';   // Command chain format
  }
  throw new Error('Unknown workflow format');
}

Legacy Template Discovery

Dynamic query - never hardcode template list:

async function discoverTemplates() {
  // Discover all JSON templates
  const files = Glob('*.json', { path: 'templates/' });

  // Parse each template
  const templates = [];
  for (const file of files) {
    const content = JSON.parse(Read(file));
    templates.push({
      name: content.name,
      description: content.description,
      steps: content.steps.map(s => s.cmd).join(' → '),
      file: file
    });
  }

  return templates;
}

Template Selection

User chooses from discovered templates:

async function selectTemplate(templates) {
  // Build options from discovered templates
  const options = templates.slice(0, 4).map(t => ({
    label: t.name,
    description: t.steps
  }));

  const response = await AskUserQuestion({
    questions: [{
      question: 'Select workflow template:',
      header: 'Template',
      options: options,
      multiSelect: false
    }]
  });

  // Handle "Other" - show remaining templates or custom input
  if (response.template === 'Other') {
    return await selectFromRemainingTemplates(templates.slice(4));
  }

  return templates.find(t => t.name === response.template);
}

Status Schema

Creation: Copy the template JSON → update id, template, and goal → set every step's status to "pending"

Location: .workflow/.flow-coordinator/{session-id}/status.json

Core Fields:

  • id: Session ID (fc-YYYYMMDD-HHMMSS)

  • template: Template name

  • goal: User task description

  • current: Current step index

  • steps[]: Step array from template (with runtime status, session, taskId)

  • complete: All steps done?

Step Status: pending → running → done | failed | skipped
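
The creation rule and core fields above can be sketched as follows. timestamp and initStatus are referenced elsewhere in this document but not defined, so the bodies here are assumptions built only from the schema; the optional date parameter is added purely to make the example reproducible.

```javascript
// Hypothetical sketch of timestamp() and initStatus(); field names follow the schema above.
function timestamp(date = new Date()) {
  const pad = n => String(n).padStart(2, '0');
  const day = `${date.getFullYear()}${pad(date.getMonth() + 1)}${pad(date.getDate())}`;
  const time = `${pad(date.getHours())}${pad(date.getMinutes())}${pad(date.getSeconds())}`;
  return `${day}-${time}`; // YYYYMMDD-HHMMSS in local time
}

function initStatus(template, goal, now = new Date()) {
  return {
    id: `fc-${timestamp(now)}`,
    template: template.name,
    goal,
    current: 0,
    // Copy each step so the template object itself is not mutated
    steps: template.steps.map(step => ({ ...step, status: 'pending' })),
    complete: false
  };
}

const sampleStatus = initStatus(
  { name: 'rapid', steps: [{ cmd: '/workflow:lite-plan' }, { cmd: '/workflow:lite-execute' }] },
  'Implement user registration',
  new Date(2025, 0, 30, 9, 5, 7)
);
console.log(sampleStatus.id); // fc-20250130-090507
```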

Extended Template Schema

Templates stored in: templates/*.json (discovered at runtime via Glob)

TemplateStep Fields:

  • cmd: Full command path (e.g., /workflow:lite-plan, /workflow:execute)

  • args?: Arguments with {{goal}} and {{prev}} placeholders

  • unit?: Minimum execution unit name (groups related commands)

  • optional?: Can be skipped by user

  • execution: Type and mode configuration

      • type: Always 'slash-command' (for all workflow commands)

      • mode: 'mainprocess' (blocking) or 'async' (background)

  • contextHint?: Natural language guidance for context assembly

Template Example:

{
  "name": "rapid",
  "steps": [
    {
      "cmd": "/workflow:lite-plan",
      "args": "\"{{goal}}\"",
      "unit": "quick-implementation",
      "execution": { "type": "slash-command", "mode": "mainprocess" },
      "contextHint": "Create lightweight implementation plan"
    },
    {
      "cmd": "/workflow:lite-execute",
      "args": "--in-memory",
      "unit": "quick-implementation",
      "execution": { "type": "slash-command", "mode": "async" },
      "contextHint": "Execute plan from previous step"
    }
  ]
}
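
Since templates are discovered at runtime, it can be useful to check a parsed template against the field list above before running it. The validator below is a hypothetical sketch, not part of the skill. It treats execution as required because the field list does not mark it optional, even though executeSteps falls back to a default when it is missing.

```javascript
// Hypothetical validator; the rules mirror the TemplateStep field list above.
function validateTemplate(template) {
  const errors = [];
  if (typeof template.name !== 'string' || template.name === '') {
    errors.push('name must be a non-empty string');
  }
  if (!Array.isArray(template.steps) || template.steps.length === 0) {
    errors.push('steps must be a non-empty array');
    return errors;
  }
  template.steps.forEach((step, i) => {
    if (typeof step.cmd !== 'string' || !step.cmd.startsWith('/')) {
      errors.push(`steps[${i}].cmd must be a full command path starting with "/"`);
    }
    const exec = step.execution;
    if (!exec || exec.type !== 'slash-command') {
      errors.push(`steps[${i}].execution.type must be "slash-command"`);
    }
    if (!exec || !['mainprocess', 'async'].includes(exec.mode)) {
      errors.push(`steps[${i}].execution.mode must be "mainprocess" or "async"`);
    }
  });
  return errors;
}

// One step with a cmd missing its leading slash and an unsupported mode
const problems = validateTemplate({
  name: 'rapid',
  steps: [{ cmd: 'workflow:lite-plan', execution: { type: 'slash-command', mode: 'sync' } }]
});
console.log(problems.length); // 2
```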

Execution Implementation

Mainprocess Mode (Blocking)

async function executeSlashCommandSync(step, status) {
  // Build command: /workflow:cmd -y args
  const cmd = buildCommand(step, status);
  const result = await SlashCommand({ command: cmd });

  step.session = result.session_id;
  step.status = 'done';
  return result;
}

Async Mode (Background)

async function executeSlashCommandAsync(step, status, statusPath) {
  // Build prompt: /workflow:cmd -y args + context
  const prompt = buildCommandPrompt(step, status);

  step.status = 'running';
  write(statusPath, JSON.stringify(status, null, 2));

  // Execute via ccw cli in background
  const taskId = Bash(
    `ccw cli -p "${escapePrompt(prompt)}" --tool claude --mode write`,
    { run_in_background: true }
  ).task_id;

  step.taskId = taskId;
  write(statusPath, JSON.stringify(status, null, 2));

  console.log(`Executing: ${step.cmd} (async)`);
  console.log(`Resume: /flow-coordinator --resume ${status.id}`);
}

Prompt Building

Prompts are built in the format: /workflow:cmd -y args + context

function buildCommandPrompt(step, status) {
  // step.cmd already contains full path: /workflow:lite-plan, /workflow:execute, etc.
  let prompt = `${step.cmd} -y`;

  // Add arguments (with placeholder replacement)
  if (step.args) {
    const args = step.args
      .replace('{{goal}}', status.goal)
      .replace('{{prev}}', getPreviousSessionId(status));
    prompt += ` ${args}`;
  }

  // Add context based on contextHint
  if (step.contextHint) {
    const context = buildContextFromHint(step.contextHint, status);
    prompt += `\n\nContext:\n${context}`;
  } else {
    // Default context: previous session IDs
    const previousContext = collectPreviousResults(status);
    if (previousContext) {
      prompt += `\n\nPrevious results:\n${previousContext}`;
    }
  }

  return prompt;
}

function buildContextFromHint(hint, status) {
  // Parse contextHint instruction and build context accordingly
  // Examples:
  // "Summarize IMPL_PLAN.md" → read and summarize plan
  // "List test coverage gaps" → analyze previous test results
  // "Pass session ID" → just return session reference

  return parseAndBuildContext(hint, status);
}
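
getPreviousSessionId and collectPreviousResults are called above without being defined. A minimal sketch, assuming they only read the session and status fields that the status schema attaches to each step (the exact output format of the real helpers is not specified in this document):

```javascript
// Hypothetical sketches of helpers referenced above; they read only status-schema fields.
function getPreviousSessionId(status) {
  // Walk backwards from the current step to the most recent step that recorded a session
  for (let i = status.current - 1; i >= 0; i--) {
    if (status.steps[i].session) return status.steps[i].session;
  }
  return '';
}

function collectPreviousResults(status) {
  // One line per finished step that produced a session
  return status.steps
    .slice(0, status.current)
    .filter(step => step.status === 'done' && step.session)
    .map(step => `- ${step.cmd}: ${step.session}`)
    .join('\n');
}

const sampleRun = {
  current: 1,
  steps: [
    { cmd: '/workflow:lite-plan', status: 'done', session: 'WFS-plan-20250130' },
    { cmd: '/workflow:lite-execute', status: 'pending' }
  ]
};
console.log(getPreviousSessionId(sampleRun));   // WFS-plan-20250130
console.log(collectPreviousResults(sampleRun)); // - /workflow:lite-plan: WFS-plan-20250130
```

For the first step both helpers return an empty string, so no "Previous results" section is appended to the prompt.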

Example Prompt Output

/workflow:lite-plan -y "Implement user registration"

Context:
Task: Implement user registration
Previous results:
  • None (first step)

/workflow:execute -y --in-memory

Context:
Task: Implement user registration
Previous results:
  • lite-plan: WFS-plan-20250130 (planning-context.md)

User Interaction

Step 1: Select Template

Select workflow template:

○ rapid      lite-plan → lite-execute → test-cycle-execute
○ coupled    plan → plan-verify → execute → review → test
○ bugfix     lite-fix → lite-execute → test-cycle-execute
○ tdd        tdd-plan → execute → tdd-verify
○ Other      (more templates or custom)

Step 2: Review Execution Plan

Template: coupled
Steps:

  1. /workflow:plan (slash-command mainprocess)
  2. /workflow:plan-verify (slash-command mainprocess)
  3. /workflow:execute (slash-command async)
  4. /workflow:review-session-cycle (slash-command mainprocess)
  5. /workflow:review-cycle-fix (slash-command mainprocess)
  6. /workflow:test-fix-gen (slash-command mainprocess)
  7. /workflow:test-cycle-execute (slash-command async)

Proceed? [Confirm / Cancel]

Resume Capability

async function resume(sessionId) {
  const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
  const status = JSON.parse(Read(statusPath));

  // Find first incomplete step
  status.current = status.steps.findIndex(s => s.status !== 'done');
  if (status.current === -1) {
    console.log('All steps complete');
    return;
  }

  // Continue executing steps
  await executeSteps(status, statusPath);
}
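
To see where a session would pick up, the resume-point lookup can be tried on a sample steps array. The helper below is a hypothetical variant that also treats 'skipped' steps as finished, which is an assumption; the resume function above resumes at the first step whose status is anything other than 'done'.

```javascript
// Hypothetical helper: index of the first step that still needs to run.
// Assumption: 'skipped' counts as finished, unlike the stricter check above.
function findResumeIndex(steps) {
  return steps.findIndex(s => s.status !== 'done' && s.status !== 'skipped');
}

const sampleSteps = [
  { cmd: '/workflow:plan', status: 'done' },
  { cmd: '/workflow:plan-verify', status: 'skipped' },
  { cmd: '/workflow:execute', status: 'running' },
  { cmd: '/workflow:test-fix-gen', status: 'pending' }
];
console.log(findResumeIndex(sampleSteps)); // 2
console.log(findResumeIndex([{ status: 'done' }])); // -1
```

A step left in 'running' by an async launch is selected again, so a caller may want to confirm that the background task has finished before re-executing it.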

Available Templates

Templates discovered from templates/*.json:

| Template | Use Case | Steps |
|----------|----------|-------|
| rapid | Simple feature | /workflow:lite-plan → /workflow:lite-execute → /workflow:test-cycle-execute |
| coupled | Complex feature | /workflow:plan → /workflow:plan-verify → /workflow:execute → /workflow:review-session-cycle → /workflow:test-fix-gen |
| bugfix | Bug fix | /workflow:lite-fix → /workflow:lite-execute → /workflow:test-cycle-execute |
| tdd | Test-driven | /workflow:tdd-plan → /workflow:execute → /workflow:tdd-verify |
| test-fix | Fix failing tests | /workflow:test-fix-gen → /workflow:test-cycle-execute |
| brainstorm | Exploration | /workflow:brainstorm-with-file |
| debug | Debug with docs | /workflow:debug-with-file |
| analyze | Collaborative analysis | /workflow:analyze-with-file |
| issue | Issue workflow | /workflow:issue:plan → /workflow:issue:queue → /workflow:issue:execute |

Design Principles

  • Minimal fields: Only essential tracking data

  • Flat structure: No nested objects beyond steps array

  • Step-level execution: Each step defines how it's executed

  • Resumable: Any step can be resumed from status

  • Human readable: Clear JSON format

Reference Documents

| Document | Purpose |
|----------|---------|
| spec/unified-workflow-spec.md | Unified PromptTemplate workflow specification |
| ccw/data/flows/*.json | Unified workflows (DAG format, dynamic discovery) |
| templates/*.json | Legacy workflow templates (command chain format) |

Demo Workflows (Unified Format)

| File | Description | Nodes |
|------|-------------|-------|
| demo-unified-workflow.json | Auth implementation | 7 nodes: Analyze → Plan → Implement → Review → Tests → Report |
| parallel-ci-workflow.json | CI/CD pipeline | 8 nodes: Parallel checks → Merge → Conditional notify |
| simple-analysis-workflow.json | Analysis pipeline | 3 nodes: Explore → Analyze → Report |

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
