Parquet-Coder
Patterns for efficient columnar data storage with Parquet.
Basic Operations
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
Write with compression
df.to_parquet('data.parquet', compression='snappy', index=False)
Common compression options (see the comparison sketch after this list):
- snappy: Fast, good compression (default)
- gzip: Slower, better compression
- zstd: Best balance of speed/compression
- None: No compression (fastest writes)
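The right codec depends on your data and your read/write ratio, so it is worth measuring on a sample. A minimal sketch, assuming `df` is a representative DataFrame and the sample_*.parquet file names are placeholders:

import os

for codec in ['snappy', 'gzip', 'zstd', None]:
    path = f'sample_{codec}.parquet'
    df.to_parquet(path, compression=codec, index=False)
    size_mb = os.path.getsize(path) / 1e6
    print(f'{codec}: {size_mb:.1f} MB')

Timing the write and a subsequent read of each file gives the speed side of the trade-off.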
Read entire file
df = pd.read_parquet('data.parquet')
Read specific columns only (projection pushdown / column pruning)
df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])
PyArrow for Large Files
Read as PyArrow Table (more memory efficient)
table = pq.read_table('data.parquet')
Convert to pandas when needed
df = table.to_pandas()
Filter while reading (row group filtering)
table = pq.read_table(
    'data.parquet',
    filters=[
        ('date', '>=', '2024-01-01'),
        ('status', '=', 'active')
    ]
)
Read in batches for huge files
parquet_file = pq.ParquetFile('huge.parquet')
for batch in parquet_file.iter_batches(batch_size=100_000):
    df_batch = batch.to_pandas()
    process(df_batch)
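The same streaming pattern can transform a file that never fits in memory: feed filtered batches straight into a ParquetWriter so only one batch is held at a time. A sketch, assuming a string 'status' column and an output path of your choosing:

import pyarrow.compute as pc

parquet_file = pq.ParquetFile('huge.parquet')
writer = pq.ParquetWriter('filtered.parquet', parquet_file.schema_arrow)

for batch in parquet_file.iter_batches(batch_size=100_000):
    # Filter on the Arrow batch directly; no pandas copy needed
    mask = pc.equal(batch.column('status'), 'active')
    writer.write_table(pa.Table.from_batches([batch.filter(mask)]))

writer.close()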
Partitioned Datasets
Write partitioned by columns
df.to_parquet(
    'data/',
    partition_cols=['year', 'month'],
    compression='snappy'
)
Creates a Hive-style layout: data/year=2024/month=1/<part file>.parquet
Read partitioned dataset
df = pd.read_parquet('data/') # Reads all partitions
Read specific partitions only
df = pd.read_parquet('data/year=2024/')
With PyArrow dataset API (more control)
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet', partitioning='hive')
Filter on partition columns (very fast)
table = dataset.to_table(
    filter=(ds.field('year') == 2024) & (ds.field('month') >= 6)
)
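When the filtered result is still large, the dataset can be scanned in batches instead of materialized as one table. A sketch against the same dataset; the column names are illustrative assumptions:

scanner = dataset.scanner(
    columns=['id', 'value'],          # projection: read only these columns
    filter=ds.field('year') == 2024,
    batch_size=100_000,
)
for batch in scanner.to_batches():
    process(batch.to_pandas())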
Schema Definition
Explicit schema for consistency
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64()),
    ('date', pa.date32()),
    ('tags', pa.list_(pa.string())),
    ('metadata', pa.map_(pa.string(), pa.string())),
])
Write with schema
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, 'data.parquet')
Read and validate schema
file_schema = pq.read_schema('data.parquet')
assert file_schema.equals(schema), "Schema mismatch!"
Schema Evolution
def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """Create unified schema from old and new."""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # Handle type conflicts (e.g., promote int to float);
            # promote_type is a user-supplied helper, not a PyArrow function
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))
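merge_schemas relies on a promote_type helper that PyArrow does not provide. A minimal hypothetical implementation might cover only the common numeric promotions and fall back to strings:

def promote_type(a: pa.DataType, b: pa.DataType) -> pa.DataType:
    """Pick a type that can hold values of both input types (illustrative only)."""
    if a == b:
        return a
    # Mixed integer and floating point -> float64
    if (pa.types.is_integer(a) and pa.types.is_floating(b)) or (
        pa.types.is_floating(a) and pa.types.is_integer(b)
    ):
        return pa.float64()
    # Two different integer widths -> int64
    if pa.types.is_integer(a) and pa.types.is_integer(b):
        return pa.int64()
    # Anything else: fall back to string
    return pa.string()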
def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """Append data with automatic schema evolution."""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)

    # Unify schemas (union of all fields)
    unified_schema = pa.unify_schemas([existing.schema, new_table.schema])

    # Table.cast requires matching field names, so add any column missing
    # from a table as all-null before casting to the unified schema
    def conform(table: pa.Table) -> pa.Table:
        for field in unified_schema:
            if field.name not in table.schema.names:
                table = table.append_column(field, pa.nulls(len(table), type=field.type))
        return table.select(unified_schema.names).cast(unified_schema)

    existing = conform(existing)
    new_table = conform(new_table)

    # Concatenate and write
    combined = pa.concat_tables([existing, new_table])
    pq.write_table(combined, output_path)
Row Group Optimization
Control row group size (affects read performance)
pq.write_table(
    table,
    'data.parquet',
    row_group_size=100_000,  # Rows per group
    compression='snappy'
)
Read metadata to see row groups
parquet_file = pq.ParquetFile('data.parquet')
print(f"Num row groups: {parquet_file.metadata.num_row_groups}")
print(f"Num rows: {parquet_file.metadata.num_rows}")
Read specific row groups
table = parquet_file.read_row_groups([0, 1]) # First two groups
Metadata and Statistics
Add custom metadata
custom_metadata = {
    b'created_by': b'etl_pipeline',
    b'version': b'1.0',
    b'source': b'api_export'
}
schema = table.schema.with_metadata(custom_metadata)
table = table.cast(schema)
pq.write_table(table, 'data.parquet')
Read metadata
parquet_file = pq.ParquetFile('data.parquet')
print(parquet_file.schema_arrow.metadata)  # key/value metadata as bytes
Column statistics (min/max for filtering)
metadata = parquet_file.metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics:
            print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
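Those min/max statistics are what make sorted data fast to query: whole row groups whose range cannot contain a match can be skipped. A sketch of manual row-group pruning, assuming a flat schema and a numeric column/threshold of your choosing:

def read_matching_row_groups(path: str, column: str, threshold: float) -> pa.Table:
    """Read only row groups whose max for `column` reaches the threshold."""
    pf = pq.ParquetFile(path)
    col_index = pf.schema_arrow.get_field_index(column)
    keep = []
    for i in range(pf.metadata.num_row_groups):
        stats = pf.metadata.row_group(i).column(col_index).statistics
        # Without statistics we cannot prune, so keep the group
        if stats is None or not stats.has_min_max or stats.max >= threshold:
            keep.append(i)
    return pf.read_row_groups(keep) if keep else pf.schema_arrow.empty_table()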
Delta Lake Integration
from deltalake import DeltaTable, write_deltalake
Write as Delta table (versioned parquet)
write_deltalake('delta_table/', df, mode='overwrite')
Append data
write_deltalake('delta_table/', new_df, mode='append')
Read Delta table
dt = DeltaTable('delta_table/')
df = dt.to_pandas()
Time travel
df_old = DeltaTable('delta_table/', version=0).to_pandas()
Compact small files
dt.optimize.compact()
Vacuum old versions
dt.vacuum(retention_hours=168, dry_run=False)  # Keep 7 days; dry_run=False actually deletes files
Performance Tips
1. Use appropriate column types
schema = pa.schema([
    ('category', pa.dictionary(pa.int8(), pa.string())),  # For repeated strings
    ('count', pa.int32()),  # Not int64 if values fit
])
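When writing from pandas, the same dictionary encoding falls out of the category dtype, so an explicit schema is not required. A minimal sketch; the column names are assumptions:

# Categorical columns are written dictionary-encoded automatically
df['category'] = df['category'].astype('category')
df['count'] = df['count'].astype('int32')
df.to_parquet('data.parquet', index=False)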
2. Sort data before writing (tightens row-group min/max statistics, improving predicate pushdown)
df = df.sort_values(['date', 'category'])
df.to_parquet('sorted.parquet')
3. Use column selection
df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])
4. Use filters for row pruning
df = pd.read_parquet('data/', filters=[('status', '==', 'active')])
5. Parallel reads
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
table = dataset.to_table(use_threads=True)
Conversion Utilities
def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large CSV to Parquet efficiently."""
    writer = None
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            # First chunk fixes the output schema
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:  # Guard against an empty CSV
        writer.close()
def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """Convert JSON lines to Parquet."""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')
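For JSON-lines files that do not fit in memory, the same chunked-writer pattern as csv_to_parquet applies, using pandas' chunked line-delimited JSON reader. A sketch; the function name is illustrative:

def jsonl_to_parquet_chunked(
    json_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large JSON lines to Parquet one chunk at a time."""
    writer = None
    for chunk in pd.read_json(json_path, lines=True, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:
        writer.close()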