
Data Provenance & Lineage

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

To install this skill, copy this command and send it to your AI assistant:

npx skills add sunnypatneedi/claude-starter-kit/sunnypatneedi-claude-starter-kit-data-provenance

Data Provenance & Lineage

Track where data comes from, how it transforms, and where it goes—essential for trust, compliance, and debugging.

When to Use

Use this skill when:

  • Auditing data for compliance (GDPR, HIPAA, SOX, CCPA)

  • Debugging data quality issues ("Where did this bad data come from?")

  • Understanding impact of schema changes ("What breaks if I change this field?")

  • Building data catalogs or governance systems

  • Tracking sensitive data (PII, PHI) through systems

  • Responding to data deletion requests (GDPR "right to be forgotten")

What is Data Provenance?

Provenance: The complete history and lineage of a data element

Question: "Where does the revenue number in this dashboard come from?"

Answer (with provenance):

Dashboard.revenue (computed 2026-01-21 08:00)
  ← warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
  ← etl_pipeline.transform_sales (ran 2026-01-21 01:30)
  ← production_db.orders.amount (order #12345, created 2026-01-20 15:23)
  ← stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
  ← user input (customer: cust_xyz, card ending 4242)

Key questions provenance answers:

  • Where did this data come from? (source)

  • When was it created/updated? (timestamp)

  • How was it transformed? (logic, code version)

  • Who created/modified it? (user, system, process)

  • Why does it have this value? (business logic)

  • What depends on it? (downstream consumers)
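Each of these questions maps onto a field you can store per data element. A minimal sketch of such a record in Python; the field names are illustrative, not a standard schema:

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ProvenanceRecord:
    # Illustrative only; real systems add IDs, versioning, and retention metadata
    source: str             # where the data came from ("stripe_api.charge")
    created_at: datetime    # when it was created or updated
    transformation: str     # how it was derived ("SUM(amount + tax + shipping)")
    code_version: str       # which code produced it ("etl_pipeline v1.2.3")
    actor: str              # who or what wrote it (user, system, process)
    reason: str             # business logic behind the value
    consumers: list[str] = field(default_factory=list)  # downstream dependents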

Levels of Provenance Tracking

Level 1: Table-Level Lineage

What: Track which tables feed into other tables

┌────────────┐
│   orders   │──┐
└────────────┘  │
                ├──► ┌──────────────┐
┌────────────┐  │    │ daily_sales  │
│ customers  │──┘    └──────────────┘
└────────────┘

Implementation: Metadata table

CREATE TABLE table_lineage (
    downstream_table  VARCHAR(255),
    upstream_table    VARCHAR(255),
    relationship_type VARCHAR(50),   -- 'direct_copy', 'join', 'aggregate'
    created_at        TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (downstream_table, upstream_table)
);

INSERT INTO table_lineage (downstream_table, upstream_table, relationship_type) VALUES
    ('daily_sales', 'orders', 'aggregate'),
    ('daily_sales', 'customers', 'join');

Query: "What tables does daily_sales depend on?"

SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customers

Query: "What tables depend on orders?"

SELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_value
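table_lineage stores only direct edges, so answering "what depends on orders, transitively?" means walking the graph. A minimal breadth-first sketch in Python, using the same hypothetical db helper as the later examples (recursive SQL equivalents appear under Impact Analysis below):

def transitive_dependents(table, max_depth=10):
    """Collect every table downstream of `table`, any number of hops away."""
    seen, frontier = set(), {table}
    for _ in range(max_depth):
        rows = db.query(
            # NOTE: expanding a list into IN (...) placeholders varies by driver
            "SELECT downstream_table FROM table_lineage WHERE upstream_table IN (?)",
            list(frontier)
        )
        frontier = {r['downstream_table'] for r in rows} - seen
        if not frontier:
            break
        seen |= frontier
    return seen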

Level 2: Column-Level Lineage

What: Track which columns feed into which columns

orders.amount   ──┐
orders.tax      ──┼──► daily_sales.total_revenue
orders.shipping ──┘

Implementation:

CREATE TABLE column_lineage (
    downstream_table  VARCHAR(255),
    downstream_column VARCHAR(255),
    upstream_table    VARCHAR(255),
    upstream_column   VARCHAR(255),
    transformation    TEXT,   -- SQL or description
    created_at        TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);

INSERT INTO column_lineage (downstream_table, downstream_column, upstream_table, upstream_column, transformation) VALUES
    ('daily_sales', 'total_revenue', 'orders', 'amount',   'SUM(amount + tax + shipping)'),
    ('daily_sales', 'total_revenue', 'orders', 'tax',      'SUM(amount + tax + shipping)'),
    ('daily_sales', 'total_revenue', 'orders', 'shipping', 'SUM(amount + tax + shipping)'),
    ('daily_sales', 'order_count',   'orders', 'id',       'COUNT(id)'),
    ('daily_sales', 'customer_name', 'customers', 'name',  'LEFT JOIN on customer_id');

Query: "Where does daily_sales.total_revenue come from?"

SELECT upstream_table, upstream_column, transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
  AND downstream_column = 'total_revenue';

Level 3: Row-Level Lineage

What: Track individual record transformations

orders.id=12345 (amount=$100) ──┐
                                ├──► daily_sales.id=67 (date=2026-01-20, total=$150)
orders.id=12346 (amount=$50)  ──┘

Implementation: Lineage table

CREATE TABLE row_lineage (
    id               BIGSERIAL PRIMARY KEY,
    downstream_table VARCHAR(255),
    downstream_pk    BIGINT,
    upstream_table   VARCHAR(255),
    upstream_pk      BIGINT,
    created_at       TIMESTAMPTZ DEFAULT NOW()
);

-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT 'daily_sales', ds.id, 'orders', o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;

Query: "What orders contributed to daily_sales row 67?"

SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
  AND rl.upstream_table = 'orders'   -- keep pks from other tables from matching
  AND rl.downstream_pk = 67;
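The reverse lookup answers the debugging question from the other side: given a suspect source row, which downstream rows absorbed it? A sketch using the document's hypothetical db helper:

def downstream_rows(upstream_table, upstream_pk):
    """Find every downstream row that a given source row fed into."""
    return db.query(
        """
        SELECT downstream_table, downstream_pk
        FROM row_lineage
        WHERE upstream_table = ? AND upstream_pk = ?
        """,
        upstream_table, upstream_pk
    )

# Which daily_sales rows did order 12345 feed?
downstream_rows('orders', 12345)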

Level 4: Value-Level Lineage (Finest)

What: Track transformations at the field value level

order.amount   = $100
order.tax      = $10
order.shipping = $5
        ↓ (SUM transformation)
daily_sales.total_revenue = $115

Implementation: Event log

CREATE TABLE value_lineage (
    id             BIGSERIAL PRIMARY KEY,
    entity_type    VARCHAR(50),
    entity_id      BIGINT,
    field_name     VARCHAR(100),
    old_value      TEXT,
    new_value      TEXT,
    transformation TEXT,
    source_values  JSONB,           -- Array of source values
    created_at     TIMESTAMPTZ DEFAULT NOW(),
    created_by     VARCHAR(255)     -- User or process
);

-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
    DEFAULT,
    'daily_sales', 67, 'total_revenue',
    NULL, '115.00',
    'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
    '[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
    NOW(),
    'etl_pipeline_v1.2.3'
);
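Because the source values are stored alongside the transformation, a value can be re-derived and checked after the fact. A verification sketch, assuming the SUM-style transformation above; db.query_one is an assumed helper returning a single row:

import json

def verify_total_revenue(entity_id):
    """Recompute a stored total from its recorded source values."""
    row = db.query_one(
        "SELECT new_value, source_values FROM value_lineage "
        "WHERE entity_type = 'daily_sales' AND entity_id = ? "
        "AND field_name = 'total_revenue' "
        "ORDER BY created_at DESC LIMIT 1",
        entity_id
    )
    sources = json.loads(row['source_values'])
    recomputed = sum(s['amount'] + s['tax'] + s['shipping'] for s in sources)
    return float(row['new_value']) == recomputed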

Provenance Capture Methods

Method 1: Code Instrumentation

Manual tracking in ETL code:

def etl_orders_to_daily_sales():
    # Extract
    orders = db.query("SELECT * FROM orders WHERE date = ?", yesterday)

    # Transform
    daily_sales = {}
    for order in orders:
        date = order['created_at'].date()
        if date not in daily_sales:
            daily_sales[date] = {'total': 0, 'count': 0, 'order_ids': []}

        daily_sales[date]['total'] += order['amount']
        daily_sales[date]['count'] += 1
        daily_sales[date]['order_ids'].append(order['id'])

    # Load with lineage tracking
    for date, metrics in daily_sales.items():
        ds_id = db.insert(
            "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
            date, metrics['total'], metrics['count']
        )

        # Track lineage
        for order_id in metrics['order_ids']:
            db.insert(
                "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
                'daily_sales', ds_id, 'orders', order_id
            )
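One caveat with manual instrumentation: if the load succeeds but the lineage inserts fail, data and lineage drift apart. Wrapping both writes in one transaction keeps them consistent; a sketch of the load loop above, assuming the db helper exposes a transaction context manager:

# Inside the load loop; db.transaction() is an assumed API
with db.transaction():  # commit the row and its lineage together, or neither
    ds_id = db.insert(
        "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
        date, metrics['total'], metrics['count']
    )
    for order_id in metrics['order_ids']:
        db.insert(
            "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
            'daily_sales', ds_id, 'orders', order_id
        )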

Method 2: SQL Parsing

Automatically extract lineage from SQL queries:

from sqllineage.runner import LineageRunner

sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
    DATE(created_at) as date,
    SUM(amount + tax + shipping) as total_revenue,
    COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""

# Parse lineage
runner = LineageRunner(sql)

print("Source tables:", runner.source_tables())
# orders, customers
print("Target tables:", runner.target_tables())
# daily_sales

# Store in lineage table
for source in runner.source_tables():
    db.insert(
        "INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
        'daily_sales', str(source)
    )
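Recent sqllineage releases can also emit column-level lineage, which would populate column_lineage automatically. A sketch, assuming the get_column_lineage() API and that str() on its column objects yields "table.column" strings; both are version-dependent assumptions:

runner = LineageRunner(sql, dialect="ansi")  # dialect support varies by version
for path in runner.get_column_lineage():
    # Each path runs from an original source column to a final target column
    src, dst = str(path[0]), str(path[-1])
    src_table, src_column = src.rsplit('.', 1)
    dst_table, dst_column = dst.rsplit('.', 1)
    db.insert(
        "INSERT INTO column_lineage (downstream_table, downstream_column, upstream_table, upstream_column) VALUES (?, ?, ?, ?)",
        dst_table, dst_column, src_table, src_column
    )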

Method 3: Database Triggers

Capture changes automatically:

-- Audit trail for all changes
CREATE TABLE audit_log (
    id         BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(255),
    record_id  BIGINT,
    operation  VARCHAR(10),   -- INSERT, UPDATE, DELETE
    old_values JSONB,
    new_values JSONB,
    changed_by VARCHAR(255),
    changed_at TIMESTAMPTZ DEFAULT NOW()
);

-- Trigger on orders table
-- Note: OLD is unassigned for INSERT and NEW for DELETE, so both must be
-- guarded by TG_OP checks rather than referenced unconditionally.
CREATE OR REPLACE FUNCTION audit_orders() RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
    VALUES (
        'orders',
        CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END,
        TG_OP,
        CASE WHEN TG_OP = 'INSERT' THEN NULL ELSE to_jsonb(OLD) END,
        CASE WHEN TG_OP = 'DELETE' THEN NULL ELSE to_jsonb(NEW) END,
        current_user
    );
    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();
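With the trigger in place, a row's entire history can be replayed from audit_log, which is often the fastest answer to "why does this record look like this?". A query sketch using the same hypothetical db helper as the Python examples:

history = db.query(
    """
    SELECT operation, old_values, new_values, changed_by, changed_at
    FROM audit_log
    WHERE table_name = 'orders' AND record_id = ?
    ORDER BY changed_at
    """,
    12345
)
for change in history:
    print(change['changed_at'], change['operation'], change['changed_by'])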

Method 4: CDC (Change Data Capture)

Stream database changes:

# Using Debezium or a similar CDC tool
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('postgres.public.orders')

for message in consumer:
    # Depending on converter configuration, Debezium may nest these
    # fields under a 'payload' key
    change_event = json.loads(message.value)

    # Store in lineage system
    db.insert(
        "INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
        change_event['source']['table'],
        change_event['op'],  # 'c' (create), 'u' (update), 'd' (delete)
        change_event.get('before'),
        change_event.get('after'),
        change_event['ts_ms']
    )

Impact Analysis

Downstream Impact

Question: "If I change orders.amount, what breaks?"

-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
    -- Base: Direct dependencies
    SELECT downstream_table, downstream_column, 1 as depth
    FROM column_lineage
    WHERE upstream_table = 'orders'
      AND upstream_column = 'amount'

    UNION ALL

    -- Recursive: Dependencies of dependencies
    SELECT cl.downstream_table, cl.downstream_column, d.depth + 1
    FROM column_lineage cl
    JOIN dependencies d
      ON cl.upstream_table = d.downstream_table
     AND cl.upstream_column = d.downstream_column
    WHERE d.depth < 10  -- Prevent infinite loops
)
SELECT DISTINCT downstream_table, downstream_column, depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;

Result:

downstream_table      downstream_column   depth
daily_sales           total_revenue       1
monthly_revenue       total               2
executive_dashboard   ytd_revenue         3
investor_report       arr                 4

Interpretation: Changing orders.amount affects 4 layers of downstream tables!

Upstream Impact

Question: "What source data feeds into this dashboard metric?"

-- Trace backwards to original sources
WITH RECURSIVE sources AS (
    -- Base: Direct sources
    SELECT upstream_table, upstream_column, 1 as depth
    FROM column_lineage
    WHERE downstream_table = 'executive_dashboard'
      AND downstream_column = 'ytd_revenue'

    UNION ALL

    -- Recursive: Sources of sources
    SELECT cl.upstream_table, cl.upstream_column, s.depth + 1
    FROM column_lineage cl
    JOIN sources s
      ON cl.downstream_table = s.upstream_table
     AND cl.downstream_column = s.upstream_column
    WHERE s.depth < 10
)
SELECT DISTINCT upstream_table, upstream_column, depth
FROM sources
WHERE upstream_table NOT IN (
    SELECT DISTINCT downstream_table FROM column_lineage
)  -- Only leaf nodes (true sources)
ORDER BY upstream_table, upstream_column;

Result: Original sources for dashboard metric

upstream_table   upstream_column   depth
orders           amount            4
orders           tax               4
orders           shipping          4
stripe_events    charge_amount     5

Data Catalog

Schema Registry

Track all datasets and their metadata:

CREATE TABLE data_catalog (
    id             BIGSERIAL PRIMARY KEY,
    dataset_name   VARCHAR(255) UNIQUE NOT NULL,
    dataset_type   VARCHAR(50),    -- 'table', 'view', 'api', 'file'
    description    TEXT,
    owner          VARCHAR(255),
    steward        VARCHAR(255),   -- Data steward (responsible for quality)
    sensitivity    VARCHAR(50),    -- 'public', 'internal', 'confidential', 'restricted'
    contains_pii   BOOLEAN DEFAULT FALSE,
    retention_days INT,            -- How long to keep data
    created_at     TIMESTAMPTZ DEFAULT NOW(),
    updated_at     TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE data_catalog_columns (
    dataset_id    BIGINT REFERENCES data_catalog(id),
    column_name   VARCHAR(255),
    data_type     VARCHAR(50),
    description   TEXT,
    is_nullable   BOOLEAN,
    is_pii        BOOLEAN DEFAULT FALSE,
    pii_type      VARCHAR(50),    -- 'email', 'ssn', 'phone', 'name', etc.
    sample_values TEXT[],
    PRIMARY KEY (dataset_id, column_name)
);

-- Example: Register orders table
INSERT INTO data_catalog VALUES (
    DEFAULT,
    'orders',
    'table',
    'Customer orders from e-commerce platform',
    'engineering@company.com',
    'data-team@company.com',
    'internal',
    TRUE,   -- Contains PII
    2555,   -- 7 years retention
    NOW(),
    NOW()
);

INSERT INTO data_catalog_columns VALUES
    (1, 'id',             'BIGINT',        'Unique order ID',        FALSE, FALSE, NULL,    NULL),
    (1, 'customer_email', 'VARCHAR(255)',  'Customer email address', FALSE, TRUE,  'email', NULL),
    (1, 'amount',         'DECIMAL(10,2)', 'Order total in USD',     FALSE, FALSE, NULL,    '{10.99, 25.50, 100.00}');

Searchable Catalog

Find datasets by keyword:

-- Full-text search
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || description));

-- Search for "revenue"
SELECT dataset_name, dataset_type, description, owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || description)
      @@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;

Compliance & Data Privacy

GDPR: Right to be Forgotten

Track all PII to enable deletion:

-- Find all columns containing PII
SELECT dc.dataset_name, dcc.column_name, dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;

-- Result: Tables/columns containing PII

dataset_name       column_name      pii_type
orders             customer_email   email
users              email            email
users              name             name
support_tickets    email            email
analytics_events   user_id          user_id

-- Generate deletion script (:user_email is the requesting user's address)
SELECT 'DELETE FROM ' || dataset_name || ' WHERE ' || column_name || ' = ''' || :user_email || ''';'
FROM (
    SELECT DISTINCT dc.dataset_name, dcc.column_name
    FROM data_catalog dc
    JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
    WHERE dcc.pii_type = 'email'
) subq;

-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';
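Generating literal DELETE strings is fragile (quoting, injection risk). Running the same plan with bound parameters inside one transaction is safer; a sketch with the hypothetical db helper, where db.execute and db.transaction are assumed APIs. Note some records may legally require anonymization rather than deletion, which this does not handle:

def forget_user(email):
    """Delete a user's PII from every cataloged email column."""
    targets = db.query(
        """
        SELECT DISTINCT dc.dataset_name, dcc.column_name
        FROM data_catalog dc
        JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
        WHERE dcc.pii_type = 'email'
        """
    )
    with db.transaction():  # all deletions commit together, or none do
        for t in targets:
            # Identifiers cannot be bound as parameters; they come from our
            # own catalog rather than user input, so interpolation is acceptable
            db.execute(
                f"DELETE FROM {t['dataset_name']} WHERE {t['column_name']} = ?",
                email
            )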

PII Tracking in Data Flow

Tag PII as it flows through pipeline:

def track_pii_flow(source_table, dest_table, pii_fields):
    """Track movement of PII between tables."""
    for field in pii_fields:
        db.insert(
            """
            INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
            VALUES (?, ?, ?, ?, NOW())
            """,
            source_table, field, dest_table, field
        )

# Usage
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])

Query: "Where has this user's email propagated?"

db.query(""" WITH RECURSIVE pii_flow AS ( SELECT dest_table, dest_column, 1 as depth FROM pii_lineage WHERE source_table = 'users' AND source_column = 'email'

UNION ALL

SELECT pl.dest_table, pl.dest_column, pf.depth + 1
FROM pii_lineage pl
JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
WHERE pf.depth &#x3C; 10

) SELECT DISTINCT dest_table, dest_column FROM pii_flow; """)

Visualization & Tools

Lineage Graph

Generate visual lineage:

import graphviz

def visualize_lineage(table_name):
    # Fetch lineage
    lineage = db.query("""
        SELECT upstream_table, downstream_table
        FROM table_lineage
        WHERE upstream_table = ? OR downstream_table = ?
    """, table_name, table_name)

    # Create graph
    dot = graphviz.Digraph()

    for row in lineage:
        dot.edge(row['upstream_table'], row['downstream_table'])

    dot.render('lineage_graph', format='png', view=True)

visualize_lineage('orders')

Output:

stripe_api ──► orders ──┬──► daily_sales ──► monthly_revenue
                        │
customers ──────────────┘

Commercial Tools

Tool                 Use Case                        Features
Apache Atlas         Open-source data governance     Metadata management, lineage, search
Collibra             Enterprise data governance      Catalog, lineage, policies, workflows
Alation              Data catalog                    Metadata search, collaboration, lineage
Amundsen (Lyft)      Open-source data discovery      Search, lineage, usage analytics
DataHub (LinkedIn)   Open-source metadata platform   Lineage, discovery, governance
dbt                  Analytics engineering           SQL lineage, documentation, tests

Implementation Checklist

Minimal (Start Here)

[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes

Standard

[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers

Advanced

[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection

Output Format

When helping with data provenance:

Provenance Strategy

Lineage Level

  • Table-level
  • Column-level
  • Row-level
  • Value-level

Capture Method

  • Code instrumentation
  • SQL parsing
  • Database triggers
  • CDC (Change Data Capture)

Data Catalog Schema

[SQL DDL for catalog tables]

Impact Analysis Queries

[SQL queries for upstream/downstream impact]

PII Tracking

Tables with PII: [list from data catalog]

Deletion strategy: [Step-by-step process]

Visualization

[Lineage graph representation]

Compliance Requirements

  • GDPR
  • CCPA
  • HIPAA
  • SOX
  • Other: [specify]

Tooling

  • Lineage tracking: [Tool/Custom]
  • Data catalog: [Tool/Custom]
  • Visualization: [Tool/Custom]

Integration

Works with:

  • scalable-data-schema - Track schema evolution over time

  • data-infrastructure-at-scale - Lineage for pipelines and ETL

  • multi-source-data-conflation - Track source of merged data

  • systems-decompose - Plan lineage as part of feature design

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

  • learning-coach

  • ugc-content-creator

  • journaling

  • sales-playbook