# Parallel File Processor

**Version:** 1.1.0 | **Category:** Development | **Last Updated:** 2026-01-02

Process multiple files concurrently with intelligent batching, progress tracking, and result aggregation for significant performance improvements.
## Quick Start

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import pandas as pd

def process_csv(file_path: Path) -> dict:
    """Process a single CSV file."""
    df = pd.read_csv(file_path)
    return {'file': file_path.name, 'rows': len(df), 'columns': len(df.columns)}

# Get all CSV files
files = list(Path('data/raw/').glob('*.csv'))

# Process in parallel
results = []
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(process_csv, f): f for f in files}
    for future in as_completed(futures):
        results.append(future.result())

print(f"Processed {len(results)} files")
```
## When to Use

- Processing large numbers of files (100+ files)
- Batch operations on directory contents
- Extracting data from multiple ZIP archives
- Aggregating results from parallel operations
- CPU-bound file transformations
- IO-bound file operations that benefit from concurrency
## Core Pattern

```
Directory Scan -> Filter -> Batch -> Parallel Process -> Aggregate -> Output
```
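Each stage maps onto one of the components implemented below. A minimal end-to-end sketch, assuming the components are importable from a `parallel_file_processor` module as in the usage examples later in this document:

```python
from pathlib import Path

from parallel_file_processor import (
    FileScanner, ParallelProcessor, ProcessingMode, ResultAggregator
)

def count_lines(file_info):
    """Toy processor: count lines in a text-based file."""
    with open(file_info.path) as f:
        return {'lines': sum(1 for _ in f)}

# Scan + Filter: the scanner applies patterns and size limits while walking
scanner = FileScanner(extensions={'.csv'}, min_size=1)
files = scanner.list_files(Path('data/raw/'))

# Batch + Parallel Process: fan the files out to a thread pool
parallel = ParallelProcessor(count_lines, mode=ProcessingMode.THREAD_POOL)
batch = parallel.process(files)

# Aggregate + Output: export per-file results to CSV
ResultAggregator(batch).export_csv(Path('data/results/line_counts.csv'))
```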
## Implementation

### Core Components

```python
import logging
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import (
    List, Dict, Any, Callable, Optional, Generator, TypeVar, Generic
)

logger = logging.getLogger(__name__)

T = TypeVar('T')
R = TypeVar('R')

class ProcessingMode(Enum):
    """Processing execution mode."""
    SEQUENTIAL = "sequential"
    THREAD_POOL = "thread_pool"
    PROCESS_POOL = "process_pool"
    ASYNC = "async"

@dataclass
class FileInfo:
    """File metadata container."""
    path: Path
    size_bytes: int
    modified_time: float
    extension: str
    relative_path: Optional[str] = None

    @classmethod
    def from_path(cls, path: Path, base_path: Optional[Path] = None) -> 'FileInfo':
        """Create FileInfo from a path."""
        stat = path.stat()
        relative = str(path.relative_to(base_path)) if base_path else None
        return cls(
            path=path,
            size_bytes=stat.st_size,
            modified_time=stat.st_mtime,
            extension=path.suffix.lower(),
            relative_path=relative
        )

@dataclass
class ProcessingResult(Generic[T]):
    """Result of processing a single file."""
    file_info: FileInfo
    success: bool
    result: Optional[T] = None
    error: Optional[str] = None
    duration_seconds: float = 0.0

@dataclass
class BatchResult(Generic[T]):
    """Aggregated results from batch processing."""
    total_files: int = 0
    successful: int = 0
    failed: int = 0
    results: List[ProcessingResult[T]] = field(default_factory=list)
    total_duration_seconds: float = 0.0
    errors: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        """Calculate success rate as a percentage."""
        if self.total_files == 0:
            return 100.0
        return (self.successful / self.total_files) * 100

    def successful_results(self) -> List[T]:
        """Get the successful results only."""
        return [r.result for r in self.results if r.success and r.result is not None]
```
### File Scanner

```python
import fnmatch
from pathlib import Path
from typing import Generator, List, Optional, Set

class FileScanner:
    """
    Scan directories for files matching patterns.

    Supports glob patterns, extension filtering, and size limits.
    """

    def __init__(self,
                 include_patterns: Optional[List[str]] = None,
                 exclude_patterns: Optional[List[str]] = None,
                 extensions: Optional[Set[str]] = None,
                 min_size: int = 0,
                 max_size: Optional[int] = None,
                 recursive: bool = True):
        """
        Initialize file scanner.

        Args:
            include_patterns: Glob patterns to include (e.g., ['*.csv', '*.xlsx'])
            exclude_patterns: Glob patterns to exclude (e.g., ['*_backup*'])
            extensions: File extensions to include (e.g., {'.csv', '.xlsx'})
            min_size: Minimum file size in bytes
            max_size: Maximum file size in bytes
            recursive: Scan subdirectories
        """
        self.include_patterns = include_patterns or ['*']
        self.exclude_patterns = exclude_patterns or []
        self.extensions = extensions
        self.min_size = min_size
        self.max_size = max_size
        self.recursive = recursive

    def scan(self, directory: Path) -> Generator[FileInfo, None, None]:
        """
        Scan directory and yield matching files.

        Args:
            directory: Directory to scan

        Yields:
            FileInfo for each matching file
        """
        directory = Path(directory)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise ValueError(f"Not a directory: {directory}")

        # Choose iteration method
        if self.recursive:
            files = directory.rglob('*')
        else:
            files = directory.glob('*')

        for path in files:
            if path.is_file() and self._matches(path):
                try:
                    yield FileInfo.from_path(path, directory)
                except Exception as e:
                    logger.warning(f"Could not get info for {path}: {e}")

    def _matches(self, path: Path) -> bool:
        """Check if file matches all criteria."""
        name = path.name

        # Check include patterns
        if not any(fnmatch.fnmatch(name, p) for p in self.include_patterns):
            return False

        # Check exclude patterns
        if any(fnmatch.fnmatch(name, p) for p in self.exclude_patterns):
            return False

        # Check extension
        if self.extensions and path.suffix.lower() not in self.extensions:
            return False

        # Check size
        try:
            size = path.stat().st_size
            if size < self.min_size:
                return False
            if self.max_size and size > self.max_size:
                return False
        except OSError:
            return False

        return True

    def count(self, directory: Path) -> int:
        """Count matching files without loading all info."""
        return sum(1 for _ in self.scan(directory))

    def list_files(self, directory: Path) -> List[FileInfo]:
        """Get all matching files as a list."""
        return list(self.scan(directory))
```
### Parallel Processor

```python
import asyncio
import time
from concurrent.futures import (
    ThreadPoolExecutor, ProcessPoolExecutor, as_completed, Future
)
from functools import partial
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar('T')
R = TypeVar('R')

class ParallelProcessor(Generic[T, R]):
    """
    Process items in parallel with configurable execution modes.
    """

    def __init__(self,
                 processor: Callable[[T], R],
                 mode: ProcessingMode = ProcessingMode.THREAD_POOL,
                 max_workers: Optional[int] = None,
                 batch_size: Optional[int] = None,
                 timeout: Optional[float] = None):
        """
        Initialize parallel processor.

        Args:
            processor: Function to process each item
            mode: Processing mode (sequential, thread, process, async)
            max_workers: Maximum concurrent workers
            batch_size: Items per batch (for memory management)
            timeout: Timeout per item in seconds
        """
        self.processor = processor
        self.mode = mode
        self.max_workers = max_workers or self._default_workers()
        self.batch_size = batch_size or 100
        self.timeout = timeout
        self._progress_callback: Optional[Callable[[int, int], None]] = None

    def _default_workers(self) -> int:
        """Get default worker count based on mode."""
        import os
        cpu_count = os.cpu_count() or 4
        if self.mode == ProcessingMode.PROCESS_POOL:
            return cpu_count
        elif self.mode == ProcessingMode.THREAD_POOL:
            return cpu_count * 2  # IO-bound work benefits from extra threads
        else:
            return cpu_count

    def on_progress(self, callback: Callable[[int, int], None]):
        """Set progress callback: callback(completed, total)."""
        self._progress_callback = callback
        return self

    def process(self, items: List[T]) -> BatchResult[R]:
        """
        Process all items and return aggregated results.

        Args:
            items: Items to process

        Returns:
            BatchResult with all results
        """
        start_time = time.time()

        if self.mode == ProcessingMode.SEQUENTIAL:
            result = self._process_sequential(items)
        elif self.mode == ProcessingMode.THREAD_POOL:
            result = self._process_threaded(items)
        elif self.mode == ProcessingMode.PROCESS_POOL:
            result = self._process_multiprocess(items)
        elif self.mode == ProcessingMode.ASYNC:
            # _process_async is omitted here; it follows the same
            # aggregation pattern using asyncio tasks.
            result = asyncio.run(self._process_async(items))
        else:
            raise ValueError(f"Unknown mode: {self.mode}")

        result.total_duration_seconds = time.time() - start_time
        return result

    def _process_sequential(self, items: List[T]) -> BatchResult[R]:
        """Process items one at a time (baseline for comparison)."""
        result = BatchResult(total_files=len(items))
        for completed, item in enumerate(items, start=1):
            proc_result = self._process_single(item)
            result.results.append(proc_result)
            if proc_result.success:
                result.successful += 1
            else:
                result.failed += 1
                if proc_result.error:
                    result.errors.append(proc_result.error)
            if self._progress_callback:
                self._progress_callback(completed, len(items))
        return result

    def _process_threaded(self, items: List[T]) -> BatchResult[R]:
        """Process items using a thread pool."""
        result = BatchResult(total_files=len(items))
        completed = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_item = {
                executor.submit(self._process_single, item): item
                for item in items
            }

            # Collect results as they complete
            for future in as_completed(future_to_item):
                proc_result = future.result()
                result.results.append(proc_result)

                if proc_result.success:
                    result.successful += 1
                else:
                    result.failed += 1
                    if proc_result.error:
                        result.errors.append(proc_result.error)

                completed += 1
                if self._progress_callback:
                    self._progress_callback(completed, len(items))

        return result

    def _process_single(self, item: T) -> ProcessingResult[R]:
        """Process a single item with error handling."""
        start_time = time.time()

        # Create FileInfo if item is a Path or FileInfo
        if isinstance(item, Path):
            file_info = FileInfo.from_path(item)
        elif isinstance(item, FileInfo):
            file_info = item
        else:
            # Create dummy FileInfo for non-file items
            file_info = FileInfo(
                path=Path(""),
                size_bytes=0,
                modified_time=0,
                extension=""
            )

        try:
            result = self.processor(item)
            return ProcessingResult(
                file_info=file_info,
                success=True,
                result=result,
                duration_seconds=time.time() - start_time
            )
        except Exception as e:
            return ProcessingResult(
                file_info=file_info,
                success=False,
                error=str(e),
                duration_seconds=time.time() - start_time
            )
```
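The process-pool variant is not shown above. A sketch of a `_process_multiprocess` method that mirrors `_process_threaded` follows; the one extra constraint imposed by `ProcessPoolExecutor` is that the processor (and any progress callback stored on the instance) must be picklable:

```python
    def _process_multiprocess(self, items: List[T]) -> BatchResult[R]:
        """Process items using a process pool (sketch mirroring _process_threaded).

        Note: self.processor, the items, and any progress callback must be
        picklable, which rules out lambdas and locally defined functions.
        """
        result = BatchResult(total_files=len(items))
        completed = 0

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_item = {
                executor.submit(self._process_single, item): item
                for item in items
            }
            for future in as_completed(future_to_item):
                proc_result = future.result()
                result.results.append(proc_result)
                if proc_result.success:
                    result.successful += 1
                else:
                    result.failed += 1
                    if proc_result.error:
                        result.errors.append(proc_result.error)
                completed += 1
                if self._progress_callback:
                    self._progress_callback(completed, len(items))

        return result
```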
### File Processor

```python
class FileProcessor:
    """
    High-level file processing with parallel execution.

    Combines scanning, filtering, and parallel processing.
    """

    def __init__(self,
                 scanner: Optional[FileScanner] = None,
                 mode: ProcessingMode = ProcessingMode.THREAD_POOL,
                 max_workers: Optional[int] = None):
        self.scanner = scanner or FileScanner()
        self.mode = mode
        self.max_workers = max_workers

    def process_directory(self,
                          directory: Path,
                          processor: Callable[[FileInfo], Any],
                          progress_callback: Optional[Callable[[int, int], None]] = None
                          ) -> BatchResult:
        """Process all matching files in a directory."""
        files = self.scanner.list_files(directory)
        logger.info(f"Found {len(files)} files to process")

        if not files:
            return BatchResult()

        parallel = ParallelProcessor(
            processor=processor,
            mode=self.mode,
            max_workers=self.max_workers
        )
        if progress_callback:
            parallel.on_progress(progress_callback)

        return parallel.process(files)

    def aggregate_csv(self,
                      directory: Path,
                      output_path: Optional[Path] = None,
                      **read_kwargs) -> pd.DataFrame:
        """Read and aggregate all CSV files in a directory."""
        self.scanner = FileScanner(extensions={'.csv'})

        def read_csv(file_info: FileInfo) -> pd.DataFrame:
            df = pd.read_csv(file_info.path, **read_kwargs)
            df['_source_file'] = file_info.path.name
            return df

        result = self.process_directory(directory, read_csv)
        dfs = result.successful_results()

        if not dfs:
            return pd.DataFrame()

        combined = pd.concat(dfs, ignore_index=True)
        if output_path:
            combined.to_csv(output_path, index=False)
        return combined

    def extract_all_zips(self,
                         directory: Path,
                         output_directory: Path
                         ) -> BatchResult:
        """Extract all ZIP files in a directory."""
        import zipfile

        self.scanner = FileScanner(extensions={'.zip'})
        output_directory.mkdir(parents=True, exist_ok=True)

        def extract_zip(file_info: FileInfo) -> Dict:
            extract_dir = output_directory / file_info.path.stem
            extract_dir.mkdir(exist_ok=True)
            with zipfile.ZipFile(file_info.path, 'r') as zf:
                zf.extractall(extract_dir)
                extracted = len(zf.namelist())
            return {
                'source': str(file_info.path),
                'destination': str(extract_dir),
                'files_extracted': extracted
            }

        return self.process_directory(directory, extract_zip)
```
### Progress Tracking

```python
import sys
from datetime import datetime, timedelta

class ProgressTracker:
    """Track and display processing progress."""

    def __init__(self,
                 total: int,
                 description: str = "Processing",
                 show_eta: bool = True,
                 bar_width: int = 40):
        self.total = total
        self.description = description
        self.show_eta = show_eta
        self.bar_width = bar_width
        self.completed = 0
        self.start_time: Optional[datetime] = None

    def start(self):
        """Start tracking."""
        self.start_time = datetime.now()
        self.completed = 0
        self._display()

    def update(self, completed: int, total: int):
        """Update progress."""
        self.completed = completed
        self.total = total
        self._display()

    def _display(self):
        """Display progress bar."""
        if self.total == 0:
            return

        pct = self.completed / self.total
        filled = int(self.bar_width * pct)
        bar = '#' * filled + '-' * (self.bar_width - filled)

        # Calculate ETA
        eta_str = ""
        if self.show_eta and self.start_time and self.completed > 0:
            elapsed = (datetime.now() - self.start_time).total_seconds()
            rate = self.completed / elapsed
            remaining = (self.total - self.completed) / rate if rate > 0 else 0
            eta_str = f" ETA: {timedelta(seconds=int(remaining))}"

        line = (f"\r{self.description}: |{bar}| "
                f"{self.completed}/{self.total} ({pct*100:.1f}%){eta_str}")
        sys.stdout.write(line)
        sys.stdout.flush()

        if self.completed == self.total:
            print()

    def finish(self):
        """Mark processing as complete."""
        self.completed = self.total
        self._display()
```
### Result Aggregator

```python
import json

class ResultAggregator:
    """Aggregate and export batch processing results."""

    def __init__(self, batch_result: BatchResult):
        self.batch_result = batch_result

    def to_dataframe(self) -> pd.DataFrame:
        """Convert results to a DataFrame."""
        data = []
        for r in self.batch_result.results:
            row = {
                'file_path': str(r.file_info.path),
                'file_name': r.file_info.path.name,
                'file_size': r.file_info.size_bytes,
                'success': r.success,
                'duration_seconds': r.duration_seconds,
                'error': r.error
            }
            if r.success and isinstance(r.result, dict):
                for k, v in r.result.items():
                    if not k.startswith('_'):
                        row[f'result_{k}'] = v
            data.append(row)
        return pd.DataFrame(data)

    def summary(self) -> Dict[str, Any]:
        """Generate summary statistics."""
        return {
            'total_files': self.batch_result.total_files,
            'successful': self.batch_result.successful,
            'failed': self.batch_result.failed,
            'success_rate_pct': self.batch_result.success_rate,
            'total_duration_seconds': self.batch_result.total_duration_seconds,
            'avg_duration_seconds': (
                self.batch_result.total_duration_seconds /
                self.batch_result.total_files
                if self.batch_result.total_files > 0 else 0
            ),
            'errors': self.batch_result.errors[:10]
        }

    def export_csv(self, path: Path):
        """Export results to CSV."""
        df = self.to_dataframe()
        df.to_csv(path, index=False)

    def export_json(self, path: Path):
        """Export summary to JSON."""
        summary = self.summary()
        with open(path, 'w') as f:
            json.dump(summary, f, indent=2)

    def combine_dataframes(self) -> pd.DataFrame:
        """Combine results that are DataFrames."""
        dfs = [r for r in self.batch_result.successful_results()
               if isinstance(r, pd.DataFrame)]
        if not dfs:
            return pd.DataFrame()
        return pd.concat(dfs, ignore_index=True)
```
## YAML Configuration

### Basic Configuration

```yaml
# config/parallel_processing.yaml
scan:
  directory: "data/raw/"
  recursive: true
  include_patterns:
    - "*.csv"
    - "*.xlsx"
  exclude_patterns:
    - "*_backup*"
    - "~$*"
  extensions:
    - ".csv"
    - ".xlsx"
  size_limits:
    min_bytes: 100
    max_bytes: 104857600  # 100 MB

processing:
  mode: thread_pool  # sequential, thread_pool, process_pool, async
  max_workers: 8
  batch_size: 100
  timeout_seconds: 30

output:
  results_csv: "data/results/processing_results.csv"
  summary_json: "data/results/summary.json"
  combined_output: "data/processed/combined.csv"

progress:
  enabled: true
  show_eta: true
```
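A small loader can wire this configuration into the components above. This is a sketch that assumes the key layout in the YAML above and uses PyYAML (`pip install pyyaml`) for parsing:

```python
from pathlib import Path

import yaml  # PyYAML

from parallel_file_processor import FileProcessor, FileScanner, ProcessingMode

def load_processor(config_path: Path) -> FileProcessor:
    """Build a FileProcessor from the YAML layout sketched above."""
    with open(config_path) as f:
        cfg = yaml.safe_load(f)

    scan = cfg.get('scan', {})
    sizes = scan.get('size_limits', {})
    proc = cfg.get('processing', {})

    scanner = FileScanner(
        include_patterns=scan.get('include_patterns'),
        exclude_patterns=scan.get('exclude_patterns'),
        extensions=set(scan.get('extensions', [])) or None,
        min_size=sizes.get('min_bytes', 0),
        max_size=sizes.get('max_bytes'),
        recursive=scan.get('recursive', True),
    )
    return FileProcessor(
        scanner=scanner,
        mode=ProcessingMode(proc.get('mode', 'thread_pool')),
        max_workers=proc.get('max_workers'),
    )

# processor = load_processor(Path('config/parallel_processing.yaml'))
```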
## Usage Examples

### Example 1: Process CSV Files

```python
from pathlib import Path

import pandas as pd

from parallel_file_processor import (
    FileScanner, FileProcessor, ProcessingMode,
    ProgressTracker, ResultAggregator
)

# Define processing function
def process_csv(file_info):
    """Extract statistics from a CSV file."""
    df = pd.read_csv(file_info.path)
    return {
        'rows': len(df),
        'columns': len(df.columns),
        'memory_mb': df.memory_usage(deep=True).sum() / 1e6,
        'numeric_columns': len(df.select_dtypes(include='number').columns)
    }

# Set up scanner and processor
scanner = FileScanner(extensions={'.csv'})
processor = FileProcessor(
    scanner=scanner,
    mode=ProcessingMode.THREAD_POOL,
    max_workers=8
)

# Create progress tracker (the callback updates the real total)
tracker = ProgressTracker(0, "Processing CSVs")
tracker.start()

# Process with progress
result = processor.process_directory(
    Path("data/raw/"),
    process_csv,
    progress_callback=tracker.update
)
tracker.finish()

# Aggregate results
aggregator = ResultAggregator(result)
print(f"\nSummary: {aggregator.summary()}")
aggregator.export_csv(Path("data/results/csv_stats.csv"))
```
### Example 2: Parallel ZIP Extraction

```python
# Extract all ZIPs in parallel
processor = FileProcessor(mode=ProcessingMode.THREAD_POOL)

result = processor.extract_all_zips(
    directory=Path("data/archives/"),
    output_directory=Path("data/extracted/")
)

print(f"Extracted {result.successful} ZIP files")
print(f"Failed: {result.failed}")

# Get extraction details
aggregator = ResultAggregator(result)
df = aggregator.to_dataframe()
total_files = df['result_files_extracted'].sum()
print(f"Total files extracted: {total_files}")
```
### Example 3: Aggregate Data from Multiple Sources

```python
# Aggregate CSV files with custom processing
def load_and_clean(file_info):
    """Load CSV and perform basic cleaning."""
    df = pd.read_csv(file_info.path)

    # Clean column names
    df.columns = [c.lower().strip().replace(' ', '_') for c in df.columns]

    # Add metadata
    df['_source'] = file_info.path.name
    df['_loaded_at'] = pd.Timestamp.now()
    return df

processor = FileProcessor(
    scanner=FileScanner(extensions={'.csv'}),
    mode=ProcessingMode.THREAD_POOL
)

result = processor.process_directory(
    Path("data/monthly_reports/"),
    load_and_clean
)

# Combine all DataFrames
aggregator = ResultAggregator(result)
combined_df = aggregator.combine_dataframes()

print(f"Combined {len(combined_df)} rows from {result.successful} files")
combined_df.to_csv("data/combined_reports.csv", index=False)
```
### Example 4: Custom Batch Processing

```python
from parallel_file_processor import ParallelProcessor, ProcessingMode

# Process a list of items (not files)
items = list(range(1000))

def heavy_computation(item):
    """CPU-intensive calculation."""
    import math
    result = sum(math.sin(i * item) for i in range(10000))
    return {'item': item, 'result': result}

# Use a process pool for CPU-bound work
processor = ParallelProcessor(
    processor=heavy_computation,
    mode=ProcessingMode.PROCESS_POOL,
    max_workers=4
)

# Track progress
def show_progress(completed, total):
    pct = (completed / total) * 100
    print(f"\rProgress: {pct:.1f}%", end='', flush=True)

processor.on_progress(show_progress)

result = processor.process(items)
print(f"\nCompleted {result.successful}/{result.total_files} items")
```
## Performance Tips

### Mode Selection

| Workload Type | Recommended Mode | Reason |
|---------------|------------------|--------|
| File I/O | `THREAD_POOL` | IO-bound; the GIL is released during blocking I/O |
| Data parsing | `THREAD_POOL` | pandas releases the GIL during I/O |
| CPU computation | `PROCESS_POOL` | Bypasses the GIL for true parallelism |
| Network requests | `ASYNC` | Best for many concurrent connections |
| Simple operations | `SEQUENTIAL` | Parallel overhead may exceed the benefit |
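The table translates directly into code if you want the choice to be explicit in a pipeline. A small sketch; the workload labels are assumptions for illustration, not part of the library:

```python
from parallel_file_processor import ProcessingMode

# Hypothetical workload labels mapped to the modes recommended above
WORKLOAD_MODES = {
    'file_io': ProcessingMode.THREAD_POOL,
    'data_parsing': ProcessingMode.THREAD_POOL,
    'cpu': ProcessingMode.PROCESS_POOL,
    'network': ProcessingMode.ASYNC,
    'simple': ProcessingMode.SEQUENTIAL,
}

def mode_for(workload: str) -> ProcessingMode:
    """Pick a mode for a workload label, defaulting to threads."""
    return WORKLOAD_MODES.get(workload, ProcessingMode.THREAD_POOL)
```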
### Worker Count

```python
import os

cpu_count = os.cpu_count() or 4  # cpu_count() can return None

# IO-bound (reading files, network)
io_workers = cpu_count * 2

# CPU-bound (heavy computation)
cpu_workers = cpu_count

# Memory-constrained (large files)
memory_workers = max(2, cpu_count // 2)
```
### Batch Size

- Small files (<1MB): large batches (500-1000)
- Medium files (1-100MB): medium batches (50-100)
- Large files (>100MB): small batches (10-20) or one at a time (a chunking helper is sketched below)
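`ParallelProcessor` accepts a `batch_size`, but you can also batch externally so that only one batch of results is held in memory at a time. A minimal chunking sketch; `handle` is a placeholder for your own output step:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')

def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive fixed-size batches from any iterable."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch

# for batch in batched(scanner.scan(directory), 100):
#     result = parallel.process(batch)
#     handle(result)  # e.g. append to disk, then let the batch go
```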
## Best Practices

### Do

- Choose the correct processing mode for the workload type
- Use progress callbacks for long operations
- Batch large file sets to manage memory
- Log individual failures for debugging
- Consider retry logic for transient errors (a wrapper sketch follows this list)
- Monitor memory usage with large DataFrames
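For the retry item above, wrapping the processor function is usually enough. A sketch, assuming `OSError` and `TimeoutError` are the transient failures worth retrying:

```python
import time
from typing import Callable, TypeVar

T = TypeVar('T')
R = TypeVar('R')

def with_retries(processor: Callable[[T], R],
                 attempts: int = 3,
                 delay_seconds: float = 1.0) -> Callable[[T], R]:
    """Retry a processor on transient errors with a fixed delay."""
    def wrapper(item: T) -> R:
        last_error: Exception = RuntimeError("no attempts made")
        for _ in range(attempts):
            try:
                return processor(item)
            except (OSError, TimeoutError) as e:  # assumed transient
                last_error = e
                time.sleep(delay_seconds)
        raise last_error
    return wrapper

# Usage: ParallelProcessor(with_retries(process_csv), ...)
```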
### Don't

- Use a process pool for IO-bound tasks
- Skip error handling in processor functions
- Load all results into memory at once (see the streaming sketch after this list)
- Ignore batch result statistics
- Use too many workers for memory-constrained tasks
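On the "load all results at once" point: the thread-pool loop above collects every `ProcessingResult` into one list. When individual results are large (e.g. DataFrames), a bounded-window variant keeps only a fixed number of tasks and results alive. A sketch; the window size is a tunable assumption:

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar('T')
R = TypeVar('R')

def stream_results(processor: Callable[[T], R],
                   items: Iterable[T],
                   max_workers: int = 8,
                   window: int = 32) -> Iterator[R]:
    """Yield results in submission order with at most `window` tasks in flight."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending: deque = deque()
        for item in items:
            pending.append(executor.submit(processor, item))
            if len(pending) >= window:
                yield pending.popleft().result()  # block on the oldest task
        while pending:
            yield pending.popleft().result()

# for stats in stream_results(process_csv, files):
#     write_row(stats)  # hypothetical sink; each result is dropped after use
```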
## Error Handling

### Common Errors

| Error | Cause | Solution |
|-------|-------|----------|
| `MemoryError` | Too many files loaded at once | Use batching or streaming |
| `PermissionError` | File access denied | Check file permissions |
| `TimeoutError` | Processing too slow | Increase the timeout or optimize the processor |
| `OSError` | Too many open files | Reduce `max_workers` |
### Error Template

```python
def safe_process_directory(directory: Path, processor: Callable) -> dict:
    """Process a directory with comprehensive error handling."""
    try:
        if not directory.exists():
            return {'status': 'error', 'message': 'Directory not found'}

        file_processor = FileProcessor()
        result = file_processor.process_directory(directory, processor)

        if result.failed > 0:
            return {
                'status': 'partial',
                'successful': result.successful,
                'failed': result.failed,
                'errors': result.errors[:10]
            }

        return {'status': 'success', 'processed': result.successful}

    except Exception as e:
        return {'status': 'error', 'message': str(e)}
```
## Execution Checklist

- Processing mode matches workload type
- Worker count appropriate for available resources
- Batch size prevents memory issues
- Progress callback configured for feedback
- Error handling in the processor function
- Results aggregated and exported
- Summary statistics reviewed
- Failed files identified and logged
## Metrics

| Metric | Target | Description |
|--------|--------|-------------|
| Throughput | 2-3x sequential | Parallel speedup factor |
| Success Rate | >99% | Percentage of files processed successfully |
| Memory Usage | <4GB | Peak memory consumption |
| Error Rate | <1% | Processing failures |
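The throughput target can be checked directly by timing the same workload in `SEQUENTIAL` and `THREAD_POOL` modes. A sketch using the components above:

```python
import time

from parallel_file_processor import ParallelProcessor, ProcessingMode

def measure_speedup(processor, items) -> float:
    """Wall-time ratio of sequential over thread-pool processing."""
    timings = {}
    for mode in (ProcessingMode.SEQUENTIAL, ProcessingMode.THREAD_POOL):
        start = time.perf_counter()
        ParallelProcessor(processor, mode=mode).process(items)
        timings[mode] = time.perf_counter() - start
    return timings[ProcessingMode.SEQUENTIAL] / timings[ProcessingMode.THREAD_POOL]

# speedup = measure_speedup(process_csv, files)  # target: >= 2x
```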
## Related Skills

- `data-pipeline-processor` - Data transformation
- `yaml-workflow-executor` - Workflow automation
- `engineering-report-generator` - Report generation
## Version History

- **1.1.0** (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, and additional examples
- **1.0.0** (2024-10-15): Initial release with FileScanner, ParallelProcessor, progress tracking, and result aggregation