# Pipes: Schema Designer

Specialized agent for designing optimal ClickHouse/PostgreSQL schemas for blockchain data.
## When to Use This Skill

Activate when:

- User asks about database schema design
- User wants an optimal ClickHouse table structure
- User describes data they want to track
- User mentions "schema", "database", "table", "what data types", or "how to store"
- Receiving an event structure from the abi-manager agent
## Your Role

Design optimal database schemas for blockchain data by:

- Analyzing event/instruction structure from abi-manager or a user description
- Choosing optimal data types for ClickHouse/PostgreSQL
- Selecting an appropriate table engine (ClickHouse)
- Designing indexes for common query patterns
- Recommending a partitioning strategy for large datasets
- Generating CREATE TABLE statements with explanations
- Providing transformation hints to indexer-code-writer
## Core Principles

### Blockchain Data Type Mapping

Always follow these mappings:

| Solidity Type | ClickHouse Type | Reasoning |
|---|---|---|
| `uint256`, `int256` | `String` | Avoids overflow, allows `.toString()` |
| `uint128`, `uint160`, `uint192`, `uint224` | `String` | Too large for native integers |
| `uint64`, `uint96`, `uint112` | `String` | Safer as string (can be large) |
| `uint32` | `UInt32` | Safe as native integer |
| `uint16` | `UInt16` | Safe as native integer |
| `uint8` | `UInt8` | Safe as native integer |
| `int24`, `int32` | `Int32` | Safe as native integer |
| `address` | `FixedString(42)` | Always 42 chars (0x + 40 hex) |
| `bytes32` (tx hash) | `FixedString(66)` | Always 66 chars (0x + 64 hex) |
| `bytes32` (generic) | `String` | Variable length when decoded |
| `bool` | `UInt8` or `Bool` | 0/1 or true/false |
| timestamp (block) | `DateTime` or `DateTime64(3)` | Use `DateTime64(3)` for millisecond precision |
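The table above boils down to one conversion rule at transform time: anything wider than 32 bits goes through `.toString()`, while small integers are safe to cast with `Number()`. A minimal sketch of that rule (the helper name and union type are illustrative, not part of any Pipes API):

```typescript
// Illustrative helper: convert a decoded Solidity value into the
// ClickHouse-safe representation suggested by the mapping table above.
function toClickHouseValue(
  solidityType: string,
  value: bigint | string | boolean
): string | number {
  if (/^(u?int)(64|96|112|128|160|192|224|256)$/.test(solidityType)) {
    return value.toString(); // too wide for a JS number → store as String
  }
  if (/^(u?int)(8|16|24|32)$/.test(solidityType)) {
    return Number(value); // fits safely in a native integer column
  }
  if (solidityType === 'bool') {
    return value ? 1 : 0; // UInt8 representation
  }
  return String(value); // address, bytes32, etc. stay as strings
}

console.log(toClickHouseValue('uint256', 123456789012345678901234567890n)); // "123456789012345678901234567890"
console.log(toClickHouseValue('int24', -887272n)); // -887272
```

The key point is that the branch is decided by the declared Solidity width, never by the runtime magnitude of the value.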
### Table Engine Selection (ClickHouse)

| Use Case | Table Engine | When to Use |
|---|---|---|
| Events with reorgs | `ReplacingMergeTree` | Default for blockchain data (handles reorgs) |
| Aggregations | `SummingMergeTree` | When you need automatic sum aggregation |
| Unique events only | `MergeTree` | When reorgs are not a concern |
| Real-time updates | `CollapsingMergeTree` | Advanced: for state updates |

Default recommendation: `ReplacingMergeTree(block_timestamp)` for blockchain indexing.
### Index Design

Common index patterns:

```sql
-- Bloom filter for address lookups (most common)
INDEX addr_idx <address_column> TYPE bloom_filter

-- Set index for filtering
INDEX status_idx <status_column> TYPE set(100)

-- MinMax for range queries (automatic on ORDER BY columns)
-- No explicit INDEX needed - ClickHouse optimizes ORDER BY columns
```

When to add indexes:

- Address fields that will be queried frequently
- Enum/status fields with low cardinality
- NOT needed for ORDER BY columns (automatically optimized)
### ORDER BY Design

Critical for query performance:

```sql
ORDER BY (col1, col2, col3)
```

Principles:

- First column: most commonly filtered (e.g., `pool_address`, `token_address`)
- Second column: time-based (`block_number` or `block_timestamp`)
- Third column: uniqueness (`transaction_hash`, `log_index`)

Examples:

```sql
-- For pool-specific queries
ORDER BY (pool_address, block_number, transaction_hash, log_index)

-- For token-specific queries
ORDER BY (token_address, block_number, transaction_hash, log_index)

-- For time-series analysis
ORDER BY (block_number, transaction_hash, log_index)
```
### Partitioning Strategy

By time (most common):

```sql
PARTITION BY toYYYYMM(block_timestamp)   -- Monthly partitions
PARTITION BY toYYYYMMDD(block_timestamp) -- Daily partitions (high volume)
PARTITION BY toYear(block_timestamp)     -- Yearly partitions (low volume)
```

Guidelines:

- 100k-1M rows per month → monthly partitions
- 1M-10M rows per day → daily partitions
- < 100k rows per month → yearly partitions or no partitioning
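The guidelines above can be encoded as a small decision helper. This is an illustrative sketch (the function name and the 30-day month approximation are assumptions, not part of any Pipes API):

```typescript
// Pick a PARTITION BY expression from the expected ingest volume,
// following the guidelines listed above.
function choosePartitioning(rowsPerDay: number): string {
  const rowsPerMonth = rowsPerDay * 30; // rough 30-day month
  if (rowsPerDay >= 1_000_000) {
    return 'PARTITION BY toYYYYMMDD(block_timestamp)'; // daily: high volume
  }
  if (rowsPerMonth >= 100_000) {
    return 'PARTITION BY toYYYYMM(block_timestamp)'; // monthly: the usual default
  }
  return 'PARTITION BY toYear(block_timestamp)'; // yearly: low volume
}

console.log(choosePartitioning(5_000_000)); // daily partitions
console.log(choosePartitioning(10_000));    // monthly (≈300k rows/month)
```

Err toward fewer, larger partitions when in doubt; too many small partitions hurts ClickHouse merge performance.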
## Workflow

### Step 1: Receive Input

Input can come from:

From the abi-manager agent:

```javascript
{
  contract: "Uniswap V3 Pool",
  events: {
    Swap: {
      amount0: { type: "int256", isBigInt: true },
      // ...
    }
  }
}
```

From a user description:

> "I want to track Uniswap V3 swaps with token0, token1, amounts, and price"
### Step 1.5: ABI Field Coverage Validation (MANDATORY)

If receiving input from abi-manager, verify that ALL event fields are captured in the schema.

For each event parameter in the ABI:

- Check whether a corresponding column exists in your schema
- If excluded, document WHY with a comment in the schema

Rule: Every excluded ABI field MUST have a documented justification.
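This coverage check is mechanical enough to sketch in code. The shapes below are assumptions (a flat list of ABI parameter names and a list of schema column names; real ABI objects carry more structure), and the camelCase-to-snake_case naming convention is one common choice, not a Pipes requirement:

```typescript
// Return the ABI parameter names that have no matching schema column.
// Columns are assumed to be snake_case versions of camelCase ABI names
// (e.g. sqrtPriceX96 -> sqrt_price_x96).
function findUncoveredFields(abiParams: string[], schemaColumns: string[]): string[] {
  const toSnake = (name: string) => name.replace(/([A-Z])/g, '_$1').toLowerCase();
  const columns = new Set(schemaColumns);
  return abiParams.filter((p) => !columns.has(toSnake(p)));
}

const missing = findUncoveredFields(
  ['sender', 'recipient', 'amount0', 'sqrtPriceX96'],
  ['sender', 'recipient', 'amount0']
);
console.log(missing); // fields that need a column or a documented exclusion
```

Every name in the returned list must either get a column or a justification comment in the schema.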
### Step 2: Analyze Requirements

Determine:

What data needs to be stored?

- Event parameters
- Block/transaction metadata
- Computed fields

How will it be queried?

- By pool/token? → Add to ORDER BY first
- By time range? → Add block_number to ORDER BY
- By address? → Add a bloom filter index

Expected data volume?

- < 100k rows → Simple partitioning
- 100k-1M rows → Monthly partitioning
- > 1M rows per day → Daily partitioning

Reorg handling needed?

- Yes (almost always) → ReplacingMergeTree
- No (rare) → MergeTree
### Step 3: Design Schema

Create the table structure with:

Standard blockchain columns (always include):

```sql
block_number UInt32,              -- or UInt64
block_timestamp DateTime,         -- or DateTime64(3)
transaction_hash FixedString(66),
log_index UInt16,
```

Event-specific columns:

- Map from ABI types using the type mapping table
- Add descriptive comments

Indexes:

- Bloom filter for addresses
- Set index for status/enum fields

Table engine:

- Default: `ReplacingMergeTree(block_timestamp)`

ORDER BY:

- Based on expected query patterns

PARTITION BY:

- Based on expected volume
### Step 4: Generate CREATE TABLE Statement

Include:

- Complete CREATE TABLE with all columns
- Appropriate data types
- Indexes
- Table engine with parameters
- ORDER BY clause
- PARTITION BY clause
- Detailed comments explaining choices

### Step 5: Provide Transformation Hints

Generate transformation code snippets for indexer-code-writer:

```javascript
{
  transformations: {
    amount0: "s.event.amount0.toString()", // BigInt → String
    tick: "Number(s.event.tick)",          // int24 → Number (safe)
    pool_address: "s.contract",
  },
  notes: [
    "amount0 is int256, must use .toString() to avoid overflow",
    "tick is int24, safe to convert to Number (max value ~8M)"
  ]
}
```
### Final Step: Self-Consistency Validation (MANDATORY)

Before finalizing output, verify that the documentation matches the implementation.

Validation checks:

Table count matches description:

- If you say "two tables", count your CREATE TABLE statements
- If you say "single table", verify only one CREATE TABLE exists

Features mentioned exist in the schema:

- If you mention "materialized views", verify CREATE MATERIALIZED VIEW exists
- If you mention "indexes", verify INDEX clauses exist

Column names match description:

- Use exact column names from the schema in descriptions
- Don't use different names in text vs SQL

Error prevention rule:

NEVER:

- Say "two tables" when you created one table
- Mention "materialized views" when the schema has none
- Describe features that don't exist in the schema
- Use different column names in the description vs the schema

ALWAYS:

- Count your CREATE TABLE statements before describing them
- Verify every feature you mention actually exists
- Use exact column names from your schema in descriptions
- Update the documentation if you change the implementation
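The table-count check is mechanical and can be sketched as code. This is a naive regex count over the generated SQL and prose (the function name and the tiny number-word list are illustrative; a robust check would parse the SQL properly):

```typescript
// Naive self-consistency check: does the number of CREATE TABLE statements
// in the SQL match the table count claimed in the prose description?
function tableCountMatches(description: string, sql: string): boolean {
  const created = (sql.match(/CREATE TABLE/gi) ?? []).length;
  const claims: Record<string, number> = { one: 1, single: 1, two: 2, three: 3 };
  for (const [word, n] of Object.entries(claims)) {
    if (new RegExp(`\\b${word} tables?\\b`, 'i').test(description)) {
      return created === n; // prose makes a claim: it must match the SQL
    }
  }
  return true; // no explicit count claim in the prose → nothing to contradict
}

console.log(tableCountMatches(
  'This design uses two tables.',
  'CREATE TABLE a (...); CREATE TABLE b (...);'
)); // true
console.log(tableCountMatches(
  'A single table is enough.',
  'CREATE TABLE a (...); CREATE TABLE b (...);'
)); // false
```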
## Output Format

### To User

**Schema Design: <Table Name>**

- Optimized for: <primary query patterns>
- Expected volume: <rows per day/month>
- Reorg handling: <Yes/No>

**CREATE TABLE Statement:**

```sql
-- <Description>
-- Optimized for: <query patterns>
CREATE TABLE <table_name> (
    -- Block data
    block_number UInt32,              -- Ethereum block number (fits in 4 bytes)
    block_timestamp DateTime,         -- Block timestamp (second precision)

    -- Transaction data
    transaction_hash FixedString(66), -- ETH tx hash (0x + 64 hex = 66 chars)
    log_index UInt16,                 -- Log position in transaction

    -- Event-specific data
    <column1> <Type>,                 -- <Comment explaining choice>
    <column2> <Type>,                 -- <Comment explaining choice>

    -- Indexes for fast lookups
    INDEX <name>_idx <column> TYPE bloom_filter
) ENGINE = ReplacingMergeTree(block_timestamp)
ORDER BY (<col1>, <col2>, <col3>)
PARTITION BY toYYYYMM(block_timestamp)
```

**Design Decisions:**

- Data Types:
  - <column>: <Type> (<reasoning>)
- Table Engine: ReplacingMergeTree(block_timestamp)
  - Automatically handles blockchain reorgs
  - Deduplicates based on ORDER BY columns
  - Uses block_timestamp for version sorting
- Indexes:
  - <index_name>: Bloom filter for O(1) average-case lookups
- ORDER BY: (<col1>, <col2>, <col3>)
  - <col1>: Most common filter (enables partition pruning)
  - <col2>: Time-based (enables range queries)
  - <col3>: Ensures uniqueness (prevents duplicates)
- PARTITION BY: toYYYYMM(block_timestamp)
  - Sized for the expected rows per month
  - Enables efficient deletion of old data
  - Improves query performance via partition pruning
**Transformation Hints:**

```typescript
.pipe(({ <events> }) =>
  <events>.map((e) => ({
    // Standard fields
    block_number: e.block.number,
    block_timestamp: new Date(e.timestamp).toISOString().replace('Z', ''),
    transaction_hash: e.rawEvent.transactionHash,
    log_index: e.rawEvent.logIndex,

    // Event-specific fields
    <field1>: e.event.<field1>.toString(), // BigInt → String
    <field2>: e.event.<field2>,            // Address (already a string)
  }))
)
```
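As a concrete, runnable instance of that template, here is a transform for a transfer-style event. The input shape (`block`, `timestamp`, `rawEvent`, `contract`, `event.from/to/value`) is an assumption modeled on the template above, not the exact Pipes event type:

```typescript
// Assumed shape of a decoded ERC20 Transfer event (illustrative only).
interface DecodedTransfer {
  block: { number: number };
  timestamp: number; // ms since epoch
  rawEvent: { transactionHash: string; logIndex: number };
  contract: string;
  event: { from: string; to: string; value: bigint };
}

// Map one decoded event to a ClickHouse row object.
function toRow(e: DecodedTransfer) {
  return {
    block_number: e.block.number,
    block_timestamp: new Date(e.timestamp).toISOString().replace('Z', ''),
    transaction_hash: e.rawEvent.transactionHash,
    log_index: e.rawEvent.logIndex,
    token_address: e.contract,
    from_address: e.event.from,
    to_address: e.event.to,
    value: e.event.value.toString(), // uint256 BigInt → String column
  };
}

const row = toRow({
  block: { number: 19_000_000 },
  timestamp: 1700000000000,
  rawEvent: { transactionHash: '0x' + 'ab'.repeat(32), logIndex: 3 },
  contract: '0x' + '11'.repeat(20),
  event: { from: '0x' + '22'.repeat(20), to: '0x' + '33'.repeat(20), value: 10n ** 18n },
});
console.log(row.value); // "1000000000000000000"
```

Note that `value` is the only field needing conversion here; addresses and hashes are already strings after decoding.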
Query Examples:
-- <Example query 1>
SELECT ...
FROM <table_name>
WHERE <conditions>
## Common Scenarios

### Scenario 1: ERC20 Transfer Events

**Output**:

```sql
CREATE TABLE erc20_transfers (
    -- Block data
    block_number UInt32,
    block_timestamp DateTime,

    -- Transaction data
    transaction_hash FixedString(66),
    log_index UInt16,

    -- Token identifier
    token_address FixedString(42),  -- Contract address that emitted the event

    -- Transfer data
    from_address FixedString(42),   -- Sender address
    to_address FixedString(42),     -- Recipient address
    value String,                   -- Transfer amount (uint256 as string to avoid overflow)

    -- Indexes for address lookups
    INDEX token_idx token_address TYPE bloom_filter,
    INDEX from_idx from_address TYPE bloom_filter,
    INDEX to_idx to_address TYPE bloom_filter
) ENGINE = ReplacingMergeTree(block_timestamp)
ORDER BY (token_address, block_number, transaction_hash, log_index)
PARTITION BY toYYYYMM(block_timestamp)
```
## Related Skills

- pipes-abi - Fetch contract ABIs
- pipes-troubleshooting - Fix errors and validate data
- pipes-orchestrator - Routes to this skill

## Official Subsquid Documentation

- llms.txt - ClickHouse schema patterns
- skill.md - Database design best practices
## Best Practices
- Always include standard blockchain columns (block_number, block_timestamp, transaction_hash, log_index)
- Use FixedString for fixed-length data (addresses, hashes)
- Use String for BigInt types (uint256, uint128, int256)
- Default to ReplacingMergeTree unless there's a specific reason not to
- Add bloom filter indexes for address fields that will be queried
- Design ORDER BY based on query patterns, not arbitrary order
- Partition by time for large datasets (monthly is usually best)
- Add detailed comments explaining each design decision