airflow-hitl

Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).

Install skill "airflow-hitl" with this command: npx skills add astronomer/agents/astronomer-agents-airflow-hitl

Airflow Human-in-the-Loop Operators

Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.

Implementation Checklist

Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops.

CRITICAL: Requires Airflow 3.1+. NOT available in Airflow 2.x.

Deferrable: All HITL operators are deferrable; they release their worker slot while waiting for human input.

UI Location: View pending actions at Browse → Required Actions in the Airflow UI. Respond via the task instance page's Required Actions tab or the REST API.

Cross-reference: For AI/LLM calls, see the airflow-ai skill.


Step 1: Choose operator

| Operator | Human action | Outcome |
|----------|--------------|---------|
| ApprovalOperator | Approve or Reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) |
| HITLOperator | Select option(s) + form | Returns selections |
| HITLBranchOperator | Select downstream task(s) | Runs selected, skips others |
| HITLEntryOperator | Submit form | Returns form data |

Step 2: Implement operator

ApprovalOperator

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Optional: auto-selected if execution_timeout elapses
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

HITLOperator

Required parameters: subject and options.

from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, Param
from datetime import timedelta
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],  # REQUIRED
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)

hitl_example()
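
The downstream task above reads two keys from the operator's output. As a minimal sketch, assuming the result is a plain dict shaped like the one used in the example (`chosen_options` as a list, `params_input` as a dict), the unpacking logic can live in an ordinary function that is testable without Airflow. The name `unpack_hitl_result` is hypothetical and not part of Airflow.

```python
def unpack_hitl_result(result: dict) -> tuple[str, dict]:
    """Return the single chosen option and the submitted form values.

    Assumes the operator was configured with multiple=False, so exactly
    one chosen option is expected.
    """
    chosen = result.get("chosen_options") or []
    if len(chosen) != 1:
        raise ValueError(f"expected exactly one chosen option, got {chosen!r}")
    return chosen[0], dict(result.get("params_input") or {})


# Sample payload using the same keys as the example above (values are illustrative)
option, form = unpack_hitl_result(
    {"chosen_options": ["ACH"], "params_input": {"amount": 1500}}
)
print(option, form["amount"])
```

Calling such a helper from inside the `process` task keeps the task body thin and lets the validation be unit-tested on its own.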

HITLBranchOperator

IMPORTANT: Each option must resolve to a downstream task ID, in one of two ways:

  1. Options directly match downstream task IDs (the simpler approach)
  2. options_mapping maps human-friendly labels to task IDs

from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:
        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # Bind the loop variable at definition time to avoid late-binding bugs
            print(f"Processing {dept_name}")
        chain(branch, handle())

branch_example()
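
One way to keep labels and task IDs from drifting apart is to derive both `options` and `options_mapping` from a single list of task IDs, then check the mapping before constructing the operator. The sketch below is plain Python; the helper names `build_branch_options` and `unmapped_targets` are hypothetical and not part of Airflow.

```python
def build_branch_options(
    task_ids: list[str], label: str = "Fund {}"
) -> tuple[list[str], dict[str, str]]:
    """Derive human-friendly labels and the label -> task_id mapping."""
    mapping = {label.format(task_id): task_id for task_id in task_ids}
    return list(mapping), mapping


def unmapped_targets(mapping: dict[str, str], downstream_ids: set[str]) -> list[str]:
    """Return mapped task IDs that are missing from the downstream task IDs."""
    return sorted(set(mapping.values()) - downstream_ids)


options, mapping = build_branch_options(["marketing", "engineering", "sales"])
print(options)  # ['Fund marketing', 'Fund engineering', 'Fund sales']

# "sales" is mapped but was (deliberately) left out of the downstream set
print(unmapped_targets(mapping, {"marketing", "engineering"}))  # ['sales']
```

The two return values could then be passed as `options=` and `options_mapping=` in the operator call shown above.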

HITLEntryOperator

from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)

entry_example()

Step 3: Optional features

Notifiers

from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator

class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        self.message = message

    def notify(self, context: Context):
        # Notify only when the task instance is in the running state
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(
                context, base_url="https://airflow.example.com"
            )
            self.log.info(f"Action needed: {url}")

hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])

Restrict respondents

Format depends on your auth manager:

| Auth Manager | Format | Example |
|--------------|--------|---------|
| SimpleAuthManager | Username | ["admin", "manager"] |
| FabAuthManager | Email | ["manager@example.com"] |
| Astro | Astro ID | ["cl1a2b3cd456789ef1gh2ijkl3"] |

Astro users: Find your Astro ID under Organization → Access Management.

hitl = HITLOperator(..., respondents=["manager@example.com"])  # FabAuthManager

Timeout behavior

  • With defaults: Task succeeds, default option(s) selected
  • Without defaults: Task fails on timeout

hitl = HITLOperator(
    ...,
    options=["Option A", "Option B"],
    defaults=["Option A"],  # Auto-selected on timeout
    execution_timeout=timedelta(hours=4),
)
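
The two timeout outcomes can be modeled in a few lines. This is an illustration of the rule described above, not Airflow's internal implementation; `resolve_on_timeout` and `HITLTimeout` are hypothetical names used only for this sketch.

```python
class HITLTimeout(Exception):
    """Stands in for the task failure that occurs when no defaults are set."""


def resolve_on_timeout(defaults=None):
    """Model what happens when nobody responds before the timeout.

    With defaults: the default option(s) become the chosen options.
    Without defaults: the task fails (modeled here as an exception).
    """
    if not defaults:
        raise HITLTimeout("no response before timeout and no defaults configured")
    return list(defaults)


print(resolve_on_timeout(["Option A"]))  # ['Option A']

try:
    resolve_on_timeout()
except HITLTimeout as exc:
    print(f"task would fail: {exc}")
```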

Markdown in body

The body parameter supports markdown formatting and is Jinja templatable:

hitl = HITLOperator(
    ...,
    body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
""",
)
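
When the table rows come from data rather than a literal string, the markdown can be assembled in plain Python before it is passed as `body`. The helper `markdown_table` below is a hypothetical name, shown as a sketch; the Jinja expression is left as an ordinary string so Airflow can render it later.

```python
def markdown_table(rows: dict, headers: tuple = ("Category", "Amount")) -> str:
    """Build a two-column markdown table from a mapping of label -> value."""
    lines = [
        f"| {headers[0]} | {headers[1]} |",
        "|---|---|",
    ]
    lines += [f"| {label} | {value} |" for label, value in rows.items()]
    return "\n".join(lines)


# Not an f-string, so the Jinja braces are preserved for templating
body = (
    "**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}\n\n"
    + markdown_table({"Marketing": "$1M"})
)
print(body)
```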

Callbacks

All HITL operators support standard Airflow callbacks:

def on_hitl_failure(context):
    print(f"HITL task failed: {context['task_instance'].task_id}")

def on_hitl_success(context):
    print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}")

hitl = HITLOperator(
    task_id="approval_required",
    subject="Review needed",
    options=["Approve", "Reject"],
    on_failure_callback=on_hitl_failure,
    on_success_callback=on_hitl_success,
)

Step 4: API integration

For external responders (Slack, custom app):

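import os

import requests

HOST = os.getenv("AIRFLOW_HOST")
TOKEN = os.getenv("AIRFLOW_API_TOKEN")
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# Get pending actions
r = requests.get(f"{HOST}/api/v2/hitlDetails/?state=pending", headers=HEADERS)
r.raise_for_status()

# Identify the task instance to respond to. The DAG and task IDs match the
# HITLOperator example; "<run_id>" is a placeholder for the real run ID.
dag_id, run_id, task_id = "hitl_example", "<run_id>", "select_option"

# Respond
resp = requests.patch(
    f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}},
)
resp.raise_for_status()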
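
Run IDs often contain characters such as `:` and `+` (for example in timestamps), which are safer percent-encoded when placed in a URL path. The sketch below builds the response URL using the path pattern from the snippet above; `hitl_detail_url` is a hypothetical helper name, and the host and run ID are illustrative values.

```python
from urllib.parse import quote


def hitl_detail_url(host: str, dag_id: str, run_id: str, task_id: str) -> str:
    """Build the response URL, percent-encoding each path segment."""
    segments = (quote(part, safe="") for part in (dag_id, run_id, task_id))
    return f"{host.rstrip('/')}/api/v2/hitlDetails/" + "/".join(segments)


url = hitl_detail_url(
    "https://airflow.example.com",
    "hitl_example",
    "scheduled__2025-01-01T00:00:00+00:00",
    "select_option",
)
print(url)
# https://airflow.example.com/api/v2/hitlDetails/hitl_example/scheduled__2025-01-01T00%3A00%3A00%2B00%3A00/select_option
```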

Step 5: Safety checks

Before finalizing, verify:

  • Airflow 3.1+ installed
  • For HITLBranchOperator: options map to downstream task IDs
  • defaults values are in options list
  • API token configured if using external responders
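
Two of these checks (version and defaults) are easy to automate. The following is a minimal sketch in plain Python, assuming the Airflow version is available as a dotted string; `preflight` is a hypothetical helper, not an Airflow API.

```python
from typing import Optional


def preflight(
    airflow_version: str, options: list, defaults: Optional[list] = None
) -> list:
    """Return a list of human-readable problems; empty means the checks pass."""
    problems = []

    # Compare only the major and minor components, e.g. "3.1.0" -> (3, 1)
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    if (major, minor) < (3, 1):
        problems.append(f"Airflow {airflow_version} is older than 3.1")

    # Every default must be one of the offered options
    missing = [d for d in (defaults or []) if d not in options]
    if missing:
        problems.append(f"defaults not in options: {missing}")

    return problems


print(preflight("3.1.0", ["ACH", "Wire", "Check"], ["ACH"]))  # []
print(preflight("2.10.4", ["ACH"], ["Check"]))
```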


Related Skills

  • airflow-ai: For AI/LLM task decorators and GenAI patterns
  • authoring-dags: For general DAG writing best practices
  • testing-dags: For testing DAGs with debugging cycles
