
Distributed Job Safety

Install: npx skills add terrylica/cc-skills/terrylica-cc-skills-distributed-job-safety

Patterns and anti-patterns for concurrent job management with pueue + mise + systemd-run, learned from production failures in distributed data pipeline orchestration.

Scope: Universal principles for any pueue + mise workflow with concurrent parameterized jobs. Examples use illustrative names but the principles apply to any domain.

Prerequisite skills: devops-tools:pueue-job-orchestration, itp:mise-tasks, itp:mise-configuration

The Nine Invariants

Non-negotiable rules for concurrent job safety. Violating any one causes silent data corruption or job failure.

Full formal specifications: references/concurrency-invariants.md

  1. Filename Uniqueness by ALL Job Parameters

Every file path shared between concurrent jobs MUST include ALL parameters that differentiate those jobs.

```
WRONG: {symbol}_{start}_{end}.json              # Two thresholds collide
RIGHT: {symbol}_{threshold}_{start}_{end}.json  # Each job gets its own file
```

Test: If two pueue jobs can run simultaneously with different parameter values, those values MUST appear in every shared filename, temp directory, and lock file.
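A minimal sketch of the pattern in Python. The parameter names (symbol, threshold, start, end) are illustrative stand-ins for whatever differentiates your jobs:

```python
from pathlib import Path

def checkpoint_path(base: Path, symbol: str, threshold: int,
                    start: str, end: str) -> Path:
    """Build a checkpoint path containing EVERY job-differentiating parameter.

    If two concurrent jobs can differ in a value, that value must appear here.
    """
    return base / f"{symbol}_{threshold}_{start}_{end}.json"
```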

  2. Verify Before Mutate (No Blind Queueing)

Before queueing jobs, check what is already running. Before deleting state, check who owns it.

WRONG: Blind queue

```bash
for item in "${ITEMS[@]}"; do
  pueue add --group mygroup -- run_job "$item" "$param"
done
```

RIGHT: Check first

```bash
running=$(pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running") | .label] | join(",")')
if echo "$running" | grep -q "${item}@${param}"; then
  echo "SKIP: ${item}@${param} already running"
  continue
fi
```

  3. Idempotent File Operations (missing_ok=True)

All file deletion in concurrent contexts MUST tolerate the file already being gone.

WRONG: TOCTOU race

```python
if path.exists():
    path.unlink()  # Crashes if another job deleted between check and unlink
```

RIGHT: Idempotent

path.unlink(missing_ok=True)

  4. Atomic Writes for Shared State

Checkpoint files must never be partially written. Use the tempfile-fsync-rename pattern.

```python
import json
import os
import tempfile

fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
with os.fdopen(fd, "w") as f:
    f.write(json.dumps(data))
    f.flush()
    os.fsync(f.fileno())     # durable before visible
os.replace(temp_path, path)  # POSIX atomic rename
```

Bash equivalent (for NDJSON telemetry appends):

```bash
# Atomic multi-line append via flock + temp file
TMPOUT=$(mktemp)
# ... write lines to $TMPOUT ...
flock "${LOG_FILE}.lock" bash -c "cat '${TMPOUT}' >> '${LOG_FILE}'"
rm -f "$TMPOUT"
```
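A Python counterpart is possible with fcntl.flock (POSIX-only; append_ndjson is a hypothetical helper, not part of the stack above):

```python
import fcntl

def append_ndjson(log_file: str, lines: list[str]) -> None:
    """Append whole NDJSON lines under an exclusive lock.

    The lock stops concurrent writers from interleaving partial lines;
    it is released automatically when the file handle closes.
    """
    with open(log_file, "a") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        f.write("".join(line.rstrip("\n") + "\n" for line in lines))
        f.flush()
```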

  5. Config File Is SSoT

The .mise.toml [env] section is the single source of truth for environment defaults. Per-job env overrides bypass the SSoT and allow arbitrary values with no review gate.

WRONG: Per-job override bypasses mise SSoT

pueue add -- env MY_APP_MIN_THRESHOLD=50 uv run python script.py

RIGHT: Set the correct value in .mise.toml, no per-job override needed

pueue add -- uv run python script.py

Controlled exception: pueue env set <id> KEY VALUE is acceptable for one-off overrides on stashed/queued tasks (e.g., hyperparameter sweeps). The key distinction: mise [env] is SSoT for defaults that apply to all runs; pueue env set is for one-time parameterization of a specific task without modifying the config file. See devops-tools:pueue-job-orchestration Per-Task Environment Override section.

  6. Maximize Parallelism Within Safe Margins

Always probe host resources and scale parallelism to use available capacity. Conservative defaults waste hours of idle compute.

```bash
# Probe host resources
ssh host 'nproc && free -h && uptime'
```

```python
# Sizing formula (leave 20% margin for OS + DB + overhead)
max_jobs = min(
    (available_memory_gb * 0.8) / per_job_memory_gb,
    (total_cores * 0.8) / per_job_cpu_cores,
)
```
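As a runnable sketch, the formula floors to whole job slots; the sample numbers echo the 32-core / 61 GB host mentioned under Project-Specific Extensions and are otherwise illustrative:

```python
import math

def size_parallelism(available_memory_gb: float, total_cores: int,
                     per_job_memory_gb: float, per_job_cpu_cores: float) -> int:
    """Apply the 20%-margin sizing formula and floor to whole job slots."""
    by_mem = (available_memory_gb * 0.8) / per_job_memory_gb
    by_cpu = (total_cores * 0.8) / per_job_cpu_cores
    return max(1, math.floor(min(by_mem, by_cpu)))

# 61 GB free, 32 cores, jobs peaking at 5 GB / 2 cores -> min(9.76, 12.8) -> 9 slots
print(size_parallelism(61, 32, 5, 2))  # 9
```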

For ClickHouse workloads: The bottleneck is often ClickHouse's concurrent_threads_soft_limit (default: 2 x nproc), not pueue's parallelism. Each query requests max_threads threads (default: nproc). Right-size --max_threads per query to match the effective thread count (soft_limit / pueue_slots), then increase pueue slots. Pueue parallelism can be adjusted live without restarting running jobs.

Post-bump monitoring (mandatory for 5 minutes after any parallelism change; a scripted check is sketched after this list):

  • uptime -- load average should stay below 0.9 x nproc

  • vmstat 1 5 -- si/so columns must remain 0 (no active swapping)

  • ClickHouse errors: SELECT count() FROM system.query_log WHERE event_time > now() - INTERVAL 5 MINUTE AND type = 'ExceptionWhileProcessing' -- must be 0
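A minimal scripted version of the first two checks, assuming Linux /proc counters (the ClickHouse query above still has to be run separately):

```python
import os
import time

def post_bump_watch(minutes: int = 5) -> None:
    """Watch load and swap activity after a parallelism bump (Linux only)."""
    nproc = os.cpu_count() or 1

    def swap_ops() -> int:
        # pswpin/pswpout rise only when pages actually move to/from swap
        with open("/proc/vmstat") as f:
            stats = dict(line.split() for line in f)
        return int(stats["pswpin"]) + int(stats["pswpout"])

    baseline = swap_ops()
    for _ in range(minutes * 6):  # sample every 10 seconds
        load1 = os.getloadavg()[0]
        if load1 > 0.9 * nproc:
            print(f"WARN: load {load1:.1f} exceeds 0.9 x {nproc}")
        if swap_ops() > baseline:
            print("WARN: active swapping detected")
        time.sleep(10)
```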

Cross-reference: See devops-tools:pueue-job-orchestration ClickHouse Parallelism Tuning section for the full decision matrix.

  7. Per-Job Memory Caps via systemd-run

On Linux with cgroups v2, wrap each job with systemd-run to enforce hard memory limits.

```bash
systemd-run --user --scope -p MemoryMax=8G -p MemorySwapMax=0 \
  uv run python scripts/process.py --symbol BTCUSDT --threshold 250
```

Critical: MemorySwapMax=0 is mandatory. Without it, the process escapes into swap and the memory limit is effectively meaningless.
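One way to apply this from job-submission code is a conditional wrapper; run_with_memcap is a hypothetical helper that degrades to a plain exec on non-Linux hosts, matching the stack table below:

```python
import platform
import subprocess

def run_with_memcap(cmd: list[str], mem_max: str = "8G") -> int:
    """Run a job under a transient systemd scope with a hard memory cap.

    MemorySwapMax=0 closes the swap escape hatch; without it MemoryMax
    is effectively meaningless. On macOS the wrapper is a no-op.
    """
    if platform.system() == "Linux":
        cmd = ["systemd-run", "--user", "--scope",
               "-p", f"MemoryMax={mem_max}",
               "-p", "MemorySwapMax=0",
               *cmd]
    return subprocess.run(cmd).returncode
```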

  8. Monitor by Stable Identifiers, Not Ephemeral IDs (INV-8)

Pueue job IDs are ephemeral -- they shift when jobs are removed, re-queued, or split. Use group names and label patterns for monitoring.

WRONG: Hardcoded job IDs

```bash
if pueue status --json | jq -e '.tasks."14"' >/dev/null; then ...
```

RIGHT: Query by group/label

```bash
pueue status --json | jq -r '.tasks | to_entries[] | select(.value.group == "mygroup") | .value.id'
```
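The same lookup in Python, sketched against the pueue status --json layout the jq examples above rely on:

```python
import json
import subprocess

def task_ids_for_group(group: str) -> list[int]:
    """Resolve current task IDs from a stable group name at call time.

    IDs are re-resolved on every call, so re-queues and removals never
    invalidate the monitor.
    """
    out = subprocess.run(["pueue", "status", "--json"],
                         capture_output=True, text=True, check=True)
    tasks = json.loads(out.stdout)["tasks"]
    return [t["id"] for t in tasks.values() if t.get("group") == group]
```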

Full specification: references/concurrency-invariants.md

  9. Derived Artifact Filenames Must Include ALL Category Dimensions (INV-9)

When concurrent or sequential pipeline phases produce derived artifacts (Parquet chunks, JSONL summaries, temp files) that share a directory, every filename must include ALL discriminating dimensions -- not just the job-level parameters (INV-1), but also pipeline-level categories like direction, strategy, or generation.

```
WRONG: chunk_{formation}_{symbol}_{threshold}.parquet             # No direction -- LONG glob eats SHORT files
RIGHT: chunk_{direction}_{formation}_{symbol}_{threshold}.parquet # Direction-scoped
```

Glob scope rule: Cleanup and merge globs must match the filename pattern exactly:

WRONG: Unscoped glob -- consumes artifacts from other categories

chunk_files = folds_dir.glob("chunk*.parquet")

RIGHT: Category-scoped glob -- only touches this category's artifacts

chunk_files = folds_dir.glob(f"chunk{direction}_*.parquet")

Post-merge validation: After merging artifacts, assert expected values in category columns:

```python
merged_df = pl.concat([pl.read_parquet(p) for p in chunk_files])
assert set(merged_df["strategy"].unique()) == {"standard"}, "Direction contamination!"
```
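Putting the scoped glob and the post-merge check together, a sketch that assumes the chunks carry a direction column (names mirror the examples above):

```python
from pathlib import Path

import polars as pl

def merge_direction_chunks(folds_dir: Path, direction: str) -> pl.DataFrame:
    """Merge only this direction's chunk files, then prove it."""
    chunk_files = sorted(folds_dir.glob(f"chunk_{direction}_*.parquet"))
    merged = pl.concat([pl.read_parquet(p) for p in chunk_files])
    # Post-merge validation: fail loudly on cross-category contamination
    found = set(merged["direction"].unique().to_list())
    assert found == {direction}, f"Direction contamination: {found}"
    return merged
```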

Relationship to INV-1: INV-1 ensures checkpoint file uniqueness by job parameters (runtime isolation). INV-9 extends this to derived artifacts that persist across pipeline phases (artifact isolation). Both prevent the same class of bug -- silent cross-contamination from filename collisions.

Full specification: references/concurrency-invariants.md

Anti-Patterns (Learned from Production)

17 anti-patterns documented from production failures. Full details with code examples: references/anti-patterns.md

| AP | Name | Key Symptom | Related Invariant |
|----|------|-------------|-------------------|
| AP-1 | Redeploying without checking running | Checkpoint collisions after kill+requeue | INV-2 |
| AP-2 | Checkpoint filename missing parameters | FileNotFoundError on checkpoint delete | INV-1 |
| AP-3 | Trusting pueue restart logs | Old error appears after restart | |
| AP-4 | Assuming PyPI propagation is instant | "no version found" after publish | |
| AP-5 | Editable source vs. installed wheel | uv run uses old code after pip upgrade | |
| AP-6 | Sequential phase assumption | Phase contention from simultaneous queueing | |
| AP-7 | Manual post-processing steps | "run optimize after they finish" never happens | |
| AP-8 | Hardcoded job IDs in monitors | Monitor crashes after job re-queue | INV-8 |
| AP-9 | Sequential when epochs enable parallel | 1,700 hours single-threaded on 25+ cores | INV-6 |
| AP-10 | State file bloat | Silent 60x slowdown in job submission | |
| AP-11 | Wrong working directory in remote jobs | [Errno 2] No such file or directory | |
| AP-12 | Per-file SSH for bulk submission | 300K jobs takes days (SSH overhead) | |
| AP-13 | SIGPIPE under set -euo pipefail | Exit code 141 on harmless pipe ops | |
| AP-14 | False data loss from variable NDJSON | wc -l shows 3-6% fewer lines | |
| AP-15 | Cursor file deletion on completion | Full re-run instead of incremental resume | |
| AP-16 | mise [env] for pueue/cron secrets | Empty env vars in daemon jobs | INV-5 |
| AP-17 | Unscoped glob across pipeline phases | Phase A consumes Phase B's artifacts | INV-9 |

The Mise + Pueue + systemd-run Stack

Full architecture diagram and responsibility boundaries: references/stack-architecture.md

| Layer | Responsibility |
|-------|----------------|
| mise | Environment variables, tool versions, task discovery |
| pueue | Daemon persistence, parallelism limits, restart, --after |
| systemd-run | Per-job cgroup memory caps (Linux only, no-op on macOS) |
| autoscaler | Dynamic parallelism tuning based on host resources |
| Python/app | Domain logic, checkpoint management, data integrity |

Remote Deployment Protocol

When deploying a fix to a running host:

  1. AUDIT: ssh host 'pueue status --json' -> count running/queued/failed
  2. DECIDE: Wait for running jobs? Kill? Let them finish with old code?
  3. PULL: ssh host 'cd ~/project && git fetch origin main && git reset --hard origin/main'
  4. VERIFY: ssh host 'cd ~/project && python -c "import pkg; print(pkg.__version__)"'
  5. UPGRADE: ssh host 'cd ~/project && uv pip install --python .venv/bin/python --refresh pkg==X.Y.Z'
  6. RESTART: ssh host 'pueue restart <failed_id>' OR add fresh jobs
  7. MONITOR: ssh host 'pueue status --group mygroup'

Critical: Step 1 (AUDIT) is mandatory. Skipping it is the root cause of cascade failures.
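A sketch of the AUDIT step as code; the status layout is inferred from the jq examples earlier and should be treated as an assumption:

```python
import json
import subprocess
from collections import Counter

def audit_remote(host: str) -> Counter:
    """AUDIT before any mutation: count remote task states by name."""
    out = subprocess.run(["ssh", host, "pueue status --json"],
                         capture_output=True, text=True, check=True)
    tasks = json.loads(out.stdout)["tasks"].values()

    def state_name(status):
        # A status is either a one-key object like {"Running": {...}}
        # or a bare string, depending on state and pueue version.
        return next(iter(status)) if isinstance(status, dict) else status

    return Counter(state_name(t["status"]) for t in tasks)

# e.g. Counter({'Running': 12, 'Queued': 30, 'Failed': 2}) -> DECIDE before queueing
```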

See: references/deployment-checklist.md for full protocol.

Concurrency Safety Decision Tree

```
Adding a new parameter to a resumable job function?
|-- Is it job-differentiating (two jobs can have different values)?
|   |-- YES -> Add to checkpoint filename
|   |          Add to pueue job label
|   |          Add to remote checkpoint key
|   |-- NO  -> Skip (e.g., verbose, notify are per-run, not per-job)
|
|-- Does the function delete files?
|   |-- YES -> Use missing_ok=True
|   |          Use atomic write for creates
|   |-- NO  -> Standard operation
|
|-- Does the function write to shared storage?
    |-- YES -> Force deduplication after write
    |          Use UPSERT semantics where possible
    |-- NO  -> Standard operation
```

Autoscaler

Dynamic parallelism tuning for pueue groups based on host CPU and memory. Full details: references/autoscaler.md

```
CPU < 40% AND MEM < 60%  -> SCALE UP   (+1 per group)
CPU > 80% OR  MEM > 80%  -> SCALE DOWN (-1 per group)
Otherwise                -> HOLD
```

Key principle: Ramp up incrementally (not to max). Job memory grows over time -- jumping to max parallelism risks OOM when all jobs peak simultaneously.
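The decision rule is small enough to sketch directly; applying the returned delta would use pueue parallel <n> --group <g>:

```python
def autoscale_delta(cpu_pct: float, mem_pct: float) -> int:
    """Return the per-group parallelism delta for one autoscaler tick."""
    if cpu_pct < 40 and mem_pct < 60:
        return +1   # scale up incrementally -- never jump straight to max
    if cpu_pct > 80 or mem_pct > 80:
        return -1   # scale down before swap/OOM territory
    return 0        # hold
```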

Project-Specific Extensions

This skill provides universal patterns that apply to any distributed job pipeline. Projects should create a local extension skill (e.g., myproject-job-safety) in their .claude/skills/ directory that provides:

| Local Extension Provides | Example |
|--------------------------|---------|
| Concrete function names | run_resumable_job() -> myapp_populate_cache() |
| Application-specific env vars | MY_APP_MIN_THRESHOLD, MY_APP_CH_HOSTS |
| Memory profiles per job type | "250 dbps peaks at 5 GB, use MemoryMax=8G" |
| Database-specific audit queries | SELECT ... FROM mydb.mytable ... countIf(x < 0) |
| Issue provenance tracking | "Checkpoint race: GH-84" |
| Host-specific configuration | "bigblack: 32 cores, 61 GB, groups p1/p2/p3/p4" |

Two-layer invocation pattern: When this skill is triggered, also check for and invoke any local *-job-safety skill in the project's .claude/skills/ directory for project-specific configuration.

  • devops-tools:distributed-job-safety (universal patterns -- this skill)
  • .claude/skills/myproject-job-safety (project-specific config)
  = Complete operational knowledge

SOTA Alternative: Temporal for Durable Workflows

For structured, repeatable job pipelines, Temporal provides built-in enforcement of many invariants in this skill:

| This Skill's Invariant | Temporal Equivalent |
|------------------------|---------------------|
| INV-2 (Verify before mutate) | Workflow ID uniqueness -- duplicate starts rejected |
| INV-3 (Idempotent operations) | Activity retry with non_retryable_error_types |
| INV-6 (Maximize parallelism safely) | max_concurrent_activities per worker |
| INV-8 (Stable identifiers) | Workflow IDs are user-defined and permanent |

When to consider Temporal: When your pipeline has well-defined activities (not ad-hoc shell commands), needs dedup/idempotency guarantees, or when the overhead of pueue guardrails (autoscaler agents, manual retry classification) exceeds the overhead of running a Temporal server.

Install: pip install temporalio (Python SDK), brew install temporal (CLI + dev server).
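A minimal sketch with the Python SDK, showing the workflow-ID dedup that replaces INV-2; the workflow, activity, ID scheme, and task queue names are all illustrative:

```python
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client

@activity.defn
async def process_symbol(symbol: str) -> str:
    return f"done:{symbol}"  # domain logic goes here

@workflow.defn
class PipelineWorkflow:
    @workflow.run
    async def run(self, symbol: str) -> str:
        # Retries, timeouts, and idempotency bookkeeping live in the server
        return await workflow.execute_activity(
            process_symbol, symbol,
            start_to_close_timeout=timedelta(minutes=30),
        )

async def submit(client: Client) -> None:
    # The workflow ID encodes the job parameters (cf. INV-1); a second
    # start with the same ID while it runs is rejected (cf. INV-2).
    await client.start_workflow(
        PipelineWorkflow.run, "BTCUSDT",
        id="pipeline-BTCUSDT-250",
        task_queue="jobs",
    )
```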

Lesson from 2026-03-04 incident: 5 autonomous Claude Code agents monitoring 60 pueue jobs created ~12,800 runaway tasks because pueue's restart creates new tasks (not in-place), agents had no mutation budgets, and persistent failures were blindly retried. Temporal prevents all three failure modes natively.

References

  • Anti-Patterns -- 17 production failure patterns (AP-1 through AP-17)

  • Concurrency Invariants -- Formal invariant specifications (INV-1 through INV-9)

  • Deployment Checklist -- Step-by-step remote deployment protocol

  • Environment Gotchas -- Host-specific pitfalls (G-1 through G-17)

  • Stack Architecture -- Mise + Pueue + systemd-run layer diagram

  • Autoscaler -- Dynamic parallelism tuning patterns

  • Cross-reference: devops-tools:pueue-job-orchestration -- Pueue basics, dependency chaining, installation

  • SOTA Alternative: Temporal -- Durable workflow orchestration with built-in dedup and retry

