# Dask

## Overview

Dask is a Python library for parallel and distributed computing that enables three critical capabilities:
- Larger-than-memory execution on single machines for data exceeding available RAM
- Parallel processing for improved computational speed across multiple cores
- Distributed computation supporting terabyte-scale datasets across multiple machines
Dask scales from laptops (processing ~100 GiB) to clusters (processing ~100 TiB) while maintaining familiar Python APIs.
## When to Use This Skill

Use this skill when you need to:
- Process datasets that exceed available RAM
- Scale pandas or NumPy operations to larger datasets
- Parallelize computations for performance improvements
- Process multiple files efficiently (CSVs, Parquet, JSON, text logs)
- Build custom parallel workflows with task dependencies
- Distribute workloads across multiple cores or machines
## Core Capabilities

Dask provides five main components, each suited to different use cases:
### 1. DataFrames - Parallel Pandas Operations

**Purpose:** Scale pandas operations to larger datasets through parallel processing.

**When to Use:**

- Tabular data exceeds available RAM
- Need to process multiple CSV/Parquet files together
- Pandas operations are slow and need parallelization
- Scaling from a pandas prototype to production
**Reference Documentation:** For comprehensive guidance on Dask DataFrames, refer to references/dataframes.md, which includes:

- Reading data (single files, multiple files, glob patterns)
- Common operations (filtering, groupby, joins, aggregations)
- Custom operations with map_partitions
- Performance optimization tips
- Common patterns (ETL, time series, multi-file processing)
**Quick Example:**

```python
import dask.dataframe as dd

# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')

# Operations are lazy until compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
```
**Key Points:**

- Operations are lazy (they build a task graph) until .compute() is called
- Use map_partitions for efficient custom operations (see the sketch below)
- Convert to a DataFrame early when working with structured data from other sources
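As a minimal sketch of the map_partitions point, the example below applies a plain pandas function to every partition in parallel; the file pattern and the value/total column names are hypothetical stand-ins.

```python
import pandas as pd
import dask.dataframe as dd

def add_ratio(df: pd.DataFrame) -> pd.DataFrame:
    # Each partition arrives as an ordinary pandas DataFrame
    out = df.copy()
    out['ratio'] = out['value'] / out['total']  # hypothetical columns
    return out

ddf = dd.read_csv('data/2024-*.csv')  # hypothetical files
# The function runs once per partition, in parallel across partitions
result = ddf.map_partitions(add_ratio).compute()
```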
### 2. Arrays - Parallel NumPy Operations

**Purpose:** Extend NumPy capabilities to datasets larger than memory using blocked algorithms.

**When to Use:**

- Arrays exceed available RAM
- NumPy operations need parallelization
- Working with scientific datasets (HDF5, Zarr, NetCDF)
- Need parallel linear algebra or array operations
**Reference Documentation:** For comprehensive guidance on Dask Arrays, refer to references/arrays.md, which includes:

- Creating arrays (from NumPy, random, from disk)
- Chunking strategies and optimization
- Common operations (arithmetic, reductions, linear algebra)
- Custom operations with map_blocks
- Integration with HDF5, Zarr, and XArray
**Quick Example:**

```python
import dask.array as da

# Create a large array with chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))

# Operations are lazy
y = x + 100
z = y.mean(axis=0)

# Compute the result
result = z.compute()
```
**Key Points:**

- Chunk size is critical (aim for ~100 MB per chunk)
- Operations work on chunks in parallel
- Rechunk data when needed for efficient operations
- Use map_blocks for operations not available in Dask (see the sketch below)
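A minimal sketch of the last two points, with an arbitrary shape and a hypothetical per-chunk function:

```python
import numpy as np
import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))

def demean(block):
    # Hypothetical per-chunk operation: subtract each chunk's own mean
    return block - block.mean()

# map_blocks runs the function on each chunk independently
y = x.map_blocks(demean)

# Rechunk when a later step needs a different layout, e.g. whole
# columns in a single chunk along axis 0
y = y.rechunk({0: -1, 1: 1000})
result = y.mean(axis=0).compute()
```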
### 3. Bags - Parallel Processing of Unstructured Data

**Purpose:** Process unstructured or semi-structured data (text, JSON, logs) with functional operations.

**When to Use:**

- Processing text files, logs, or JSON records
- Data cleaning and ETL before structured analysis
- Working with Python objects that don't fit array/dataframe formats
- Need memory-efficient streaming processing
**Reference Documentation:** For comprehensive guidance on Dask Bags, refer to references/bags.md, which includes:

- Reading text and JSON files
- Functional operations (map, filter, fold, groupby)
- Converting to DataFrames
- Common patterns (log analysis, JSON processing, text processing)
- Performance considerations
**Quick Example:**

```python
import dask.bag as db
import json

# Read and parse JSON files
bag = db.read_text('logs/*.json').map(json.loads)

# Filter and transform
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})

# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
```
**Key Points:**

- Use for initial data cleaning, then convert to DataFrame/Array
- Use foldby instead of groupby for better performance (see the sketch below)
- Operations are streaming and memory-efficient
- Convert to structured formats (DataFrame) for complex operations
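To make the foldby point concrete, the sketch below sums a value per category without the full shuffle a groupby would trigger; the record fields are hypothetical.

```python
import json
import dask.bag as db

bag = db.read_text('logs/*.json').map(json.loads)

# foldby = per-partition reduction + final merge, cheaper than groupby
totals = bag.foldby(
    key=lambda rec: rec['category'],        # hypothetical field
    binop=lambda acc, rec: acc + rec['value'],
    initial=0,
    combine=lambda a, b: a + b,
    combine_initial=0,
)
print(sorted(totals.compute()))  # [(category, total), ...]
```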
### 4. Futures - Task-Based Parallelization

**Purpose:** Build custom parallel workflows with fine-grained control over task execution and dependencies.

**When to Use:**

- Building dynamic, evolving workflows
- Need immediate task execution (not lazy)
- Computations depend on runtime conditions
- Implementing custom parallel algorithms
- Need stateful computations
**Reference Documentation:** For comprehensive guidance on Dask Futures, refer to references/futures.md, which includes:

- Setting up the distributed client
- Submitting tasks and working with futures
- Task dependencies and data movement
- Advanced coordination (queues, locks, events, actors)
- Common patterns (parameter sweeps, dynamic tasks, iterative algorithms)
**Quick Example:**

```python
from dask.distributed import Client

client = Client()  # Create a local cluster

# Submit tasks (they execute immediately)
def process(x):
    return x ** 2

futures = client.map(process, range(100))

# Gather results
results = client.gather(futures)
client.close()
```
**Key Points:**

- Requires a distributed client (even on a single machine)
- Tasks execute immediately when submitted
- Pre-scatter large data to avoid repeated transfers
- ~1 ms of overhead per task (not suitable for millions of tiny tasks)
- Use actors for stateful workflows
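One mechanism worth a sketch: passing a future into a later client.submit call creates a task dependency, so intermediate results move worker-to-worker instead of round-tripping through the client. The stage functions here are placeholders.

```python
from dask.distributed import Client

client = Client()  # local cluster

def load(n):        # placeholder stages for illustration
    return list(range(n))

def total(xs):
    return sum(xs)

# b depends on a; the scheduler moves a's result between workers directly
a = client.submit(load, 100_000)
b = client.submit(total, a)
print(b.result())
client.close()
```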
### 5. Schedulers - Execution Backends

**Purpose:** Control how and where Dask tasks execute (threads, processes, distributed).

**When to Choose Each Scheduler:**

- **Threads** (default): NumPy/pandas operations, GIL-releasing libraries, shared-memory benefits
- **Processes**: Pure Python code, text processing, GIL-bound operations
- **Synchronous**: Debugging with pdb, profiling, understanding errors
- **Distributed**: Need the dashboard, multi-machine clusters, advanced features
**Reference Documentation:** For comprehensive guidance on Dask Schedulers, refer to references/schedulers.md, which includes:

- Detailed scheduler descriptions and characteristics
- Configuration methods (global, context manager, per-compute)
- Performance considerations and overhead
- Common patterns and troubleshooting
- Thread configuration for optimal performance
**Quick Example:**

```python
import dask
import dask.dataframe as dd

# Use threads for DataFrames (the default, good for numeric work)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute()  # Uses threads

# Use processes for Python-heavy work
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')

# Use synchronous for debugging
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute()  # Can use pdb

# Use distributed for monitoring and scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute()  # Uses the distributed scheduler with a dashboard
```
**Key Points:**

- Threads: lowest overhead (~10 µs/task), best for numeric work
- Processes: avoid the GIL (~10 ms/task), best for pure-Python work
- Distributed: monitoring dashboard (~1 ms/task), scales to clusters
- Schedulers can be switched per computation or globally (see the sketch below)
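A small sketch of the three ways to pick a scheduler (global, scoped, per-call), using a toy bag computation:

```python
import dask
import dask.bag as db

bag = db.from_sequence(range(10_000)).map(lambda x: x ** 2)

# Global default for the session
dask.config.set(scheduler='threads')

# Scoped override: only computations inside this block use processes
with dask.config.set(scheduler='processes'):
    result = bag.sum().compute()

# One-off override on a single call
result = bag.sum().compute(scheduler='synchronous')
```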
## Best Practices

For comprehensive performance-optimization guidance, memory-management strategies, and common pitfalls to avoid, refer to references/best-practices.md. Key principles include:
### Start with Simpler Solutions

Before using Dask, explore:

- Better algorithms
- Efficient file formats (Parquet instead of CSV)
- Compiled code (Numba, Cython)
- Data sampling
### Critical Performance Rules
#### 1. Don't Load Data Locally and Then Hand It to Dask

```python
import pandas as pd
import dask.dataframe as dd

# Wrong: loads all the data into memory first
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)

# Correct: let Dask handle the loading
ddf = dd.read_csv('large.csv')
```
#### 2. Avoid Repeated compute() Calls

```python
import dask

# Wrong: each compute() runs as a separate pass
for item in items:
    result = dask_computation(item).compute()

# Correct: a single compute() for all items shares common work
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
```
#### 3. Don't Build Excessively Large Task Graphs

- Increase chunk sizes if the graph has millions of tasks
- Use map_partitions/map_blocks to fuse operations
- Check the task graph size: len(ddf.__dask_graph__())
#### 4. Choose Appropriate Chunk Sizes

- Target: ~100 MB per chunk (or roughly 10 chunks per core fitting in worker memory)
- Too large: memory overflow
- Too small: scheduling overhead (see the sketch below for setting sizes up front)
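As a sketch, both collections let you request a chunk/partition size at creation time; the file path and array shape below are placeholders.

```python
import dask
import dask.array as da
import dask.dataframe as dd

# DataFrames: set the partition size when reading
ddf = dd.read_csv('large.csv', blocksize='100MB')

# Arrays: set the target chunk size, then let 'auto' chunking use it
dask.config.set({'array.chunk-size': '100MB'})
x = da.random.random((50_000, 50_000), chunks='auto')
```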
#### 5. Use the Dashboard

```python
from dask.distributed import Client

client = Client()
print(client.dashboard_link)  # Monitor performance, identify bottlenecks
```
## Common Workflow Patterns

### ETL Pipeline
```python
import dask.dataframe as dd

# Extract: read the data
ddf = dd.read_csv('raw_data/*.csv')

# Transform: clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])

# Load: aggregate and save
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
```
### Unstructured-to-Structured Pipeline

```python
import dask.bag as db
import json

# Start with a Bag for the unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')

# Convert to a DataFrame for structured analysis
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
```
### Large-Scale Array Computation

```python
import dask.array as da

# Load or create a large array
x = da.from_zarr('large_dataset.zarr')

# Process in chunks
normalized = (x - x.mean()) / x.std()

# Save the result
da.to_zarr(normalized, 'normalized.zarr')
```
### Custom Parallel Workflow

```python
from dask.distributed import Client

client = Client()

# Scatter the large dataset once
data = client.scatter(large_dataset)

# Process in parallel with dependencies
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)

# Gather results
results = client.gather(futures)
```
## Selecting the Right Component

Use this decision guide to choose the appropriate Dask component:
**Data Type:**

- Tabular data → DataFrames
- Numeric arrays → Arrays
- Text/JSON/logs → Bags (then convert to DataFrames)
- Custom Python objects → Bags or Futures
**Operation Type:**

- Standard pandas operations → DataFrames
- Standard NumPy operations → Arrays
- Custom parallel tasks → Futures
- Text processing/ETL → Bags
**Control Level:**

- High-level, automatic → DataFrames/Arrays
- Low-level, manual → Futures
**Workflow Type:**

- Static computation graph → DataFrames/Arrays/Bags
- Dynamic, evolving → Futures
## Integration Considerations

### File Formats

- Efficient: Parquet, HDF5, Zarr (columnar, compressed, parallel-friendly)
- Compatible but slower: CSV (use for initial ingestion only; see the sketch below)
- For Arrays: HDF5, Zarr, NetCDF
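A common one-time ingestion sketch, converting CSV to Parquet so later runs read the faster columnar format (paths and column names are placeholders):

```python
import dask.dataframe as dd

# One-time ingestion: convert CSV to Parquet
ddf = dd.read_csv('raw_data/*.csv')
ddf.to_parquet('data/parquet/')

# Later runs read only the columns they need
ddf = dd.read_parquet('data/parquet/', columns=['category', 'amount'])
```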
### Conversion Between Collections

```python
import dask.dataframe as dd

# Bag → DataFrame
ddf = bag.to_dataframe()

# DataFrame → Array (for numeric data)
arr = ddf.to_dask_array(lengths=True)

# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
```
### With Other Libraries

- XArray: wraps Dask arrays with labeled dimensions (geospatial, imaging); see the sketch below
- Dask-ML: machine learning with scikit-learn-compatible APIs
- Distributed: advanced cluster management and monitoring
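For instance, a minimal XArray sketch: passing chunks= makes xarray back its variables with Dask arrays instead of loading them eagerly (the file and variable names here are hypothetical).

```python
import xarray as xr

# chunks=... lazily backs each variable with a Dask array
ds = xr.open_dataset('climate.nc', chunks={'time': 100})  # hypothetical file
monthly = ds['temperature'].groupby('time.month').mean()  # hypothetical variable
result = monthly.compute()
```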
## Debugging and Development

### Iterative Development Workflow
1. Test on small data with the synchronous scheduler:

   ```python
   dask.config.set(scheduler='synchronous')
   result = computation.compute()  # Can use pdb; easy debugging
   ```

2. Validate with threads on a sample:

   ```python
   sample = ddf.head(1000)  # Small sample
   # Test the logic here, then scale to the full dataset
   ```

3. Scale with distributed for monitoring:

   ```python
   from dask.distributed import Client

   client = Client()
   print(client.dashboard_link)  # Monitor performance
   result = computation.compute()
   ```
### Common Issues

**Memory Errors:**

- Decrease chunk sizes
- Use persist() strategically and delete results when done (see the sketch below)
- Check for memory leaks in custom functions
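A sketch of the persist() point, assuming a hypothetical Parquet dataset: materialize the cleaned data once, reuse it across computations, then release it.

```python
import dask.dataframe as dd
from dask.distributed import Client

client = Client()
ddf = dd.read_parquet('data/parquet/')  # hypothetical path

# persist() materializes the cleaned data in memory once for reuse
cleaned = ddf.dropna().persist()
by_cat = cleaned.groupby('category').mean().compute()
total = cleaned['amount'].sum().compute()

del cleaned  # release the cached partitions when finished
```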
**Slow Start:**

- Task graph too large (increase chunk sizes)
- Use map_partitions or map_blocks to reduce the number of tasks
**Poor Parallelization:**

- Chunks too large (increase the number of partitions; see the repartition sketch below)
- Using threads with pure-Python code (switch to processes)
- Data dependencies preventing parallelism
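A repartition sketch for the first point, with a placeholder path:

```python
import dask.dataframe as dd

ddf = dd.read_parquet('data/parquet/')  # hypothetical path

# Split oversized partitions so every core has work
ddf = ddf.repartition(npartitions=ddf.npartitions * 4)

# Or target a partition size directly
ddf = ddf.repartition(partition_size='100MB')
```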
## Reference Files

All reference documentation files can be read as needed for detailed information:
- references/dataframes.md - Complete Dask DataFrame guide
- references/arrays.md - Complete Dask Array guide
- references/bags.md - Complete Dask Bag guide
- references/futures.md - Complete Dask Futures and distributed computing guide
- references/schedulers.md - Complete scheduler selection and configuration guide
- references/best-practices.md - Comprehensive performance optimization and troubleshooting guide
Load these files when users need detailed information about specific Dask components, operations, or patterns beyond the quick guidance provided here.