Flow Management Patterns

Safety Notice

This listing is imported from the skills.sh public index metadata. Review the upstream SKILL.md and repository scripts before running them.

Install skill "flow-management" with this command: npx skills add jediv/dataiku-chat-control/jediv-dataiku-chat-control-flow-management

Reference patterns for building and orchestrating Dataiku flows via the Python API.

When to Use This Skill

  • Building datasets that depend on upstream datasets

  • Running multiple recipes in the correct order

  • Creating multi-step pipelines (e.g., aggregate then join then train)

  • Checking job status and handling failures mid-pipeline
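
All snippets below assume a project handle from the Dataiku Python API. A minimal sketch for a script running inside DSS (the project key MY_PROJECT is a placeholder; from outside DSS you would instead create a dataikuapi.DSSClient with the instance URL and an API key):

import dataiku

# Reuse the instance credentials when running inside DSS (notebook, recipe, scenario)
client = dataiku.api_client()
project = client.get_project("MY_PROJECT")  # placeholder project key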

Build a Single Dataset

recipe = project.get_recipe("my_recipe")
job = recipe.run(job_type='NON_RECURSIVE_FORCED_BUILD', partitions=None, wait=True, no_fail=False)
state = job.get_status()["baseStatus"]["state"]  # "DONE" or "FAILED"

By default, recipe.run() waits for the job to complete (wait=True). Pass no_fail=True to avoid an exception being raised when the build fails.

Note: job_type options are NON_RECURSIVE_FORCED_BUILD (default), RECURSIVE_BUILD, RECURSIVE_FORCED_BUILD.
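
For example, to force a rebuild of the recipe's full upstream chain rather than just the recipe itself (reusing the recipe handle from above):

# Rebuild all upstream dependencies, even if they are already up to date
job = recipe.run(job_type='RECURSIVE_FORCED_BUILD', no_fail=True)
state = job.get_status()["baseStatus"]["state"]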

Build Multiple Datasets in Dependency Order

When downstream datasets depend on upstream ones, build them sequentially:

def build_recipe(project, recipe_name):
    """Build a recipe and return success status."""
    print(f"Building {recipe_name}...")
    recipe = project.get_recipe(recipe_name)
    job = recipe.run(no_fail=True)
    status = job.get_status()
    state = status.get("baseStatus", {}).get("state")

    if state == "DONE":
        print(f"  {recipe_name}: success")
        return True
    else:
        # Extract error details
        activities = status.get("baseStatus", {}).get("activities", {})
        for name, info in activities.items():
            if info.get("firstFailure"):
                print(f"  {recipe_name} error: {info['firstFailure'].get('message')}")
        return False

# Build in dependency order: upstream first, then downstream
pipeline = [
    "group_LAB_RESULTS_AGG",     # Step 1: aggregate
    "group_CLINICAL_NOTES_AGG",  # Step 2: aggregate (independent of step 1)
    "join_ML_TRAINING_DATA",     # Step 3: join (depends on steps 1 & 2)
]

for recipe_name in pipeline:
    success = build_recipe(project, recipe_name)
    if not success:
        print(f"Pipeline failed at {recipe_name}. Fix and retry.")
        break

Build Independent Recipes in Parallel

For recipes with no dependency between them, you can build the output datasets directly:

# These two aggregations are independent — build them before the join
ds1 = project.get_dataset("LAB_RESULTS_AGG")
ds2 = project.get_dataset("CLINICAL_NOTES_AGG")

# Build both (sequentially via API, but could overlap in Dataiku)
job1 = project.get_recipe("group_LAB_RESULTS_AGG").run(no_fail=True)
job2 = project.get_recipe("group_CLINICAL_NOTES_AGG").run(no_fail=True)

# Then build the dependent join
job3 = project.get_recipe("join_ML_TRAINING_DATA").run(no_fail=True)
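
To genuinely overlap the two aggregation builds, one option is the wait=False flag of recipe.run() shown earlier, then polling each job until it reaches a terminal state. A sketch, assuming run(wait=False) returns the job handle immediately and that ABORTED is the third terminal state alongside DONE and FAILED:

import time

# Start both aggregation jobs without blocking
job1 = project.get_recipe("group_LAB_RESULTS_AGG").run(wait=False, no_fail=True)
job2 = project.get_recipe("group_CLINICAL_NOTES_AGG").run(wait=False, no_fail=True)

pending = {"group_LAB_RESULTS_AGG": job1, "group_CLINICAL_NOTES_AGG": job2}
while pending:
    for name, job in list(pending.items()):
        state = job.get_status().get("baseStatus", {}).get("state")
        if state in ("DONE", "FAILED", "ABORTED"):
            print(f"{name}: {state}")
            del pending[name]
    if pending:
        time.sleep(5)

# Only build the dependent join once both aggregations have finished
job3 = project.get_recipe("join_ML_TRAINING_DATA").run(no_fail=True)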

Check What Exists Before Creating

Before creating recipes or datasets, check if they already exist to make scripts idempotent:

existing_datasets = [d.get("name") for d in project.list_datasets()]
existing_recipes = [r.get("name") for r in project.list_recipes()]

if "MY_OUTPUT" not in existing_datasets:
    # Create the dataset...
    pass

if "my_recipe" not in existing_recipes:
    # Create the recipe...
    pass

Verify Pipeline Results

After building a pipeline, verify the final output:

ds = project.get_dataset("ML_TRAINING_DATA")
schema = ds.get_settings().get_raw().get("schema", {}).get("columns", [])

print(f"Output has {len(schema)} columns:")
for col in schema:
    print(f"  - {col['name']} ({col.get('type', 'unknown')})")

Common Pipeline Patterns

Pattern             Steps                                     Key Concern
Aggregate + Join    Group inputs → Join aggregated outputs    Build aggregations before the join
Clean + Transform   Prepare recipe → Group/Join               Schema updates after each step
ETL to Warehouse    Prepare → Sync to SQL connection          Set SQL schema before sync
ML Pipeline         Prep → Aggregate → Join → Train           Full dependency chain, verify schema at each step

Handling Failures Mid-Pipeline

for recipe_name in pipeline:
    success = build_recipe(project, recipe_name)
    if not success:
        # Get detailed error info
        jobs = project.list_jobs()
        job = project.get_job(jobs[0]['def']['id'])
        print(job.get_log()[-2000:])  # Last 2000 chars of log
        break
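
If you still hold the job handle returned by recipe.run(), you can read its log directly instead of looking the job up again through list_jobs():

# Alternative: keep the job handle from recipe.run() and print its log on failure
job = project.get_recipe(recipe_name).run(no_fail=True)
state = job.get_status().get("baseStatus", {}).get("state")
if state != "DONE":
    print(job.get_log()[-2000:])  # last 2000 chars of the log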

See skills/troubleshooting/ for detailed error diagnosis patterns.

Schema Propagation

When an upstream schema changes, propagate it through the flow:

flow = project.get_flow()
propagation = flow.new_schema_propagation("source_dataset")
future = propagation.start()
future.wait_for_result()

For more control, use the builder options:

propagation.set_auto_rebuild(True)
propagation.stop_at("recipe_name")
propagation.mark_recipe_as_ok("recipe_name")
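
The builder options are set before starting the propagation, so putting the two snippets together:

flow = project.get_flow()
propagation = flow.new_schema_propagation("source_dataset")
propagation.set_auto_rebuild(True)            # rebuild datasets if needed while propagating
propagation.mark_recipe_as_ok("recipe_name")  # treat this recipe as OK during propagation
future = propagation.start()
future.wait_for_result()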

Build Datasets via Project API

For more control than recipe.run(), use the project-level job builder:

job = project.new_job('RECURSIVE_FORCED_BUILD')
job.with_output('dataset_name', object_type='DATASET')
result = job.start_and_wait(no_fail=True)

This supports building multiple outputs in one job:

job = project.new_job('NON_RECURSIVE_FORCED_BUILD')
job.with_output('dataset_1', object_type='DATASET')
job.with_output('dataset_2', object_type='DATASET')
job.start_and_wait()

Detailed References

  • references/build-strategies.md — Dependency ordering, idempotent builds, dataset status checks

Related Skills

  • skills/recipe-patterns/ — How to create and configure individual recipes

  • skills/dataset-management/ — How to create and manage datasets

  • skills/troubleshooting/ — How to debug failed builds
