upstash-workflow

Upstash Workflow Implementation Guide

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "upstash-workflow" with this command: npx skills add lobehub/lobehub/lobehub-lobehub-upstash-workflow

Upstash Workflow Implementation Guide

This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.

🎯 The Three Core Patterns

All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:

  • 🔍 Dry-Run Mode - Get statistics without triggering actual execution

  • 🌟 Fan-Out Pattern - Split large batches into smaller chunks for parallel processing

  • 🎯 Single Task Execution - Each workflow execution processes ONE item only

These patterns ensure scalable, debuggable, and cost-efficient async workflows.

Table of Contents

  • Architecture Overview

  • Core Patterns

  • File Structure

  • Implementation Patterns

  • Best Practices

  • Examples

Architecture Overview

Standard 3-Layer Pattern

All workflows follow a standard 3-layer architecture:

Layer 1: Entry Point (process-*) ├─ Validates prerequisites ├─ Calculates total items to process ├─ Filters existing items ├─ Supports dry-run mode (statistics only) └─ Triggers Layer 2 if work needed

Layer 2: Pagination (paginate-*) ├─ Handles cursor-based pagination ├─ Implements fan-out for large batches ├─ Recursively processes all pages └─ Triggers Layer 3 for each item

Layer 3: Single Task Execution (execute-/generate-) └─ Performs actual business logic for ONE item

Examples: welcome-placeholder , agent-welcome

Core Patterns

  1. Dry-Run Mode

Purpose: Get statistics without triggering actual execution

Pattern:

// Layer 1: Entry Point if (dryRun) { console.log('[workflow:process] Dry run mode, returning statistics only'); return { ...result, dryRun: true, message: [DryRun] Would process ${itemsNeedingProcessing.length} items, }; }

Use Case: Check how many items will be processed before committing to execution

Response:

{ success: true, totalEligible: 100, toProcess: 80, alreadyProcessed: 20, dryRun: true, message: "[DryRun] Would process 80 items" }

  1. Fan-Out Pattern

Purpose: Split large batches into smaller chunks for parallel processing

Pattern:

// Layer 2: Pagination const CHUNK_SIZE = 20;

if (itemIds.length > CHUNK_SIZE) { // Fan-out to smaller chunks const chunks = chunk(itemIds, CHUNK_SIZE); console.log('[workflow:paginate] Fan-out mode:', { chunks: chunks.length, chunkSize: CHUNK_SIZE, totalItems: itemIds.length, });

await Promise.all( chunks.map((ids, idx) => context.run(workflow:fanout:${idx + 1}/${chunks.length}, () => WorkflowClass.triggerPaginateItems({ itemIds: ids }), ), ), ); }

Use Case: Avoid hitting workflow step limits by splitting large batches

Configuration:

  • PAGE_SIZE = 50

  • Items per pagination page

  • CHUNK_SIZE = 20

  • Items per fan-out chunk

  • If batch > CHUNK_SIZE, split into chunks and recursively trigger pagination

  1. Single Task Execution

Purpose: Execute business logic for ONE item at a time

Pattern:

// Layer 3: Single Task Execution export const { POST } = serve<ExecutePayload>( async (context) => { const { itemId } = context.requestPayload ?? {};

if (!itemId) {
  return { success: false, error: 'Missing itemId' };
}

// Get item
const item = await context.run('workflow:get-item', async () => {
  return getItem(itemId);
});

// Execute business logic for THIS item only
const result = await context.run('workflow:execute', async () => {
  return processItem(item);
});

// Save result for THIS item
await context.run('workflow:save', async () => {
  return saveResult(itemId, result);
});

return { success: true, itemId, result };

}, { flowControl: { key: 'workflow.execute', parallelism: 10, ratePerSecond: 5, }, }, );

Key Principles:

  • Each workflow execution handles exactly ONE item

  • Parallelism controlled by flowControl config

  • Multiple items processed via Layer 2 triggering multiple Layer 3 executions

File Structure

Directory Layout

src/ ├── app/(backend)/api/workflows/ │ └── {workflow-name}/ │ ├── process-{entities}/route.ts # Layer 1 │ ├── paginate-{entities}/route.ts # Layer 2 │ └── execute-{entity}/route.ts # Layer 3 │ └── server/workflows/ └── {workflowName}/ └── index.ts # Workflow class

Cloud Project Configuration

For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see:

📄 Cloud Configuration Guide

Implementation Patterns

  1. Workflow Class

Location: src/server/workflows/{workflowName}/index.ts

import { Client } from '@upstash/workflow'; import debug from 'debug';

const log = debug('lobe-server:workflows:{workflow-name}');

// Workflow paths const WORKFLOW_PATHS = { processItems: '/api/workflows/{workflow-name}/process-items', paginateItems: '/api/workflows/{workflow-name}/paginate-items', executeItem: '/api/workflows/{workflow-name}/execute-item', } as const;

// Payload types export interface ProcessItemsPayload { dryRun?: boolean; force?: boolean; }

export interface PaginateItemsPayload { cursor?: string; itemIds?: string[]; // For fanout chunks }

export interface ExecuteItemPayload { itemId: string; }

/**

  • Get workflow URL using APP_URL */ const getWorkflowUrl = (path: string): string => { const baseUrl = process.env.APP_URL; if (!baseUrl) throw new Error('APP_URL is required to trigger workflows'); return new URL(path, baseUrl).toString(); };

/**

  • Get workflow client */ const getWorkflowClient = (): Client => { const token = process.env.QSTASH_TOKEN; if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');

const config: ConstructorParameters<typeof Client>[0] = { token }; if (process.env.QSTASH_URL) { (config as Record<string, unknown>).url = process.env.QSTASH_URL; } return new Client(config); };

/**

  • {Workflow Name} Workflow */ export class {WorkflowName}Workflow { private static client: Client;

private static getClient(): Client { if (!this.client) { this.client = getWorkflowClient(); } return this.client; }

/**

  • Trigger workflow to process items (entry point) */ static triggerProcessItems(payload: ProcessItemsPayload) { const url = getWorkflowUrl(WORKFLOW_PATHS.processItems); log('Triggering process-items workflow'); return this.getClient().trigger({ body: payload, url }); }

/**

  • Trigger workflow to paginate items */ static triggerPaginateItems(payload: PaginateItemsPayload) { const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems); log('Triggering paginate-items workflow'); return this.getClient().trigger({ body: payload, url }); }

/**

  • Trigger workflow to execute a single item */ static triggerExecuteItem(payload: ExecuteItemPayload) { const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem); log('Triggering execute-item workflow: %s', payload.itemId); return this.getClient().trigger({ body: payload, url }); }

/**

  • Filter items that need processing (e.g., check Redis cache, database state) */ static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> { if (itemIds.length === 0) return [];
// Check existing state (Redis, database, etc.)
// Return items that need processing

return itemIds;

} }

  1. Layer 1: Entry Point (process-*)

Purpose: Validates prerequisites, calculates statistics, supports dryRun mode

import { serve } from '@upstash/workflow/nextjs'; import { getServerDB } from '@/database/server'; import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';

/**

  • Entry workflow for {workflow description}

    1. Get all eligible items
    1. Filter items that already have results
    1. If dryRun, return statistics only
    1. If no items need processing, return early
    1. Trigger paginate workflow */ export const { POST } = serve<ProcessPayload>( async (context) => { const { dryRun, force } = context.requestPayload ?? {};

    console.log('[{workflow}:process] Starting with payload:', { dryRun, force });

    // Get all eligible items const allItemIds = await context.run('{workflow}:get-all-items', async () => { const db = await getServerDB(); // Query database for eligible items return items.map((item) => item.id); });

    console.log('[{workflow}:process] Total eligible items:', allItemIds.length);

    if (allItemIds.length === 0) { return { success: true, totalEligible: 0, message: 'No eligible items found', }; }

    // Filter items that need processing const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () => WorkflowClass.filterItemsNeedingProcessing(allItemIds), );

    const result = { success: true, totalEligible: allItemIds.length, toProcess: itemsNeedingProcessing.length, alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length, };

    console.log('[{workflow}:process] Check result:', result);

    // If dryRun mode, return statistics only if (dryRun) { console.log('[{workflow}:process] Dry run mode, returning statistics only'); return { ...result, dryRun: true, message: [DryRun] Would process ${itemsNeedingProcessing.length} items, }; }

    // If no items need processing, return early if (itemsNeedingProcessing.length === 0) { console.log('[{workflow}:process] All items already processed'); return { ...result, message: 'All items already processed', }; }

    // Trigger paginate workflow console.log('[{workflow}:process] Triggering paginate workflow'); await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));

    return { ...result, message: Triggered pagination for ${itemsNeedingProcessing.length} items, }; }, { flowControl: { key: '{workflow}.process', parallelism: 1, ratePerSecond: 1, }, }, );

  1. Layer 2: Pagination (paginate-*)

Purpose: Handles cursor-based pagination, implements fanout for large batches

import { serve } from '@upstash/workflow/nextjs'; import { chunk } from 'es-toolkit/compat'; import { getServerDB } from '@/database/server'; import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';

const PAGE_SIZE = 50; const CHUNK_SIZE = 20;

/**

  • Paginate items workflow - handles pagination and fanout

    1. If specific itemIds provided (from fanout), process them directly
    1. Otherwise, paginate through all items using cursor
    1. Filter items that need processing
    1. If batch > CHUNK_SIZE, fanout to smaller chunks
    1. Trigger execute workflow for each item
    1. Schedule next page if cursor exists */ export const { POST } = serve<PaginatePayload>( async (context) => { const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};

    console.log('[{workflow}:paginate] Starting with payload:', { cursor, itemIdsCount: payloadItemIds?.length ?? 0, });

    // If specific itemIds are provided, process them directly (from fanout) if (payloadItemIds && payloadItemIds.length > 0) { console.log('[{workflow}:paginate] Processing specific itemIds:', { count: payloadItemIds.length, });

    await Promise.all( payloadItemIds.map((itemId) => context.run({workflow}:execute:${itemId}, () => WorkflowClass.triggerExecuteItem({ itemId }), ), ), );

    return { success: true, processedItems: payloadItemIds.length, }; }

    // Paginate through all items const itemBatch = await context.run('{workflow}:get-batch', async () => { const db = await getServerDB(); // Query database with cursor and PAGE_SIZE const items = await db.query(...);

    if (!items.length) return { ids: [] };

    const last = items.at(-1); return { ids: items.map(item => item.id), cursor: last ? last.id : undefined, }; });

    const batchItemIds = itemBatch.ids; const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;

    console.log('[{workflow}:paginate] Got batch:', { batchSize: batchItemIds.length, nextCursor, });

    if (batchItemIds.length === 0) { console.log('[{workflow}:paginate] No more items, pagination complete'); return { success: true, message: 'Pagination complete' }; }

    // Filter items that need processing const itemIds = await context.run('{workflow}:filter-existing', () => WorkflowClass.filterItemsNeedingProcessing(batchItemIds), );

    console.log('[{workflow}:paginate] After filtering:', { needProcessing: itemIds.length, skipped: batchItemIds.length - itemIds.length, });

    // Process items if any need processing if (itemIds.length > 0) { if (itemIds.length > CHUNK_SIZE) { // Fanout to smaller chunks const chunks = chunk(itemIds, CHUNK_SIZE); console.log('[{workflow}:paginate] Fanout mode:', { chunks: chunks.length, chunkSize: CHUNK_SIZE, totalItems: itemIds.length, });

     await Promise.all(
       chunks.map((ids, idx) =>
         context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
           WorkflowClass.triggerPaginateItems({ itemIds: ids }),
         ),
       ),
     );
    

    } else { // Process directly console.log('[{workflow}:paginate] Processing items directly:', { count: itemIds.length, });

     await Promise.all(
       itemIds.map((itemId) =>
         context.run(`{workflow}:execute:${itemId}`, () =>
           WorkflowClass.triggerExecuteItem({ itemId }),
         ),
       ),
     );
    

    } }

    // Schedule next page if (nextCursor) { console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor }); await context.run('{workflow}:next-page', () => WorkflowClass.triggerPaginateItems({ cursor: nextCursor }), ); } else { console.log('[{workflow}:paginate] No more pages'); }

    return { success: true, processedItems: itemIds.length, skippedItems: batchItemIds.length - itemIds.length, nextCursor: nextCursor ?? null, }; }, { flowControl: { key: '{workflow}.paginate', parallelism: 20, ratePerSecond: 5, }, }, );

  1. Layer 3: Execution (execute-/generate-)

Purpose: Performs actual business logic

import { serve } from '@upstash/workflow/nextjs'; import { getServerDB } from '@/database/server'; import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';

/**

  • Execute item workflow - performs actual business logic

    1. Get item data
    1. Perform business logic (AI generation, data processing, etc.)
    1. Save results */ export const { POST } = serve<ExecutePayload>( async (context) => { const { itemId } = context.requestPayload ?? {};

    console.log('[{workflow}:execute] Starting:', { itemId });

    if (!itemId) { return { success: false, error: 'Missing itemId' }; }

    const db = await getServerDB();

    // Get item data const item = await context.run('{workflow}:get-item', async () => { // Query database for item return item; });

    if (!item) { return { success: false, error: 'Item not found' }; }

    // Perform business logic const result = await context.run('{workflow}:process-item', async () => { const workflow = new WorkflowClass(db, itemId); return workflow.generate(); // or process(), execute(), etc. });

    // Save results await context.run('{workflow}:save-result', async () => { const workflow = new WorkflowClass(db, itemId); return workflow.saveToRedis(result); // or saveToDatabase(), etc. });

    console.log('[{workflow}:execute] Completed:', { itemId });

    return { success: true, itemId, result, }; }, { flowControl: { key: '{workflow}.execute', parallelism: 10, ratePerSecond: 5, }, }, );

Best Practices

  1. Error Handling

export const { POST } = serve<Payload>( async (context) => { const { itemId } = context.requestPayload ?? {};

// Validate required parameters
if (!itemId) {
  return { success: false, error: 'Missing itemId in payload' };
}

try {
  // Perform work
  const result = await context.run('step-name', () => doWork(itemId));

  return { success: true, itemId, result };
} catch (error) {
  console.error('[workflow:error]', error);
  return {
    success: false,
    error: error instanceof Error ? error.message : 'Unknown error'
  };
}

}, { flowControl: { ... } }, );

  1. Logging

Use consistent log prefixes and structured logging:

console.log('[{workflow}:{layer}] Starting with payload:', payload); console.log('[{workflow}:{layer}] Processing items:', { count: items.length }); console.log('[{workflow}:{layer}] Completed:', result); console.error('[{workflow}:{layer}:error]', error);

  1. Return Values

Return consistent response shapes:

// Success response return { success: true, itemId, result, message: 'Optional success message', };

// Error response return { success: false, error: 'Error description', itemId, // Include context if available };

// Statistics response (for entry point) return { success: true, totalEligible: 100, toProcess: 80, alreadyProcessed: 20, dryRun: true, // If applicable message: 'Summary message', };

  1. flowControl Configuration

Purpose: Control concurrency and rate limiting for workflow executions

Tune concurrency based on layer:

// Layer 1: Entry point - single instance only flowControl: { key: '{workflow}.process', parallelism: 1, // Only 1 process workflow at a time ratePerSecond: 1, // 1 execution per second }

// Layer 2: Pagination - moderate concurrency flowControl: { key: '{workflow}.paginate', parallelism: 20, // Up to 20 pagination workflows in parallel ratePerSecond: 5, // 5 new executions per second }

// Layer 3: Single task execution - high concurrency flowControl: { key: '{workflow}.execute', parallelism: 10, // Up to 10 items processed in parallel ratePerSecond: 5, // 5 new items per second }

Guidelines:

  • Layer 1: Always use parallelism: 1 to avoid duplicate processing

  • Layer 2: Moderate concurrency for pagination (typically 10-20)

  • Layer 3: Higher concurrency for parallel item processing (typically 5-10)

  • Adjust ratePerSecond based on external API rate limits or resource constraints

  1. context.run() Best Practices
  • Use descriptive step names with prefixes: {workflow}:step-name

  • Each step should be idempotent (safe to retry)

  • Don't nest context.run() calls - keep them flat

  • Use unique step names when processing multiple items:

// Good: Unique step names await Promise.all( items.map((item) => context.run({workflow}:execute:${item.id}, () => processItem(item))), );

// Bad: Same step name for all items await Promise.all( items.map((item) => context.run({workflow}:execute, () => // ❌ Not unique processItem(item), ), ), );

  1. Payload Validation

Always validate required parameters at the start:

export const { POST } = serve<Payload>( async (context) => { const { itemId, configId } = context.requestPayload ?? {};

// Validate at the start
if (!itemId) {
  return { success: false, error: 'Missing itemId in payload' };
}

if (!configId) {
  return { success: false, error: 'Missing configId in payload' };
}

// Proceed with work...

}, { flowControl: { ... } }, );

  1. Database Connection

Get database connection once per workflow:

export const { POST } = serve<Payload>( async (context) => { const db = await getServerDB(); // Get once

// Use in multiple steps
const item = await context.run('get-item', async () => {
  return itemModel.findById(db, itemId);
});

const result = await context.run('save-result', async () => {
  return resultModel.create(db, result);
});

}, { flowControl: { ... } }, );

  1. Testing

Create integration tests for workflows:

describe('WorkflowName', () => { it('should process items successfully', async () => { // Setup test data const items = await createTestItems();

// Trigger workflow
await WorkflowClass.triggerProcessItems({ dryRun: false });

// Wait for completion (use polling or webhook)
await waitForCompletion();

// Verify results
const results = await getResults();
expect(results).toHaveLength(items.length);

});

it('should support dryRun mode', async () => { const result = await WorkflowClass.triggerProcessItems({ dryRun: true });

expect(result).toMatchObject({
  success: true,
  dryRun: true,
  totalEligible: expect.any(Number),
  toProcess: expect.any(Number),
});

}); });

Examples

Example 1: Welcome Placeholder

Use Case: Generate AI-powered welcome placeholders for users

Structure:

  • Layer 1: process-users

  • Entry point, checks eligible users

  • Layer 2: paginate-users

  • Paginates through active users

  • Layer 3: generate-user

  • Generates placeholders for ONE user

Core Patterns Demonstrated:

  • Dry-Run Mode:

// Layer 1: process-users if (dryRun) { return { ...result, dryRun: true, message: [DryRun] Would process ${usersNeedingGeneration.length} users, }; }

  • Fan-Out Pattern:

// Layer 2: paginate-users if (userIds.length > CHUNK_SIZE) { const chunks = chunk(userIds, CHUNK_SIZE); await Promise.all( chunks.map((ids, idx) => context.run(welcome-placeholder:fanout:${idx + 1}/${chunks.length}, () => WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }), ), ), ); }

  • Single Task Execution:

// Layer 3: generate-user export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => { const { userId } = context.requestPayload ?? {};

// Execute for ONE user only const workflow = new WelcomePlaceholderWorkflow(db, userId); const placeholders = await context.run('generate', () => workflow.generate());

return { success: true, userId, placeholdersCount: placeholders.length }; });

Key Features:

  • ✅ Filters users who already have cached placeholders in Redis

  • ✅ Supports paidOnly flag to process only subscribed users

  • ✅ Supports dryRun mode for statistics

  • ✅ Uses fan-out for large user batches (CHUNK_SIZE=20)

  • ✅ Each execution processes exactly ONE user

Files:

  • /api/workflows/welcome-placeholder/process-users/route.ts

  • /api/workflows/welcome-placeholder/paginate-users/route.ts

  • /api/workflows/welcome-placeholder/generate-user/route.ts

  • /server/workflows/welcomePlaceholder/index.ts

Example 2: Agent Welcome

Use Case: Generate welcome messages and open questions for AI agents

Structure:

  • Layer 1: process-agents

  • Entry point, checks eligible agents

  • Layer 2: paginate-agents

  • Paginates through active agents

  • Layer 3: generate-agent

  • Generates welcome data for ONE agent

Core Patterns Demonstrated:

  • Dry-Run Mode:

// Layer 1: process-agents if (dryRun) { return { ...result, dryRun: true, message: [DryRun] Would process ${agentsNeedingGeneration.length} agents, }; }

Fan-Out Pattern: Same as welcome-placeholder

Single Task Execution:

// Layer 3: generate-agent export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => { const { agentId } = context.requestPayload ?? {};

// Execute for ONE agent only const workflow = new AgentWelcomeWorkflow(db, agentId); const data = await context.run('generate', () => workflow.generate());

return { success: true, agentId, data }; });

Key Features:

  • ✅ Filters agents who already have cached data in Redis

  • ✅ Supports paidOnly flag for subscribed users' agents only

  • ✅ Supports dryRun mode for statistics

  • ✅ Uses fan-out for large agent batches (CHUNK_SIZE=20)

  • ✅ Each execution processes exactly ONE agent

Files:

  • /api/workflows/agent-welcome/process-agents/route.ts

  • /api/workflows/agent-welcome/paginate-agents/route.ts

  • /api/workflows/agent-welcome/generate-agent/route.ts

  • /server/workflows/agentWelcome/index.ts

Key Takeaways from Examples

Both workflows follow the exact same pattern:

Layer 1 (Entry Point):

  • Calculate statistics

  • Filter existing items

  • Support dry-run mode

  • Trigger pagination only if needed

Layer 2 (Pagination):

  • Paginate with cursor (PAGE_SIZE=50)

  • Fan-out large batches (CHUNK_SIZE=20)

  • Trigger Layer 3 for each item

  • Recursively process all pages

Layer 3 (Execution):

  • Process ONE item per execution

  • Perform business logic

  • Save results

  • Return success/failure

The only differences are:

  • Entity type (users vs agents)

  • Business logic (placeholder generation vs welcome generation)

  • Data source (different database queries)

Common Pitfalls

❌ Don't: Use context.run() without unique names

// Bad: Same step name when processing multiple items await Promise.all(items.map((item) => context.run('process', () => process(item))));

// Good: Unique step names await Promise.all(items.map((item) => context.run(process:${item.id}, () => process(item))));

❌ Don't: Forget to validate payload parameters

// Bad: No validation export const { POST } = serve<Payload>(async (context) => { const { itemId } = context.requestPayload ?? {}; const result = await process(itemId); // May fail with undefined });

// Good: Validate early export const { POST } = serve<Payload>(async (context) => { const { itemId } = context.requestPayload ?? {};

if (!itemId) { return { success: false, error: 'Missing itemId' }; }

const result = await process(itemId); });

❌ Don't: Skip filtering existing items

// Bad: No filtering, may duplicate work const allItems = await getAllItems(); await Promise.all(allItems.map((item) => triggerExecute(item)));

// Good: Filter existing items first const allItems = await getAllItems(); const itemsNeedingProcessing = await filterExisting(allItems); await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));

❌ Don't: Use inconsistent logging

// Bad: Inconsistent prefixes and formats console.log('Starting workflow'); log.info('Processing item:', itemId); console.log(Done with ${itemId});

// Good: Consistent structured logging console.log('[workflow:layer] Starting with payload:', payload); console.log('[workflow:layer] Processing item:', { itemId }); console.log('[workflow:layer] Completed:', { itemId, result });

Environment Variables Required

Required for all workflows

APP_URL=https://your-app.com # Base URL for workflow endpoints QSTASH_TOKEN=qstash_xxx # QStash authentication token

Optional (for custom QStash URL)

QSTASH_URL=https://custom-qstash.com # Custom QStash endpoint

Checklist for New Workflows

Planning Phase

  • Identify entity to process (users, agents, items, etc.)

  • Define business logic for single item execution

  • Determine filtering logic (Redis cache, database state, etc.)

Implementation Phase

  • Define payload types with proper TypeScript interfaces

  • Create workflow class with static trigger methods

  • Layer 1: Implement entry point with dry-run support

  • Layer 1: Add filtering logic to avoid duplicate work

  • Layer 2: Implement pagination with fan-out logic

  • Layer 3: Implement single task execution (ONE item per run)

  • Configure appropriate flowControl for each layer

  • Add consistent logging with workflow prefixes

  • Validate all required payload parameters

  • Use unique context.run() step names

Quality & Deployment

  • Return consistent response shapes

  • Configure cloud deployment (see Cloud Guide if using lobehub-cloud)

  • Write integration tests

  • Test with dry-run mode first

  • Test with small batch before full rollout

Additional Resources

  • Upstash Workflow Documentation

  • QStash Documentation

  • Example Workflows in Codebase

  • Workflow Classes

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

agent-tracing

No summary provided by upstream source.

Repository SourceNeeds Review
General

react

No summary provided by upstream source.

Repository SourceNeeds Review
1.3K-lobehub
Coding

typescript

No summary provided by upstream source.

Repository SourceNeeds Review
General

zustand

No summary provided by upstream source.

Repository SourceNeeds Review