annotating-task-lineage

Annotate Airflow tasks with data lineage using inlets and outlets. Use when the user wants to add lineage metadata to tasks, specify input/output datasets, or enable lineage tracking for operators without built-in OpenLineage extraction.

Install skill "annotating-task-lineage" with this command: npx skills add necatiarslan/airflow-vscode-extension/necatiarslan-airflow-vscode-extension-annotating-task-lineage

Annotating Task Lineage with Inlets and Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.

When to Use This Approach

Scenario                                       | Use Inlets/Outlets?
Operator has OpenLineage methods               | No, modify the OL method directly
Operator has no built-in OpenLineage extractor | Yes
Simple table-level lineage is sufficient       | Yes
Quick lineage setup without custom code        | Yes
Need column-level lineage                      | No, use OpenLineage methods or custom extractor
Complex extraction logic needed                | No, use OpenLineage methods or custom extractor
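
The scenarios above can be read as a small decision rule. The sketch below encodes that rule in Python. The function name `use_inlets_outlets` and its boolean parameters are hypothetical and exist only for illustration; they are not part of Airflow or OpenLineage.

```python
def use_inlets_outlets(
    has_openlineage_methods: bool = False,
    needs_column_level_lineage: bool = False,
    needs_complex_extraction: bool = False,
) -> bool:
    """Return True when manual inlets/outlets look like a reasonable fit.

    Hypothetical helper mirroring the scenarios listed above; not an Airflow API.
    """
    # An operator that already has OpenLineage methods should be changed there.
    if has_openlineage_methods:
        return False
    # Column-level lineage and complex extraction call for OpenLineage
    # methods or a custom extractor instead.
    if needs_column_level_lineage or needs_complex_extraction:
        return False
    # Remaining cases: no built-in extractor and table-level lineage is enough.
    return True


print(use_inlets_outlets())
print(use_inlets_outlets(needs_column_level_lineage=True))
```

The defaults describe the common case this skill targets: an operator with no built-in extractor where table-level lineage is sufficient.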

Supported Types for Inlets/Outlets

OpenLineage Datasets (recommended)

from openlineage.client.event_v2 import Dataset

source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)

Airflow Assets (Airflow 3+)

from airflow.sdk import Asset

orders_asset = Asset(uri="s3://my-bucket/data/orders")

Airflow Datasets (Airflow 2.4+)

from airflow.datasets import Dataset

orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
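
All three types identify a dataset by its location: the OpenLineage Dataset as a namespace plus a name, the Airflow types as a single URI. As a rough sketch of how the two forms relate, the helper below splits a URI into a namespace (scheme and authority) and a name (the path). `split_dataset_uri` is a hypothetical function, not part of either library, and the split rule is an assumption that suits object-store URIs such as the S3 ones in this guide. Database sources such as Postgres use schema-qualified table names and are not handled here.

```python
from urllib.parse import urlsplit


def split_dataset_uri(uri: str) -> tuple[str, str]:
    """Split a URI into (namespace, name). Hypothetical helper for illustration."""
    parts = urlsplit(uri)
    if not parts.scheme or not parts.netloc:
        raise ValueError(f"expected a URI like scheme://authority/path, got {uri!r}")
    # Assumption: namespace is scheme plus authority, name is the remaining path.
    namespace = f"{parts.scheme}://{parts.netloc}"
    name = parts.path.lstrip("/")
    return namespace, name


namespace, name = split_dataset_uri("s3://my-bucket/data/orders")
print(namespace)
print(name)
```

The resulting pair could then be passed as the `namespace` and `name` arguments of an OpenLineage Dataset.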

Basic Usage

Setting Inlets and Outlets on Operators

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

source_table = Dataset(namespace="snowflake://account", name="raw.orders")
target_table = Dataset(namespace="snowflake://account", name="staging.orders_clean")
output_file = Dataset(namespace="s3://my-bucket", name="exports/orders.parquet")

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],
        outlets=[target_table],
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],
        outlets=[output_file],
    )

    transform >> export

Multiple Inputs and Outputs

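# Airflow 2 import path; on Airflow 3 the operator lives in
# airflow.providers.standard.operators.python
from airflow.operators.python import PythonOperator
from openlineage.client.event_v2 import Dataset

# Source tables read by the task
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Target tables written by the task
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

def build_aggregates():
    """Placeholder callable; the real aggregation logic goes here."""

# Define this task inside a DAG context, such as a `with DAG(...)` block
aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],
    outlets=[daily_summary, customer_metrics],
)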
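
Inlets and outlets are declared per task, so the annotation does not say which input feeds which output. One way to picture the resulting table-level lineage is that every inlet connects to every outlet through the task. This is an assumption about how a lineage backend would typically draw the graph, not something Airflow computes. The sketch below uses plain strings in place of Dataset objects, and `implied_edges` is a hypothetical helper.

```python
from itertools import product


def implied_edges(inlets, outlets):
    """Pair every inlet with every outlet (table-level view, illustrative only)."""
    return list(product(inlets, outlets))


# Plain strings stand in for the Dataset objects used in this guide.
inlets = ["public.customers", "public.orders", "public.products"]
outlets = ["analytics.daily_summary", "analytics.customer_metrics"]

edges = implied_edges(inlets, outlets)
for source, target in edges:
    print(f"{source} -> {target}")
```

If the task needs to express that only some inputs feed a given output, that is column-level or finer-grained lineage, which calls for OpenLineage methods or a custom extractor rather than inlets and outlets.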

Custom Operators

Option 1: Implement OpenLineage Methods (recommended)

from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        self.log.info("Processing %s -> %s", self.source_table, self.target_table)

    def get_openlineage_facets_on_complete(self, task_instance):
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )

Option 2: Set Inlets/Outlets Dynamically

from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset

class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        self.inlets = [Dataset(namespace="warehouse://db", name=self.source_table)]
        self.outlets = [Dataset(namespace="warehouse://db", name=self.target_table)]
