pytd - Treasure Data Python SDK

Safety Notice

This listing is imported from the skills.sh public index metadata. Review the upstream SKILL.md and repository scripts before running.

Install skill "pytd" with this command: npx skills add treasure-data/td-skills/treasure-data-td-skills-pytd

Expert assistance for querying and importing data to Treasure Data using pytd, the official Python driver for analytical workflows.

When to Use This Skill

Use this skill when:

  • Querying Treasure Data from Python scripts or Jupyter notebooks

  • Importing pandas DataFrames to TD tables

  • Running Presto or Hive queries from Python

  • Building data pipelines with Python and TD

  • Performing bulk data imports or exports

  • Migrating from deprecated pandas-td library

  • Integrating TD with Python data science workflows

  • Handling large result sets with iterative retrieval

Core Principles

  1. Installation

Standard Installation:

pip install pytd

Requirements:

  • Python 3.9 or later

  • pandas 2.0 or later
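
A quick way to confirm an environment meets these requirements (a minimal sketch to run in the target Python environment):

import sys
import pandas as pd

# pytd requires Python 3.9+ and pandas 2.0+ (see the requirements above).
assert sys.version_info >= (3, 9), "Python 3.9 or later is required"
pandas_major, pandas_minor = (int(x) for x in pd.__version__.split('.')[:2])
assert (pandas_major, pandas_minor) >= (2, 0), "pandas 2.0 or later is required"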

  2. Authentication & Configuration

Environment Variables (Recommended):

export TD_API_KEY="your_api_key_here"
export TD_API_SERVER="https://api.treasuredata.com/"

Client Initialization:

import pytd

Using environment variables

client = pytd.Client(database='sample_datasets')

Explicit credentials (not recommended for production)

client = pytd.Client(
    apikey='your_api_key',
    endpoint='https://api.treasuredata.com/',
    database='your_database',
    default_engine='presto'  # or 'hive'
)

Configuration Options:

  • apikey : TD API key (read from TD_API_KEY env var if not specified)

  • endpoint : TD API server URL (read from TD_API_SERVER env var)

  • database : Default database name for queries

  • default_engine : Query engine - 'presto' (default) or 'hive'

Regional Endpoints:

  • US: https://api.treasuredata.com/

  • Tokyo: https://api.treasuredata.co.jp/

  • EU: https://api.eu01.treasuredata.com/
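
For example, a client pointed at the Tokyo region (a minimal sketch; substitute the endpoint and database for your account):

import pytd

# TD_API_KEY is still read from the environment; endpoint and database here are placeholders.
client_tokyo = pytd.Client(
    endpoint='https://api.treasuredata.co.jp/',
    database='your_database'
)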

  3. Querying Data

Basic Query Execution

import pytd

client = pytd.Client(database='sample_datasets')

Execute Presto query

result = client.query('SELECT symbol, COUNT(1) as cnt FROM nasdaq GROUP BY symbol LIMIT 10')

Result format: {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ...]}

print(result['columns'])  # ['symbol', 'cnt']
print(result['data'])     # [['AAIT', 590], ['AAL', 82], ...]

Query with Hive Engine

Create Hive client

client_hive = pytd.Client(
    database='sample_datasets',
    default_engine='hive'
)

Execute Hive query

result = client_hive.query('SELECT hivemall_version()')

Convert Results to pandas DataFrame

import pandas as pd

result = client.query('SELECT * FROM nasdaq LIMIT 100')

Convert to DataFrame

df = pd.DataFrame(result['data'], columns=result['columns'])
print(df.head())

  4. Writing Data to TD

Load DataFrame to Table

import pandas as pd
import pytd

Create sample DataFrame

df = pd.DataFrame({
    'user_id': [1, 2, 3, 4],
    'event_name': ['login', 'purchase', 'logout', 'login'],
    'amount': [None, 99.99, None, None],
    'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-02', '2024-01-03'])
})

Initialize client

client = pytd.Client(database='your_database')

Upload DataFrame

client.load_table_from_dataframe(
    df,
    'events',              # table name
    writer='bulk_import',  # writer type
    if_exists='overwrite'  # or 'append', 'error', 'ignore'
)

Parameters:

  • df : pandas DataFrame to upload

  • table : Target table name (can be 'database.table' or just 'table' )

  • writer : Import method - 'bulk_import' (default), 'insert_into' , or 'spark'

  • if_exists : What to do if table exists - 'error' (default), 'overwrite' , 'append' , or 'ignore'

Common Patterns

Pattern 1: ETL Pipeline - Query, Transform, Load

import pytd
import pandas as pd
import numpy as np

Initialize client

client = pytd.Client(database='analytics')

Step 1: Extract - Query data from TD

query = """ SELECT user_id, event_name, event_date, COUNT(*) as event_count FROM raw_events WHERE TD_INTERVAL(time, '-1d', 'JST') GROUP BY user_id, event_name, event_date """

result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])

Step 2: Transform - Process data with pandas

df['event_date'] = pd.to_datetime(df['event_date'])
df['is_weekend'] = df['event_date'].dt.dayofweek >= 5
df['event_count_log'] = np.log1p(df['event_count'])

Add metadata

df['processed_at'] = pd.Timestamp.now()
df['pipeline_version'] = '1.0'

Step 3: Load - Write back to TD

client.load_table_from_dataframe(
    df,
    'analytics.user_daily_events',
    writer='bulk_import',
    if_exists='append'
)

print(f"Loaded {len(df)} rows to user_daily_events")

Explanation: Complete ETL workflow that extracts yesterday's data, performs pandas transformations, and loads results back to TD. Uses bulk_import for efficient loading.

Pattern 2: Incremental Data Loading

import pytd
import pandas as pd
from datetime import datetime, timedelta

client = pytd.Client(database='sales')

def load_incremental_data(source_file, table_name, date_column='import_date'):
    """Load new data incrementally, avoiding duplicates"""

    # Read new data from source
    new_data = pd.read_csv(source_file)
    new_data[date_column] = datetime.now()

    # Get max date from existing table
    try:
        result = client.query(f"""
            SELECT MAX({date_column}) as max_date
            FROM {table_name}
        """)

        max_date = result['data'][0][0] if result['data'][0][0] else None

        if max_date:
            # Filter only new records
            new_data = new_data[new_data[date_column] > max_date]
            print(f"Loading {len(new_data)} new records after {max_date}")
        else:
            print(f"Table empty, loading all {len(new_data)} records")

    except Exception as e:
        # Table doesn't exist yet
        print(f"Creating new table with {len(new_data)} records")

    if len(new_data) > 0:
        client.load_table_from_dataframe(
            new_data,
            table_name,
            writer='bulk_import',
            if_exists='append'
        )
        print("Load complete")
    else:
        print("No new data to load")

Usage

load_incremental_data('daily_sales.csv', 'sales.transactions')

Explanation: Implements incremental loading by checking the latest timestamp in the target table and only loading newer records. Handles first-time loads gracefully.

Pattern 3: Large Result Set Processing with DB-API

import pytd
from pytd.dbapi import connect

client = pytd.Client(database='large_dataset')

Create DB-API connection

conn = connect(client)
cursor = conn.cursor()

Execute query that might timeout with standard query()

cursor.execute("""
    SELECT user_id, event_name, event_time, properties
    FROM events
    WHERE TD_INTERVAL(time, '-7d', 'JST')
""")

Process results iteratively (memory efficient)

batch_size = 10000
processed_count = 0

while True:
    rows = cursor.fetchmany(batch_size)
    if not rows:
        break

    # Process batch
    for row in rows:
        user_id, event_name, event_time, properties = row
        # Process each row
        process_event(user_id, event_name, event_time, properties)

    processed_count += len(rows)
    print(f"Processed {processed_count} rows...")

cursor.close()
conn.close()

print(f"Total processed: {processed_count} rows")

Explanation: Uses DB-API for iterative retrieval of large result sets. Prevents memory issues and query timeouts by fetching data in batches. Essential for processing millions of rows.

Pattern 4: Multi-Database Operations

import pytd
import pandas as pd

Connect to different databases

source_client = pytd.Client(database='raw_data')
target_client = pytd.Client(database='analytics')

Query from source database

query = """ SELECT customer_id, product_id, purchase_date, amount FROM purchases WHERE TD_INTERVAL(time, '-1d', 'JST') """

result = source_client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])

Enrich data by querying another source

product_query = "SELECT product_id, product_name, category FROM products"
products_result = source_client.query(product_query)
products_df = pd.DataFrame(products_result['data'], columns=products_result['columns'])

Join data

enriched_df = df.merge(products_df, on='product_id', how='left')

Calculate metrics

daily_summary = enriched_df.groupby(['category', 'purchase_date']).agg({
    'amount': ['sum', 'mean', 'count'],
    'customer_id': 'nunique'
}).reset_index()

daily_summary.columns = ['category', 'date', 'total_sales', 'avg_sale', 'transaction_count', 'unique_customers']

Write to analytics database

target_client.load_table_from_dataframe(
    daily_summary,
    'daily_category_sales',
    writer='bulk_import',
    if_exists='append'
)

print(f"Loaded {len(daily_summary)} rows to analytics.daily_category_sales")

Explanation: Demonstrates working with multiple databases, joining data, performing aggregations, and writing to a different target database.

Pattern 5: Handling Time-based Data with TD Functions

import pytd
import pandas as pd
from datetime import datetime

client = pytd.Client(database='events')

Query with TD time functions

query = """ SELECT TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date_jst, COUNT(*) as event_count, COUNT(DISTINCT user_id) as unique_users, APPROX_PERCENTILE(session_duration, 0.5) as median_duration, APPROX_PERCENTILE(session_duration, 0.95) as p95_duration FROM user_sessions WHERE TD_INTERVAL(time, '-7d', 'JST') GROUP BY 1 ORDER BY 1 DESC """

result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])

Convert date strings to datetime

df['date_jst'] = pd.to_datetime(df['date_jst'])

Add derived metrics

df['events_per_user'] = df['event_count'] / df['unique_users']

Write summary back

client.load_table_from_dataframe(
    df,
    'weekly_session_summary',
    writer='bulk_import',
    if_exists='overwrite'
)

Explanation: Shows proper use of TD time functions (TD_INTERVAL, TD_TIME_FORMAT) in queries and how to handle the results in pandas.

Writer Types Comparison

pytd supports three writer methods for loading data:

  1. bulk_import (Default - Recommended)

Best for: Most use cases, especially large datasets

client.load_table_from_dataframe(
    df,
    'table_name',
    writer='bulk_import',
    if_exists='append'
)

Characteristics:

  • ✓ Scalable to large datasets

  • ✓ Memory efficient (streams data)

  • ✓ No special permissions required

  • ✓ Best balance of performance and simplicity

  • ✗ Slower than Spark for very large datasets

  • Uses CSV format internally

When to use: Default choice for most data loads (100s of MB to GBs)

  2. insert_into

Best for: Small datasets, real-time updates

client.load_table_from_dataframe(
    df,
    'table_name',
    writer='insert_into',
    if_exists='append'
)

Characteristics:

  • ✓ Simple, no dependencies

  • ✓ Good for small datasets (<1000 rows)

  • ✗ Not scalable (issues individual INSERT queries)

  • ✗ Slow for large datasets

  • ✗ Uses Presto query capacity

  • Uses Presto INSERT INTO statements

When to use: Only for small datasets or when you need immediate writes without bulk import delay

  3. spark (High Performance)

Best for: Very large datasets, high-performance pipelines

from pytd.writer import SparkWriter

writer = SparkWriter(
    td_spark_path='/path/to/td-spark-assembly.jar'  # Optional
)

client.load_table_from_dataframe(
    df,
    'table_name',
    writer=writer,
    if_exists='append'
)

Characteristics:

  • ✓ Highest performance

  • ✓ Direct writes to Plazma storage

  • ✓ Best for very large datasets (10s of GBs+)

  • ✗ Requires pytd[spark] installation

  • ✗ Requires Plazma Public API access (contact support)

  • ✗ Additional dependencies

When to use: Large-scale data pipelines requiring maximum throughput

Enabling Spark Writer:

  • Install: pip install pytd[spark]

  • Contact support@treasuredata.com to enable Plazma Public API access

  • (Optional) Download td-spark JAR for custom versions

Best Practices

Use Environment Variables for Credentials

export TD_API_KEY="your_api_key"
export TD_API_SERVER="https://api.treasuredata.com/"

Never hardcode API keys in scripts
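
A small Python-side guard that follows this practice (a sketch; the database name is a placeholder):

import os
import pytd

# Fail fast if the key is missing instead of falling back to a hardcoded value.
if not os.getenv('TD_API_KEY'):
    raise RuntimeError('TD_API_KEY is not set; export it before running this script')

client = pytd.Client(database='your_database')  # picks up TD_API_KEY / TD_API_SERVER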

Choose the Right Writer

  • bulk_import : Default choice for most scenarios

  • insert_into : Only for small datasets (<1000 rows)

  • spark : For very large datasets with proper setup
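
As a rough illustration of this guidance (the 1,000-row threshold is an assumption, and client and df are assumed to exist as in the earlier examples), writer selection can be keyed to DataFrame size:

def pick_writer(df):
    # Tiny frames go through immediate INSERT INTO; everything else uses bulk import.
    # The threshold is illustrative only.
    return 'insert_into' if len(df) < 1000 else 'bulk_import'

client.load_table_from_dataframe(
    df,
    'your_database.events',  # placeholder target table
    writer=pick_writer(df),
    if_exists='append'
)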

Use TD Time Functions in Queries

Good: Uses partition pruning

query = "SELECT * FROM table WHERE TD_INTERVAL(time, '-1d', 'JST')"

Avoid: Scans entire table

query = "SELECT * FROM table WHERE date = '2024-01-01'"

Handle Large Results with DB-API

Use pytd.dbapi for queries returning millions of rows to avoid memory issues.

Specify Database in Table Name

Explicit database (recommended)

client.load_table_from_dataframe(df, 'database.table')

Uses client's default database

client.load_table_from_dataframe(df, 'table')

Add Time Column for Partitioning

df['time'] = pd.to_datetime(df['timestamp']).astype('int64') // 10**9
client.load_table_from_dataframe(df, 'table')

Use Presto for Analytics, Hive for Special Functions

  • Presto: Faster for most analytical queries

  • Hive: Required for Hivemall, UDFs, some advanced features
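
One way to keep both engines at hand, mirroring the client setup shown earlier (a sketch, not a required pattern; database and table names are placeholders):

import pytd

presto_client = pytd.Client(database='analytics')  # default_engine='presto'
hive_client = pytd.Client(database='analytics', default_engine='hive')

# Routine analytics on Presto, Hivemall-specific calls on Hive
daily_counts = presto_client.query(
    "SELECT COUNT(*) AS cnt FROM events WHERE TD_INTERVAL(time, '-1d', 'JST')"
)
hivemall = hive_client.query("SELECT hivemall_version()")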

Batch Processing for Large ETL

Process data in chunks to avoid memory issues:

for chunk in pd.read_csv('large_file.csv', chunksize=100000):
    # Process chunk
    client.load_table_from_dataframe(chunk, 'table', if_exists='append')

Error Handling

try:
    result = client.query(query)
except Exception as e:
    print(f"Query failed: {e}")
    # Handle error appropriately

Close Connections in Long-Running Scripts

from pytd.dbapi import connect

conn = connect(client)
try:
    # Use connection
    cursor = conn.cursor()
    cursor.execute(query)
    # Process results
finally:
    conn.close()

Common Issues and Solutions

Issue: Import Errors or Module Not Found

Symptoms:

  • ModuleNotFoundError: No module named 'pytd'

  • ImportError: cannot import name 'SparkWriter'

Solutions:

Verify Installation

pip list | grep pytd

Install/Upgrade pytd

pip install --upgrade pytd

For Spark Support

pip install pytd[spark]

Check Python Version

python --version # Should be 3.9+

Issue: Authentication Errors

Symptoms:

  • Unauthorized: Invalid API key

  • 403 Forbidden

Solutions:

Verify Environment Variables

echo $TD_API_KEY
echo $TD_API_SERVER

Check API Key Format

Verify API key is set correctly

import os
print(os.getenv('TD_API_KEY'))

Verify Regional Endpoint

US

endpoint = 'https://api.treasuredata.com/'

Tokyo

endpoint = 'https://api.treasuredata.co.jp/'

EU

endpoint = 'https://api.eu01.treasuredata.com/'

Check API Key Permissions

  • Ensure key has appropriate read/write permissions

  • Regenerate key if necessary from TD console
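
To confirm a key's permissions in practice, a minimal check is to run a trivial read and a one-row write (a sketch; the database and table names are placeholders):

import pandas as pd
import pytd

client = pytd.Client(database='your_database')

# Read check: should return a single row
print(client.query('SELECT 1 AS ok'))

# Write check: upload one throwaway row to a scratch table
client.load_table_from_dataframe(
    pd.DataFrame({'ok': [1]}),
    'permission_check',
    writer='insert_into',
    if_exists='overwrite'
)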

Issue: Query Timeout or Memory Errors

Symptoms:

  • Query times out after several minutes

  • MemoryError when fetching large results

  • Connection drops during query execution

Solutions:

Use DB-API for Large Results

from pytd.dbapi import connect

conn = connect(client)
cursor = conn.cursor()
cursor.execute(query)

Fetch in batches until no rows remain

rows = cursor.fetchmany(10000)
while rows:
    for row in rows:
        process(row)
    rows = cursor.fetchmany(10000)

Add Time Filters for Partition Pruning

query = """ SELECT * FROM large_table WHERE TD_INTERVAL(time, '-1d', 'JST') -- Add this! """

Limit Result Size

query = "SELECT * FROM table WHERE ... LIMIT 100000"

Use Aggregations Instead of Raw Data

Instead of fetching all rows

query = "SELECT * FROM table"

Aggregate first

query = """ SELECT date, user_id, COUNT(*) as cnt FROM table GROUP BY 1, 2 """

Issue: DataFrame Upload Fails

Symptoms:

  • ValueError: DataFrame is empty

  • Type errors during upload

  • Data corruption in uploaded table

Solutions:

Check DataFrame is Not Empty

if df.empty:
    print("DataFrame is empty, skipping upload")
else:
    client.load_table_from_dataframe(df, 'table')

Handle Data Types Properly

Convert timestamps to Unix epoch

df['time'] = pd.to_datetime(df['timestamp']).astype('int64') // 10**9

Handle NaN values

df['amount'] = df['amount'].fillna(0)

Convert to appropriate types

df['user_id'] = df['user_id'].astype(str)
df['count'] = df['count'].astype(int)

Check Column Names

TD column names should be lowercase and use underscores

df.columns = df.columns.str.lower().str.replace(' ', '_')

Remove Invalid Characters

Remove or replace problematic characters

df = df.applymap(lambda x: str(x).replace('\x00', '') if isinstance(x, str) else x)

Try Different Writer

If bulk_import fails, try insert_into for debugging

client.load_table_from_dataframe(
    df.head(10),  # Test with small sample
    'table',
    writer='insert_into'
)

Issue: Spark Writer Not Working

Symptoms:

  • ImportError: Spark writer not available

  • Spark job fails

  • Permission denied errors

Solutions:

Install Spark Dependencies

pip install pytd[spark]

Enable Plazma Public API

Contact support@treasuredata.com to enable Plazma Public API access for your account.

Specify JAR Path (if needed)

from pytd.writer import SparkWriter

writer = SparkWriter(
    td_spark_path='/path/to/td-spark-assembly.jar'
)

Check Permissions

  • Ensure API key has write access to target database

  • Verify Plazma access is enabled

Advanced Topics

Custom Query Options

Query with custom parameters

result = client.query(
    'SELECT * FROM table',
    engine='presto',
    priority=1,     # Higher priority (1-2, default 0)
    retry_limit=3
)

Working with Job Status

Start query asynchronously

job = client.query('SELECT COUNT(*) FROM large_table', wait=False)

Check job status

print(f"Job ID: {job.job_id}") print(f"Status: {job.status()}")

Wait for completion

job.wait()

Get results

if job.success():
    result = job.result()
else:
    print(f"Job failed: {job.error()}")

Custom Writers

from pytd.writer import BulkImportWriter

Configure writer with custom options

writer = BulkImportWriter(
    chunk_size=10000,   # Rows per chunk
    time_column='time'  # Specify time column
)

client.load_table_from_dataframe(
    df,
    'table',
    writer=writer,
    if_exists='append'
)

Migrating from pandas-td

If you have existing code using the deprecated pandas-td library:

Before (pandas-td):

import pandas_td as td

con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/')
df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con)

After (pytd):

import pytd.pandas_td as td

con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/')
df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con)

Or use the modern pytd API:

import pytd
import pandas as pd

client = pytd.Client(database='sample_datasets')
result = client.query('SELECT * FROM nasdaq')
df = pd.DataFrame(result['data'], columns=result['columns'])

Testing and Development

Test Connection

import pytd

try:
    client = pytd.Client(database='sample_datasets')
    result = client.query('SELECT 1 as test')
    print("Connection successful!")
    print(result)
except Exception as e:
    print(f"Connection failed: {e}")

Verify Data Upload

import pandas as pd
import pytd

Create test data

test_df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300],
    'time': [1704067200, 1704153600, 1704240000]  # Unix timestamps
})

client = pytd.Client(database='test_db')

Upload

print("Uploading test data...") client.load_table_from_dataframe( test_df, 'test_table', writer='bulk_import', if_exists='overwrite' )

Verify

print("Verifying upload...") result = client.query('SELECT * FROM test_table ORDER BY id') verify_df = pd.DataFrame(result['data'], columns=result['columns'])

print("\nUploaded data:") print(verify_df)

Check counts match

assert len(test_df) == len(verify_df), "Row count mismatch!"
print("\nVerification successful!")

Performance Testing

import pytd
import pandas as pd
import time

client = pytd.Client(database='test_db')

Generate test data

df = pd.DataFrame({
    'id': range(100000),
    'value': range(100000),
    'time': int(time.time())
})

Test bulk_import

start = time.time()
client.load_table_from_dataframe(df, 'perf_test_bulk', writer='bulk_import', if_exists='overwrite')
bulk_time = time.time() - start
print(f"bulk_import: {bulk_time:.2f}s for {len(df)} rows")

Test insert_into (small sample only!)

small_df = df.head(100)
start = time.time()
client.load_table_from_dataframe(small_df, 'perf_test_insert', writer='insert_into', if_exists='overwrite')
insert_time = time.time() - start
print(f"insert_into: {insert_time:.2f}s for {len(small_df)} rows")

Jupyter Notebook Integration

pytd works seamlessly with Jupyter notebooks:

Notebook cell 1: Setup

import pytd
import pandas as pd
import matplotlib.pyplot as plt

client = pytd.Client(database='analytics')

Notebook cell 2: Query data

query = """ SELECT TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date, COUNT(*) as events FROM user_events WHERE TD_INTERVAL(time, '-30d', 'JST') GROUP BY 1 ORDER BY 1 """

result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])
df['date'] = pd.to_datetime(df['date'])

Notebook cell 3: Visualize

plt.figure(figsize=(12, 6))
plt.plot(df['date'], df['events'])
plt.title('Daily Events - Last 30 Days')
plt.xlabel('Date')
plt.ylabel('Event Count')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

Notebook cell 4: Write results back

summary = df.describe()

Process and save summary back to TD if needed

Resources

Related Skills

  • trino: Understanding Trino SQL syntax for queries in pytd

  • hive: Using Hive-specific functions and syntax

  • digdag: Orchestrating Python scripts using pytd in workflows

  • td-javascript-sdk: Browser-based data collection (frontend) vs pytd (backend/analytics)

Comparison with Other Tools

Tool | Purpose | When to Use

pytd | Full-featured Python driver | Analytics, data pipelines, pandas integration

td-client-python | Basic REST API wrapper | Simple CRUD, when pytd is too heavy

pandas-td (deprecated) | Legacy pandas integration | Don't use - migrate to pytd

TD Toolbelt | CLI tool | Command-line operations, shell scripts

Recommendation: Use pytd for all Python-based analytical work and ETL pipelines. Use td-client-python only for basic REST API operations.

Last updated: 2025-01 | pytd version: Latest (Python 3.9+)
