Distributed Job Safety
Patterns and anti-patterns for concurrent job management with pueue + mise + systemd-run, learned from production failures in distributed data pipeline orchestration.
Scope: Universal principles for any pueue + mise workflow with concurrent parameterized jobs. Examples use illustrative names but the principles apply to any domain.
Prerequisite skills: devops-tools:pueue-job-orchestration, itp:mise-tasks, itp:mise-configuration
The Nine Invariants
Non-negotiable rules for concurrent job safety. Violating any one causes silent data corruption or job failure.
Full formal specifications: references/concurrency-invariants.md
- Filename Uniqueness by ALL Job Parameters (INV-1)
Every file path shared between concurrent jobs MUST include ALL parameters that differentiate those jobs.
```
# WRONG: two thresholds collide
{symbol}_{start}_{end}.json

# RIGHT: each job gets its own file
{symbol}_{threshold}_{start}_{end}.json
```
Test: If two pueue jobs can run simultaneously with different parameter values, those values MUST appear in every shared filename, temp directory, and lock file.
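A minimal sketch of this test in code (the helper name and parameter set are illustrative, not from a specific codebase): build every shared path through one function that folds in all job-differentiating parameters, so collisions become structurally impossible.

```python
from pathlib import Path

def checkpoint_path(base_dir: Path, **job_params) -> Path:
    """Build a checkpoint path that embeds ALL job-differentiating parameters.

    Sorting the keys makes the filename deterministic regardless of call order.
    """
    parts = "_".join(f"{k}-{job_params[k]}" for k in sorted(job_params))
    return base_dir / f"ckpt_{parts}.json"

# Two jobs that differ only by threshold get distinct files:
a = checkpoint_path(Path("/tmp"), symbol="BTCUSDT", threshold=100, start="2024-01", end="2024-02")
b = checkpoint_path(Path("/tmp"), symbol="BTCUSDT", threshold=250, start="2024-01", end="2024-02")
assert a != b
```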
- Verify Before Mutate (No Blind Queueing) (INV-2)
Before queueing jobs, check what is already running. Before deleting state, check who owns it.
WRONG: Blind queue

```bash
for item in "${ITEMS[@]}"; do
  pueue add --group mygroup -- run_job "$item" "$param"
done
```

RIGHT: Check first

```bash
# Inside the queue loop: skip anything already running
running=$(pueue status --json | jq -r '[.tasks[] | select(.status | keys[0] == "Running") | .label] | join(",")')
if echo "$running" | grep -q "${item}@${param}"; then
  echo "SKIP: ${item}@${param} already running"
  continue
fi
```
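The same check can live in Python when submission is scripted. A sketch, assuming the pueue JSON schema in which `tasks` maps IDs to task objects and `status` is an object keyed by state (true for recent pueue versions; verify against your installed version):

```python
import json

def running_labels(status_json: str) -> set[str]:
    """Collect labels of currently running tasks from `pueue status --json` output."""
    status = json.loads(status_json)
    return {
        task.get("label")
        for task in status.get("tasks", {}).values()
        if isinstance(task.get("status"), dict)
        and "Running" in task["status"]
        and task.get("label")
    }

# Skip any queue candidate whose label is already running:
sample = json.dumps({"tasks": {
    "3": {"label": "BTCUSDT@250", "status": {"Running": {}}},
    "4": {"label": "ETHUSDT@100", "status": {"Queued": {}}},
}})
assert running_labels(sample) == {"BTCUSDT@250"}
```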
- Idempotent File Operations (missing_ok=True) (INV-3)
All file deletion in concurrent contexts MUST tolerate the file already being gone.
WRONG: TOCTOU race

```python
if path.exists():
    path.unlink()  # Crashes if another job deleted between check and unlink
```

RIGHT: Idempotent

```python
path.unlink(missing_ok=True)
```
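A quick self-contained demonstration that `missing_ok=True` turns the "losing" racer's delete into a no-op instead of a crash:

```python
import tempfile
from pathlib import Path

p = Path(tempfile.mkdtemp()) / "demo.ckpt"
p.write_text("checkpoint")

p.unlink(missing_ok=True)  # first delete removes the file
p.unlink(missing_ok=True)  # racing second delete: no FileNotFoundError, just a no-op
assert not p.exists()
```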
- Atomic Writes for Shared State (INV-4)
Checkpoint files must never be partially written. Use the tempfile-fsync-rename pattern.
```python
fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
with os.fdopen(fd, "w") as f:
    f.write(json.dumps(data))
    f.flush()
    os.fsync(f.fileno())
os.replace(temp_path, path)  # POSIX atomic rename
```
Bash equivalent (for NDJSON telemetry appends):

```bash
# Atomic multi-line append via flock + temp file
TMPOUT=$(mktemp)
# ... write lines to $TMPOUT ...
flock "${LOG_FILE}.lock" bash -c "cat '${TMPOUT}' >> '${LOG_FILE}'"
rm -f "$TMPOUT"
```
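The Python pattern above can be wrapped in a small reusable helper (a sketch; the function name is illustrative). Concurrent readers observe either the old complete file or the new complete file, never a partial write:

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_json(path: Path, data) -> None:
    """tempfile -> fsync -> rename: crash-safe replacement of a shared checkpoint."""
    fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(json.dumps(data))
            f.flush()
            os.fsync(f.fileno())
        os.replace(temp_path, path)  # POSIX atomic rename
    except BaseException:
        os.unlink(temp_path)  # don't leak the temp file on failure
        raise

ckpt = Path(tempfile.mkdtemp()) / "state.json"
atomic_write_json(ckpt, {"cursor": 42})
atomic_write_json(ckpt, {"cursor": 43})  # overwriting an existing file is equally safe
assert json.loads(ckpt.read_text()) == {"cursor": 43}
```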
- Config File Is SSoT (INV-5)
The .mise.toml [env] section is the single source of truth for environment defaults. Per-job env overrides bypass the SSoT and allow arbitrary values with no review gate.
WRONG: Per-job override bypasses mise SSoT

```bash
pueue add -- env MY_APP_MIN_THRESHOLD=50 uv run python script.py
```

RIGHT: Set the correct value in .mise.toml, no per-job override needed

```bash
pueue add -- uv run python script.py
```
Controlled exception: pueue env set <id> KEY VALUE is acceptable for one-off overrides on stashed/queued tasks (e.g., hyperparameter sweeps). The key distinction: mise [env] is SSoT for defaults that apply to all runs; pueue env set is for one-time parameterization of a specific task without modifying the config file. See devops-tools:pueue-job-orchestration Per-Task Environment Override section.
- Maximize Parallelism Within Safe Margins (INV-6)
Always probe host resources and scale parallelism to use available capacity. Conservative defaults waste hours of idle compute.
Probe host resources:

```bash
ssh host 'nproc && free -h && uptime'
```

Sizing formula (leave 20% margin for OS + DB + overhead):

```
max_jobs = min(
    (available_memory_gb * 0.8) / per_job_memory_gb,
    (total_cores * 0.8) / per_job_cpu_cores,
)
```
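The sizing formula can be sketched as code (floor to an integer, and never go below one slot):

```python
def max_jobs(available_memory_gb: float, per_job_memory_gb: float,
             total_cores: int, per_job_cpu_cores: float,
             margin: float = 0.8) -> int:
    """Size pueue parallelism from host resources, leaving a 20% margin by default."""
    by_memory = (available_memory_gb * margin) / per_job_memory_gb
    by_cpu = (total_cores * margin) / per_job_cpu_cores
    return max(1, int(min(by_memory, by_cpu)))

# 61 GB free, 5 GB/job, 32 cores, 2 cores/job -> memory-bound at 9 slots
assert max_jobs(61, 5, 32, 2) == 9
```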
For ClickHouse workloads: The bottleneck is often ClickHouse's concurrent_threads_soft_limit (default: 2 x nproc), not pueue's parallelism. Each query requests max_threads threads (default: nproc). Right-size --max_threads per query to match the effective thread count (soft_limit / pueue_slots), then increase pueue slots. Pueue parallelism can be adjusted live without restarting running jobs.
Post-bump monitoring (mandatory for 5 minutes after any parallelism change):

- `uptime` -- load average should stay below 0.9 x nproc
- `vmstat 1 5` -- si/so columns must remain 0 (no active swapping)
- ClickHouse errors: `SELECT count() FROM system.query_log WHERE event_time > now() - INTERVAL 5 MINUTE AND type = 'ExceptionWhileProcessing'` -- must be 0
Cross-reference: See devops-tools:pueue-job-orchestration ClickHouse Parallelism Tuning section for the full decision matrix.
- Per-Job Memory Caps via systemd-run (INV-7)
On Linux with cgroups v2, wrap each job with systemd-run to enforce hard memory limits.
```bash
systemd-run --user --scope -p MemoryMax=8G -p MemorySwapMax=0 \
  uv run python scripts/process.py --symbol BTCUSDT --threshold 250
```
Critical: MemorySwapMax=0 is mandatory. Without it, the process escapes into swap and the memory limit is effectively meaningless.
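For programmatic submission, the wrapper can be assembled once and prepended to every job's argv. A sketch (the helper name is illustrative; how the resulting argv is handed to `pueue add` depends on your setup):

```python
def memory_capped(argv: list[str], memory_max: str = "8G") -> list[str]:
    """Prepend a systemd-run scope enforcing a hard memory cap (cgroups v2, Linux only)."""
    return [
        "systemd-run", "--user", "--scope",
        "-p", f"MemoryMax={memory_max}",
        "-p", "MemorySwapMax=0",  # mandatory: otherwise the process escapes into swap
        *argv,
    ]

cmd = memory_capped(["uv", "run", "python", "scripts/process.py"], memory_max="8G")
assert cmd[:3] == ["systemd-run", "--user", "--scope"]
assert "MemorySwapMax=0" in cmd
```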
- Monitor by Stable Identifiers, Not Ephemeral IDs (INV-8)
Pueue job IDs are ephemeral -- they shift when jobs are removed, re-queued, or split. Use group names and label patterns for monitoring.
WRONG: Hardcoded job IDs

```bash
if pueue status --json | jq -e '.tasks."14"' >/dev/null; then ...
```

RIGHT: Query by group/label

```bash
pueue status --json | jq -r '.tasks | to_entries[] | select(.value.group == "mygroup") | .value.id'
```
Full specification: references/concurrency-invariants.md
- Derived Artifact Filenames Must Include ALL Category Dimensions (INV-9)
When concurrent or sequential pipeline phases produce derived artifacts (Parquet chunks, JSONL summaries, temp files) that share a directory, every filename must include ALL discriminating dimensions -- not just the job-level parameters (INV-1), but also pipeline-level categories like direction, strategy, or generation.
```
# WRONG: no direction -- LONG glob eats SHORT files
chunk_{formation}_{symbol}_{threshold}.parquet

# RIGHT: direction-scoped
chunk_{direction}_{formation}_{symbol}_{threshold}.parquet
```
Glob scope rule: Cleanup and merge globs must match the filename pattern exactly:
WRONG: Unscoped glob -- consumes artifacts from other categories

```python
chunk_files = folds_dir.glob("chunk_*.parquet")
```

RIGHT: Category-scoped glob -- only touches this category's artifacts

```python
chunk_files = folds_dir.glob(f"chunk_{direction}_*.parquet")
```
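A self-contained demonstration of the glob scope rule (directory layout and category names are illustrative):

```python
import tempfile
from pathlib import Path

folds_dir = Path(tempfile.mkdtemp())
for name in ["chunk_long_30_BTCUSDT_250.parquet",
             "chunk_short_30_BTCUSDT_250.parquet"]:
    (folds_dir / name).touch()

direction = "long"
unscoped = sorted(p.name for p in folds_dir.glob("chunk_*.parquet"))
scoped = sorted(p.name for p in folds_dir.glob(f"chunk_{direction}_*.parquet"))

assert len(unscoped) == 2  # WRONG: a merge or cleanup here would eat SHORT's artifacts
assert scoped == ["chunk_long_30_BTCUSDT_250.parquet"]  # RIGHT: only this category
```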
Post-merge validation: After merging artifacts, assert expected values in category columns:
```python
merged_df = pl.concat([pl.read_parquet(p) for p in chunk_files])
assert set(merged_df["direction"].unique()) == {direction}, "Direction contamination!"
```
Relationship to INV-1: INV-1 ensures checkpoint file uniqueness by job parameters (runtime isolation). INV-9 extends this to derived artifacts that persist across pipeline phases (artifact isolation). Both prevent the same class of bug -- silent cross-contamination from filename collisions.
Full specification: references/concurrency-invariants.md
Anti-Patterns (Learned from Production)
17 anti-patterns documented from production failures. Full details with code examples: references/anti-patterns.md
| AP | Name | Key Symptom | Related Invariant |
|----|------|-------------|-------------------|
| AP-1 | Redeploying without checking running | Checkpoint collisions after kill+requeue | INV-2 |
| AP-2 | Checkpoint filename missing parameters | FileNotFoundError on checkpoint delete | INV-1 |
| AP-3 | Trusting pueue restart logs | Old error appears after restart | |
| AP-4 | Assuming PyPI propagation is instant | "no version found" after publish | |
| AP-5 | Editable source vs. installed wheel | uv run uses old code after pip upgrade | |
| AP-6 | Sequential phase assumption | Phase contention from simultaneous queueing | |
| AP-7 | Manual post-processing steps | "run optimize after they finish" never happens | |
| AP-8 | Hardcoded job IDs in monitors | Monitor crashes after job re-queue | INV-8 |
| AP-9 | Sequential when epochs enable parallel | 1,700 hours single-threaded on 25+ cores | INV-6 |
| AP-10 | State file bloat | Silent 60x slowdown in job submission | |
| AP-11 | Wrong working directory in remote jobs | [Errno 2] No such file or directory | |
| AP-12 | Per-file SSH for bulk submission | 300K jobs takes days (SSH overhead) | |
| AP-13 | SIGPIPE under set -euo pipefail | Exit code 141 on harmless pipe ops | |
| AP-14 | False data loss from variable NDJSON | wc -l shows 3-6% fewer lines | |
| AP-15 | Cursor file deletion on completion | Full re-run instead of incremental resume | |
| AP-16 | mise [env] for pueue/cron secrets | Empty env vars in daemon jobs | INV-5 |
| AP-17 | Unscoped glob across pipeline phases | Phase A consumes Phase B's artifacts | INV-9 |
The Mise + Pueue + systemd-run Stack
Full architecture diagram and responsibility boundaries: references/stack-architecture.md
| Layer | Responsibility |
|-------|----------------|
| mise | Environment variables, tool versions, task discovery |
| pueue | Daemon persistence, parallelism limits, restart, --after |
| systemd-run | Per-job cgroup memory caps (Linux only, no-op on macOS) |
| autoscaler | Dynamic parallelism tuning based on host resources |
| Python/app | Domain logic, checkpoint management, data integrity |
Remote Deployment Protocol
When deploying a fix to a running host:
1. AUDIT: `ssh host 'pueue status --json'` -> count running/queued/failed
2. DECIDE: Wait for running jobs? Kill? Let them finish with old code?
3. PULL: `ssh host 'cd ~/project && git fetch origin main && git reset --hard origin/main'`
4. VERIFY: `ssh host 'cd ~/project && python -c "import pkg; print(pkg.__version__)"'`
5. UPGRADE: `ssh host 'cd ~/project && uv pip install --python .venv/bin/python --refresh pkg==X.Y.Z'`
6. RESTART: `ssh host 'pueue restart <failed_id>'` OR add fresh jobs
7. MONITOR: `ssh host 'pueue status --group mygroup'`
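The AUDIT step can be sketched as a pure function over pueue's JSON output (assuming the schema where `status` is an object keyed by state; adapt to your pueue version):

```python
import json
from collections import Counter

def audit_counts(status_json: str) -> Counter:
    """Tally tasks by state from `pueue status --json` before touching anything."""
    tasks = json.loads(status_json).get("tasks", {})
    return Counter(
        next(iter(t["status"])) if isinstance(t["status"], dict) else str(t["status"])
        for t in tasks.values()
    )

sample = json.dumps({"tasks": {
    "1": {"status": {"Running": {}}},
    "2": {"status": {"Queued": {}}},
    "3": {"status": {"Running": {}}},
}})
assert audit_counts(sample) == Counter({"Running": 2, "Queued": 1})
```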
Critical: Step 1 (AUDIT) is mandatory. Skipping it is the root cause of cascade failures.
See: references/deployment-checklist.md for full protocol.
Concurrency Safety Decision Tree
```
Adding a new parameter to a resumable job function?
|-- Is it job-differentiating (two jobs can have different values)?
|   |-- YES -> Add to checkpoint filename
|   |          Add to pueue job label
|   |          Add to remote checkpoint key
|   |-- NO  -> Skip (e.g., verbose, notify are per-run, not per-job)
|
|-- Does the function delete files?
|   |-- YES -> Use missing_ok=True
|   |          Use atomic write for creates
|   |-- NO  -> Standard operation
|
|-- Does the function write to shared storage?
    |-- YES -> Force deduplication after write
    |          Use UPSERT semantics where possible
    |-- NO  -> Standard operation
```
Autoscaler
Dynamic parallelism tuning for pueue groups based on host CPU and memory. Full details: references/autoscaler.md
```
CPU < 40% AND MEM < 60%  -> SCALE UP   (+1 per group)
CPU > 80% OR  MEM > 80%  -> SCALE DOWN (-1 per group)
Otherwise                -> HOLD
```
Key principle: Ramp up incrementally (not to max). Job memory grows over time -- jumping to max parallelism risks OOM when all jobs peak simultaneously.
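The thresholds above, as a sketch of the per-tick decision (the function name is illustrative):

```python
def autoscale_decision(cpu_pct: float, mem_pct: float) -> int:
    """Return the per-group parallelism delta for one autoscaler tick.

    Scale-down wins over scale-up: either resource being hot forces -1.
    """
    if cpu_pct > 80 or mem_pct > 80:
        return -1  # SCALE DOWN
    if cpu_pct < 40 and mem_pct < 60:
        return +1  # SCALE UP (incrementally, never jump to max)
    return 0       # HOLD

assert autoscale_decision(30, 50) == 1
assert autoscale_decision(85, 50) == -1
assert autoscale_decision(60, 50) == 0
```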
Project-Specific Extensions
This skill provides universal patterns that apply to any distributed job pipeline. Projects should create a local extension skill (e.g., myproject-job-safety) in their .claude/skills/ directory that provides:
| Local Extension Provides | Example |
|--------------------------|---------|
| Concrete function names | `run_resumable_job()` -> `myapp_populate_cache()` |
| Application-specific env vars | `MY_APP_MIN_THRESHOLD`, `MY_APP_CH_HOSTS` |
| Memory profiles per job type | "250 dbps peaks at 5 GB, use MemoryMax=8G" |
| Database-specific audit queries | `SELECT ... FROM mydb.mytable ... countIf(x < 0)` |
| Issue provenance tracking | "Checkpoint race: GH-84" |
| Host-specific configuration | "bigblack: 32 cores, 61 GB, groups p1/p2/p3/p4" |
Two-layer invocation pattern: When this skill is triggered, also check for and invoke any local *-job-safety skill in the project's .claude/skills/ directory for project-specific configuration.
```
  devops-tools:distributed-job-safety    (universal patterns -- this skill)
+ .claude/skills/myproject-job-safety    (project-specific config)
= Complete operational knowledge
```
SOTA Alternative: Temporal for Durable Workflows
For structured, repeatable job pipelines, Temporal provides built-in enforcement of many invariants in this skill:
| This Skill's Invariant | Temporal Equivalent |
|------------------------|---------------------|
| INV-2 (Verify before mutate) | Workflow ID uniqueness -- duplicate starts rejected |
| INV-3 (Idempotent operations) | Activity retry with `non_retryable_error_types` |
| INV-6 (Maximize parallelism safely) | `max_concurrent_activities` per worker |
| INV-8 (Stable identifiers) | Workflow IDs are user-defined and permanent |
When to consider Temporal: When your pipeline has well-defined activities (not ad-hoc shell commands), needs dedup/idempotency guarantees, or when the overhead of pueue guardrails (autoscaler agents, manual retry classification) exceeds the overhead of running a Temporal server.
Install: pip install temporalio (Python SDK), brew install temporal (CLI + dev server).
Lesson from 2026-03-04 incident: 5 autonomous Claude Code agents monitoring 60 pueue jobs created ~12,800 runaway tasks because pueue's restart creates new tasks (not in-place), agents had no mutation budgets, and persistent failures were blindly retried. Temporal prevents all three failure modes natively.
References
- Anti-Patterns -- 17 production failure patterns (AP-1 through AP-17)
- Concurrency Invariants -- Formal invariant specifications (INV-1 through INV-9)
- Deployment Checklist -- Step-by-step remote deployment protocol
- Environment Gotchas -- Host-specific pitfalls (G-1 through G-17)
- Stack Architecture -- Mise + Pueue + systemd-run layer diagram
- Autoscaler -- Dynamic parallelism tuning patterns
- Cross-reference: devops-tools:pueue-job-orchestration -- Pueue basics, dependency chaining, installation
- SOTA Alternative: Temporal -- Durable workflow orchestration with built-in dedup and retry