数据管道工具箱
快速构建ETL数据管道:提取 → 转换 → 加载 → 调度
核心功能
- 多源提取 — REST APIs、GraphQL、SQL数据库、CSV/JSON/Parquet文件、S3/云存储、Kafka/SQS
- 数据转换 — 清洗、过滤、聚合、关联、跨表Join
- 多目标加载 — PostgreSQL/MySQL、Snowflake/BigQuery、S3、数据仓库
- 定时调度 — Cron任务或事件触发
- 监控告警 — 失败自动通知,可视化运行状态
快速开始
# 创建数据管道
./pipeline.sh create my-pipeline
# 添加数据源
./pipeline.sh extract my-pipeline api --url https://api.example.com/data
# 添加转换规则
./pipeline.sh transform my-pipeline filter "status == 'active'"
./pipeline.sh transform my-pipeline aggregate "group by category, sum(amount)"
# 添加目标存储
./pipeline.sh load my-pipeline postgres --connection $DATABASE_URL
# 运行管道
./pipeline.sh run my-pipeline
支持的数据源
| 类型 | 具体来源 |
|---|---|
| APIs | REST API, GraphQL, 内部服务 |
| 数据库 | PostgreSQL, MySQL, MongoDB, SQL Server |
| 文件 | CSV, JSON, Parquet, Excel |
| 云存储 | AWS S3, Google Cloud Storage |
| 消息队列 | Kafka, AWS SQS |
支持的目标存储
| 类型 | 具体目标 |
|---|---|
| 数据库 | PostgreSQL, MySQL, BigQuery, Snowflake |
| 数据仓库 | ClickHouse, DuckDB, TimescaleDB |
| 文件存储 | S3, GCS, 本地文件 |
| API | 第三方API回传 |
典型使用场景
场景1:每日销售数据汇总
# 从CRM API提取昨日销售数据
./pipeline.sh extract daily-sales api \
--url "https://crm.example.com/api/orders?date=yesterday"
# 转换:按产品分类汇总
./pipeline.sh transform daily-sales aggregate \
--group-by "product_category" \
--sum "quantity,amount"
# 加载到数据仓库
./pipeline.sh load daily-sales bigquery \
--project "my-project" --dataset "sales" --table "daily_summary"
# 设置每日定时任务
./pipeline.sh schedule daily-sales "0 6 * * *"
场景2:用户行为数据同步
# 从日志文件提取
./pipeline.sh extract user-logs file --path "/var/logs/app/*.json"
# 清洗和转换
./pipeline.sh transform user-logs filter "event_type != 'heartbeat'"
./pipeline.sh transform user-logs add-column "timestamp:parse_timestamp(time)"
# 加载到ClickHouse
./pipeline.sh load user-logs clickhouse --connection $CH_URL
监控与告警
查看运行状态
./pipeline.sh status my-pipeline
# 输出:
# Status: ✅ Running
# Last Run: 2026-05-05 06:00:00
# Duration: 45s
# Records Processed: 12,847
# Errors: 0
配置告警
# 失败时发送邮件
./pipeline.sh alert my-pipeline email --to admin@example.com
# 失败时发送飞书消息
./pipeline.sh alert my-pipeline webhook --url "https://open.feishu.cn/..."
推荐资源
- ShadowAI API(数据管道配套): https://referer.shadowai.xyz/r/1056448
由 AI智造工坊 (http://ai.qnitgroup.com) 整理发布 | 安装源: ClawHub