agent-factory

Creates specialized worker agents dynamically from templates. Use when the orchestrator needs to spawn task-specific workers for parallel execution. Handles the agent lifecycle: create -> execute -> cleanup.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "agent-factory" with this command: npx skills add ibrahim-3d/conductor-orchestrator-superpowers/ibrahim-3d-conductor-orchestrator-superpowers-agent-factory

Agent Factory -- Dynamic Worker Creation

Creates ephemeral worker agents from templates, specializing them based on task type.

Worker Creation Flow

Task from DAG -> Determine Type -> Select Template -> Substitute Placeholders -> Spawn Worker

Template Selection

| Task Type   | Template                       | Specialization                 |
|-------------|--------------------------------|--------------------------------|
| code        | code-worker.template.md        | TDD, code patterns, tests      |
| ui          | ui-worker.template.md          | Design system, accessibility   |
| integration | integration-worker.template.md | API contracts, error handling  |
| test        | test-worker.template.md        | Coverage targets, test patterns |
| docs        | task-worker.template.md        | Base template                  |
| config      | task-worker.template.md        | Base template                  |
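
As a quick illustration of the select-and-substitute steps in the flow above, the sketch below can be run in isolation. It uses an inline string as a stand-in for a template file (the real template contents are not shown here), and `unresolved_placeholders` is a hypothetical helper, not part of the skill, that lists any `{name}` placeholders still present after rendering.

```python
import re

# Mapping taken from the table above; unknown types fall back to the base template.
TEMPLATE_MAP = {
    "code": "code-worker.template.md",
    "ui": "ui-worker.template.md",
    "integration": "integration-worker.template.md",
    "test": "test-worker.template.md",
}
BASE_TEMPLATE = "task-worker.template.md"


def select_template(task_type: str) -> str:
    """Return the template file name for a task type."""
    return TEMPLATE_MAP.get(task_type, BASE_TEMPLATE)


def render(template: str, values: dict) -> str:
    """Replace each {name} placeholder with its value (plain string replace)."""
    for name, value in values.items():
        template = template.replace("{" + name + "}", str(value))
    return template


def unresolved_placeholders(text: str) -> list:
    """Hypothetical check: list {snake_case} placeholders left after rendering."""
    return sorted(set(re.findall(r"\{([a-z_]+)\}", text)))


# Inline stand-in for a template file; the values are invented for illustration.
sample = "# Worker {worker_id}\nTask: {task_name}\nUnblocks: {unblocks}"
rendered = render(sample, {
    "worker_id": "worker-1.2-20240101000000",
    "task_name": "Add login form",
})

print(select_template("docs"))            # task-worker.template.md
print(unresolved_placeholders(rendered))  # ['unblocks']
```

A check like this could run just before the worker skill file is written, so that a template expecting a value the factory never supplies is caught early rather than reaching the worker as literal braces.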

CreateWorkerAgent Procedure

import os
from datetime import datetime, timezone


def create_worker_agent(task: dict, track_id: str, message_bus_path: str, dag: dict | None = None) -> dict:
    """
    Create a specialized worker agent for a task.

    Args:
        task: Task node from DAG (id, name, type, files, depends_on, acceptance)
        track_id: Current track identifier
        message_bus_path: Path to message bus directory
        dag: Full DAG, used to compute {unblocks}; if omitted, {unblocks} renders as "None"

    Returns:
        dict with worker_id, skill_path, prompt, task_id, task_name, task_type
    """

    # read_file and write_file stand for the agent's file tools (or equivalent helpers).
    # Assumes CLAUDE_PLUGIN_ROOT is set in the environment.
    plugin_root = os.environ["CLAUDE_PLUGIN_ROOT"]
    templates_dir = f"{plugin_root}/skills/worker-templates"

    # 1. Generate unique worker ID (timezone-aware; datetime.utcnow() is deprecated)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    worker_id = f"worker-{task['id']}-{timestamp}"

    # 2. Select template based on task type (unknown types use the base template)
    task_type = task.get('type', 'code')
    template_map = {
        'code': 'code-worker.template.md',
        'ui': 'ui-worker.template.md',
        'integration': 'integration-worker.template.md',
        'test': 'test-worker.template.md',
    }
    template_name = template_map.get(task_type, 'task-worker.template.md')
    template_path = f"{templates_dir}/{template_name}"

    # 3. Read template
    template = read_file(template_path)

    # 4. Prepare substitution values
    substitutions = {
        '{task_id}': task['id'],
        '{task_name}': task['name'],
        '{track_id}': track_id,
        '{phase}': str(task.get('phase', 1)),
        '{files}': format_list(task.get('files', [])),
        '{depends_on}': format_list(task.get('depends_on', [])),
        '{acceptance}': task.get('acceptance', 'Complete the task as specified'),
        '{message_bus_path}': message_bus_path,
        '{timestamp}': timestamp,
        '{worker_id}': worker_id,
        '{unblocks}': format_list(find_unblocked_tasks(task['id'], dag or {})),
    }

    # 5. Substitute placeholders (str() guards against non-string values such as numeric IDs)
    worker_skill = template
    for placeholder, value in substitutions.items():
        worker_skill = worker_skill.replace(placeholder, str(value))

    # 6. Add task-specific instructions, falling back to a generated default
    task_instructions = task.get('task_instructions') or (
        f"Implement: {task['name']}\n\nAcceptance: {task.get('acceptance', 'N/A')}"
    )
    worker_skill = worker_skill.replace('{task_instructions}', task_instructions)

    # 7. Add base protocol
    base_protocol = read_file(f"{templates_dir}/task-worker.template.md")
    base_protocol_section = extract_section(base_protocol, "## Execution Protocol")
    worker_skill = worker_skill.replace('{base_worker_protocol}', base_protocol_section)

    # 8. Create worker skill directory (ephemeral)
    worker_skill_path = f"{plugin_root}/skills/workers/{worker_id}/SKILL.md"
    os.makedirs(os.path.dirname(worker_skill_path), exist_ok=True)
    write_file(worker_skill_path, worker_skill)

    # 9. Generate dispatch prompt
    dispatch_prompt = f"""You are worker agent {worker_id}.

Your task: {task['name']} (Task {task['id']})

MESSAGE BUS: {message_bus_path}

Follow your worker skill instructions at: {worker_skill_path}

Protocol:
1. Check dependencies via message bus
2. Acquire file locks before modifying
3. Post progress every 5 min
4. Post TASK_COMPLETE when done

Execute autonomously. Do NOT wait for user input."""

    return {
        'worker_id': worker_id,
        'skill_path': worker_skill_path,
        'prompt': dispatch_prompt,
        'task_id': task['id'],
        'task_name': task['name'],  # read by dispatch_workers for the Task description
        'task_type': task_type
    }

Batch Worker Creation

For parallel groups, create all workers at once:

def create_workers_for_parallel_group(
    parallel_group: dict,
    dag: dict,
    track_id: str,
    message_bus_path: str
) -> list:
    """
    Create workers for all tasks in a parallel group.

    Args:
        parallel_group: Parallel group definition (id, tasks, conflict_free)
        dag: Full DAG with all task nodes
        track_id: Current track identifier
        message_bus_path: Path to message bus

    Returns:
        List of worker definitions ready for dispatch
    """

    workers = []

    for task_id in parallel_group['tasks']:
        # Find task in DAG; IDs with no matching node are skipped
        task = next((n for n in dag['nodes'] if n['id'] == task_id), None)
        if not task:
            continue

        # Create worker (the DAG is passed so {unblocks} can be computed)
        worker = create_worker_agent(task, track_id, message_bus_path, dag=dag)

        # Add coordination info if not conflict-free
        if not parallel_group.get('conflict_free', True):
            worker['requires_coordination'] = True
            worker['shared_resources'] = parallel_group.get('shared_resources', [])

        workers.append(worker)

    return workers
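
To make the inputs concrete, here is a minimal sketch of the DAG and parallel group shapes that the batch function reads. The field names mirror what the code above accesses; the task names, file paths, and IDs are invented for illustration. `resolve_group_tasks` is a hypothetical helper that also reports task IDs with no matching node, which the batch loop skips without comment.

```python
# Field names mirror what the functions above read; the values are invented.
dag = {
    "nodes": [
        {"id": "1.1", "name": "Create user model", "type": "code",
         "files": ["src/models/user.py"], "depends_on": [],
         "acceptance": "Model tests pass"},
        {"id": "1.2", "name": "Build login form", "type": "ui",
         "files": ["src/ui/login.tsx"], "depends_on": [],
         "acceptance": "Form renders and validates"},
    ],
    "parallel_groups": [
        # "1.9" has no node on purpose, to show how a stale ID surfaces.
        {"id": "pg-1", "tasks": ["1.1", "1.2", "1.9"], "conflict_free": True},
    ],
}


def resolve_group_tasks(parallel_group: dict, dag: dict) -> tuple:
    """Split a group's task IDs into (found task nodes, missing IDs)."""
    by_id = {node["id"]: node for node in dag.get("nodes", [])}
    found = [by_id[t] for t in parallel_group["tasks"] if t in by_id]
    missing = [t for t in parallel_group["tasks"] if t not in by_id]
    return found, missing


found, missing = resolve_group_tasks(dag["parallel_groups"][0], dag)
print([t["id"] for t in found])  # ['1.1', '1.2']
print(missing)                   # ['1.9']
```

Surfacing the missing IDs before dispatch may be preferable to skipping them, since a group that silently loses a task can still appear to complete.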

Worker Dispatch

Dispatch workers via parallel Task calls:

def dispatch_workers(workers: list) -> list:
    """
    Dispatch multiple workers in parallel using Task tool.

    Returns list of Task call results.
    """

    # Create Task calls for all workers
    task_calls = []
    for worker in workers:
        task_calls.append({
            'subagent_type': 'general-purpose',
            'description': f"Execute {worker['task_id']}: {worker.get('task_name', 'task')}",
            'prompt': worker['prompt'],
            'run_in_background': True  # Run in background for true parallelism
        })

    # Dispatch all at once (Claude Code handles parallel calls)
    results = []
    for call in task_calls:
        result = Task(**call)
        results.append(result)

    return results
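
`Task` here is a Claude Code tool rather than a Python function, so the snippet above is not runnable outside that environment. To experiment with the same fan-out pattern locally, a rough equivalent can be sketched with the standard library's `ThreadPoolExecutor`. This swaps in a different mechanism: `run_worker` is a hypothetical stand-in for the Task call, and `max_parallel` is an assumed limit that does not appear in the original.

```python
from concurrent.futures import ThreadPoolExecutor


def run_worker(worker: dict) -> dict:
    """Hypothetical stand-in for a Task tool call; returns a fake result record."""
    return {"worker_id": worker["worker_id"], "status": "dispatched"}


def dispatch_workers_locally(workers: list, max_parallel: int = 4) -> list:
    """Run all workers concurrently and return results in input order."""
    if not workers:
        return []
    with ThreadPoolExecutor(max_workers=min(max_parallel, len(workers))) as pool:
        # pool.map preserves the order of the input list.
        return list(pool.map(run_worker, workers))


workers = [{"worker_id": f"worker-{i}", "prompt": "..."} for i in ("1.1", "1.2")]
results = dispatch_workers_locally(workers)
print([r["worker_id"] for r in results])  # ['worker-1.1', 'worker-1.2']
```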

Worker Cleanup

After task completion, clean up worker artifacts:

import os
import shutil


def cleanup_worker(worker_id: str):
    """
    Remove ephemeral worker skill directory.
    Called by orchestrator after worker reports completion.
    """

    # Assumes CLAUDE_PLUGIN_ROOT is set in the environment
    worker_skill_path = os.path.join(
        os.environ["CLAUDE_PLUGIN_ROOT"], "skills", "workers", worker_id
    )

    if os.path.exists(worker_skill_path):
        shutil.rmtree(worker_skill_path)
        # Log cleanup only when something was actually removed
        print(f"Cleaned up worker: {worker_id}")

Cleanup All Workers

After a parallel group completes:

import os


def cleanup_parallel_group_workers(parallel_group_id: str, workers: list):
    """
    Clean up all workers from a completed parallel group.
    """

    for worker in workers:
        cleanup_worker(worker['worker_id'])

    # Remove workers directory if empty (assumes CLAUDE_PLUGIN_ROOT is set in the environment)
    workers_dir = os.path.join(os.environ["CLAUDE_PLUGIN_ROOT"], "skills", "workers")
    if os.path.exists(workers_dir) and not os.listdir(workers_dir):
        os.rmdir(workers_dir)
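
The cleanup functions above only run if the orchestrator reaches them. One way to make removal unconditional is a context manager, sketched below under the assumption that the code creating the skill directory also controls its lifetime. `ephemeral_worker_dir` is a hypothetical name, and the demo uses a temporary directory rather than the real plugin root.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def ephemeral_worker_dir(workers_root: Path, worker_id: str):
    """Create workers_root/worker_id, yield it, and always remove it afterwards."""
    worker_dir = workers_root / worker_id
    worker_dir.mkdir(parents=True, exist_ok=True)
    try:
        yield worker_dir
    finally:
        # Runs on success and on exceptions, so no skill directory is left behind.
        shutil.rmtree(worker_dir, ignore_errors=True)


# Demo against a temporary directory rather than the real plugin root.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    try:
        with ephemeral_worker_dir(root, "worker-1.1-demo") as d:
            (d / "SKILL.md").write_text("# demo skill")
            raise RuntimeError("simulated worker failure")
    except RuntimeError:
        pass
    leftover = list(root.iterdir())

print(leftover)  # []
```

With background workers, the `with` block would need to span the wait for completion, since the directory disappears as soon as the block exits.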

Helper Functions

def format_list(items: list) -> str:
    """Format list for template substitution."""
    if not items:
        return "None"
    return "\n".join(f"- {item}" for item in items)


def find_unblocked_tasks(task_id: str, dag: dict) -> list:
    """Find tasks whose only dependency is task_id.

    Completion state is not consulted, so a task that also depends on other
    tasks is never reported, even if those tasks have already finished.
    """
    unblocked = []
    for node in dag.get('nodes', []):
        if task_id in node.get('depends_on', []):
            # Report the node only if task_id is its sole dependency
            remaining_deps = [d for d in node['depends_on'] if d != task_id]
            if not remaining_deps:
                unblocked.append(node['id'])
    return unblocked


def extract_section(content: str, section_header: str) -> str:
    """Extract a section from markdown content."""
    lines = content.split('\n')
    in_section = False
    section_lines = []

    for line in lines:
        if line.startswith(section_header):
            in_section = True
            continue
        elif in_section and line.startswith('## '):
            break
        elif in_section:
            section_lines.append(line)

    return '\n'.join(section_lines).strip()
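
The `find_unblocked_tasks` helper above has no view of which tasks have already finished, so it reports only tasks whose sole dependency is `task_id`. If the orchestrator tracks completed task IDs, a variant along the following lines could account for them. The `completed` parameter and the function name are assumptions introduced for this sketch and are not part of the original.

```python
def find_unblocked_tasks_with_state(task_id: str, dag: dict, completed: set) -> list:
    """Return IDs of tasks that become runnable once task_id completes.

    `completed` is a hypothetical set of already-finished task IDs; the
    original helper has no such input.
    """
    done = set(completed) | {task_id}
    unblocked = []
    for node in dag.get("nodes", []):
        deps = node.get("depends_on", [])
        # Only tasks that depend on task_id can be newly unblocked by it.
        if task_id in deps and node["id"] not in done and all(d in done for d in deps):
            unblocked.append(node["id"])
    return unblocked


dag = {"nodes": [
    {"id": "A", "depends_on": []},
    {"id": "B", "depends_on": []},
    {"id": "C", "depends_on": ["A", "B"]},
    {"id": "D", "depends_on": ["A"]},
]}

print(find_unblocked_tasks_with_state("A", dag, completed=set()))  # ['D']
print(find_unblocked_tasks_with_state("A", dag, completed={"B"}))  # ['C', 'D']
```

In the second call, C is reported because its other dependency, B, is already in the completed set.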

Integration with Orchestrator

The orchestrator calls the agent factory during PARALLEL_EXECUTE:

# In conductor-orchestrator

async def execute_parallel_phase(phase: Phase, dag: dict, track_id: str, message_bus_path: str):
    # 1. Get parallel groups for this phase
    parallel_groups = [
        pg for pg in dag.get('parallel_groups', [])
        if all(task_in_phase(t, phase) for t in pg['tasks'])
    ]

    for pg in parallel_groups:
        # 2. Create workers via agent factory
        workers = create_workers_for_parallel_group(
            pg, dag, track_id, message_bus_path
        )

        # 3. Dispatch workers in parallel
        results = dispatch_workers(workers)

        # 4. Monitor message bus for completion
        await wait_for_group_completion(pg, message_bus_path)

        # 5. Cleanup workers
        cleanup_parallel_group_workers(pg['id'], workers)
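
`wait_for_group_completion` is not defined in this skill. A minimal polling sketch is shown below under loudly stated assumptions: that the message bus is a directory of `*.json` files, and that each message carries a `type` and a `payload` containing `task_id`. That on-disk format is a guess based on the `post_message` call in the error handling section, not something the source specifies, and the `poll_seconds` and `timeout_seconds` parameters are also invented.

```python
import asyncio
import json
import tempfile
from pathlib import Path

TERMINAL_TYPES = {"TASK_COMPLETE", "TASK_FAILED"}


def read_terminal_status(message_bus_path: str) -> dict:
    """Map task_id -> terminal message type, read from the assumed *.json files."""
    status = {}
    for path in sorted(Path(message_bus_path).glob("*.json")):
        msg = json.loads(path.read_text())
        if msg.get("type") in TERMINAL_TYPES:
            status[msg["payload"]["task_id"]] = msg["type"]
    return status


async def wait_for_group_completion(pg: dict, message_bus_path: str,
                                    poll_seconds: float = 5.0,
                                    timeout_seconds: float = 3600.0) -> dict:
    """Poll until every task in the group has a terminal message, or time out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while True:
        status = read_terminal_status(message_bus_path)
        if all(t in status for t in pg["tasks"]):
            return {t: status[t] for t in pg["tasks"]}
        if loop.time() >= deadline:
            raise TimeoutError(f"group {pg['id']} did not finish in time")
        await asyncio.sleep(poll_seconds)


# Demo: write two terminal messages into a temporary bus directory, then wait.
with tempfile.TemporaryDirectory() as bus:
    for i, (task_id, kind) in enumerate([("1.1", "TASK_COMPLETE"), ("1.2", "TASK_FAILED")]):
        msg = {"type": kind, "sender": f"worker-{task_id}", "payload": {"task_id": task_id}}
        (Path(bus) / f"{i:04d}.json").write_text(json.dumps(msg))
    outcome = asyncio.run(wait_for_group_completion(
        {"id": "pg-1", "tasks": ["1.1", "1.2"]}, bus, poll_seconds=0.01))

print(outcome)  # {'1.1': 'TASK_COMPLETE', '1.2': 'TASK_FAILED'}
```

The timeout matters here: without it, a worker that dies before posting a terminal message would leave the orchestrator waiting indefinitely.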

Worker Lifecycle

+---------------------------------------------------------------+
|                      WORKER LIFECYCLE                          |
|                                                                |
|  1. CREATE                                                     |
|     Agent Factory -> Template -> Substitution -> Skill Dir     |
|                                                                |
|  2. DISPATCH                                                   |
|     Orchestrator -> Task(prompt, run_in_background) -> Worker  |
|                                                                |
|  3. EXECUTE                                                    |
|     Worker -> Check Deps -> Lock Files -> Implement -> Commit  |
|                                                                |
|  4. REPORT                                                     |
|     Worker -> Message Bus -> TASK_COMPLETE/TASK_FAILED         |
|                                                                |
|  5. CLEANUP                                                    |
|     Orchestrator -> cleanup_worker() -> Remove Skill Dir       |
|                                                                |
+---------------------------------------------------------------+

Error Handling

def handle_worker_failure(worker: dict, error: str, message_bus_path: str):
    """
    Handle worker failure gracefully.

    1. Post failure to message bus
    2. Release any held locks
    3. Cleanup worker artifacts
    4. Notify orchestrator
    """

    # Post failure message
    post_message(message_bus_path, "TASK_FAILED", worker['worker_id'], {
        "task_id": worker['task_id'],
        "error": error
    })

    # Release all locks held by this worker
    release_all_locks_for_worker(message_bus_path, worker['worker_id'])

    # Cleanup worker
    cleanup_worker(worker['worker_id'])
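
In the handler above, an exception while posting to the message bus would skip the lock release and the cleanup. The sketch below shows one way to keep later steps running when an earlier one fails. The three callables are injected so the example is self-contained; their names and signatures are hypothetical stand-ins for `post_message`, `release_all_locks_for_worker`, and `cleanup_worker`.

```python
def handle_worker_failure_safely(worker: dict, error: str,
                                 post, release_locks, cleanup) -> list:
    """Run each failure step even if an earlier one raises.

    Returns a list of (step_name, exception) for the steps that failed.
    """
    steps = [
        ("post_failure", lambda: post(worker, error)),
        ("release_locks", lambda: release_locks(worker["worker_id"])),
        ("cleanup", lambda: cleanup(worker["worker_id"])),
    ]
    problems = []
    for name, step in steps:
        try:
            step()
        except Exception as exc:  # keep going: later steps still need to run
            problems.append((name, exc))
    return problems


# Demo with fakes: posting fails, but locks are still released and cleanup runs.
calls = []


def failing_post(worker, error):
    raise OSError("message bus unavailable")


problems = handle_worker_failure_safely(
    {"worker_id": "worker-1.1-demo", "task_id": "1.1"},
    "tests failed",
    post=failing_post,
    release_locks=lambda worker_id: calls.append(("release", worker_id)),
    cleanup=lambda worker_id: calls.append(("cleanup", worker_id)),
)

print([name for name, _ in problems])  # ['post_failure']
print(calls)  # [('release', 'worker-1.1-demo'), ('cleanup', 'worker-1.1-demo')]
```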
