JY-AIOT Collector Developer
Overview
本技能用于为矿山鸿雁(jy-aiot)产品的Plugin框架快速生成数据采集模块代码。根据用户提供的Markdown表格格式的协议定义文档,自动生成符合Plugin框架规范的采集模块,包括:
- 通信层代码:HTTP服务端/客户端、FTP文件读取、数据库客户端、MQTT订阅、Kafka订阅
- 数据解析器:将原始数据转换为框架抽象的标准格式
- 配置模板:符合Plugin框架规范的YAML/JSON配置文件
- 集成入口:可直接集成到jy-aiot产品的采集框架中
Protocol Document Format
Required Fields
协议文档必须包含以下表格结构:
## 通信配置
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| protocol | string | 是 | 通信协议:http_server/http_client/ftp/database/mqtt/kafka |
| host | string | 是 | 服务地址 |
| port | int | 是 | 端口号 |
| path | string | 否 | 路径(HTTP/FTP) |
| username | string | 否 | 用户名 |
| password | string | 否 | 密码 |
| interval | int | 否 | 采集间隔(秒),默认60 |
## 数据点定义
| 字段名称 | 字段标识 | 数据类型 | 字节偏移 | 字节长度 | 缩放因子 | 起始位 | 位长度 | 备注 |
|----------|----------|----------|----------|----------|----------|--------|--------|------|
| 温度 | temperature | float | 0 | 4 | 0.1 | - | - | 设备温度值 |
| 压力 | pressure | int | 4 | 2 | 1 | - | - | 管道压力 |
## 数据解析规则
| 规则ID | 目标字段 | 源数据表达式 | 转换公式 | 数据单位 | 备注 |
|--------|----------|--------------|----------|----------|------|
| rule_01 | temperature | raw[0:4] | bytes_to_float_le(value) | ℃ | 小端解析 |
| rule_02 | pressure | raw[4:6] | bytes_to_int_le(value) / 100 | MPa | 缩放转换 |
Supported Communication Protocols
| 协议 | 说明 | 适用场景 |
|---|---|---|
| http_server | HTTP服务端 | 被动接收设备推送数据 |
| http_client | HTTP客户端 | 主动轮询RESTful API |
| ftp | FTP文件读取 | 定时拉取设备导出文件 |
| database | 数据库客户端 | 直接读取数据库表 |
| mqtt | MQTT订阅 | 订阅MQTT主题接收数据 |
| kafka | Kafka订阅 | 订阅Kafka topic接收数据 |
Supported Data Types
| 类型标识 | Go类型 | 说明 | 示例 |
|---|---|---|---|
| int | int32/int64 | 有符号整数 | 传感器整数值 |
| uint | uint32/uint64 | 无符号整数 | 计数器值 |
| float | float32/float64 | 浮点数 | 温度、压力 |
| string | string | 字符串 | 设备状态文本 |
| bool | bool | 布尔值 | 开关状态 |
| bytes | []byte | 字节数组 | 原始二进制数据 |
Workflow
1. Protocol Analysis
分析用户提供的协议文档,提取:
- 通信方式(protocol字段)
- 连接参数(host、port、credentials)
- 数据点列表(字段名称、标识、类型、位置)
- 解析规则(转换公式、单位)
2. Code Generation
根据协议类型生成对应的采集模块代码:
collector_{protocol}/
├── config.yaml # 通信和采集配置
├── collector.go # 采集器主逻辑
├── parser.go # 数据解析器
├── metrics.go # 数据模型定义
└── README.md # 集成说明
3. Core Components
Collector Interface
所有采集模块必须实现Plugin框架的核心接口:
type Collector interface {
// Init 初始化采集器
Init(config Config) error
// Start 启动采集
Start(ctx context.Context) error
// Stop 停止采集
Stop() error
// GetMetrics 获取采集到的数据
GetMetrics() []Metric
// Health 健康检查
Health() HealthStatus
}
Metric Data Model
type Metric struct {
DeviceID string `json:"device_id"` // 设备标识
MetricID string `json:"metric_id"` // 指标标识
Timestamp time.Time `json:"timestamp"` // 采集时间
Value interface{} `json:"value"` // 值
Quality DataQuality `json:"quality"` // 数据质量
Tags map[string]string `json:"tags"` // 标签
Fields map[string]string `json:"fields"` // 扩展字段
}
type DataQuality int
const (
QualityGood DataQuality = 0 // 好
QualityBad DataQuality = 1 // 坏数据
QualityUncertain DataQuality = 2 // 不确定
)
4. Protocol-Specific Implementations
HTTP Server Collector
用于被动接收设备推送的数据:
type HTTPServerCollector struct {
config HTTPServerConfig
handler *http.Server
parser DataParser
buffer sync.Map
}
func (c *HTTPServerCollector) Start(ctx context.Context) error {
c.handler = &http.Server{
Addr: fmt.Sprintf(":%d", c.config.Port),
Handler: c.createRouter(),
}
go c.handler.ListenAndServe()
return nil
}
MQTT Collector
用于订阅MQTT主题接收实时数据:
type MQTTCollector struct {
config MQTTConfig
client mqtt.Client
parser DataParser
buffer sync.Map
}
func (c *MQTTCollector) Start(ctx context.Context) error {
opts := mqtt.NewClientOptions().
AddBroker(fmt.Sprintf("tcp://%s:%d", c.config.Host, c.config.Port)).
SetClientID(c.config.ClientID)
c.client = mqtt.NewClient(opts)
if token := c.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
c.client.Subscribe(c.config.Topic, 0, c.messageHandler)
return nil
}
Database Collector
用于直接查询数据库:
type DatabaseCollector struct {
config DatabaseConfig
db *sql.DB
parser DataParser
interval time.Duration
}
func (c *DatabaseCollector) Start(ctx context.Context) error {
connStr := fmt.Sprintf("user=%s password=%s host=%s port=%d dbname=%s",
c.config.Username, c.config.Password,
c.config.Host, c.config.Port, c.config.Database)
var err error
c.db, err = sql.Open("postgres", connStr)
if err != nil {
return err
}
ticker := time.NewTicker(c.interval)
go c.pollLoop(ctx, ticker)
return nil
}
Generated Code Structure
Directory Layout
jy-aiot-collector-{protocol}/
├── config/
│ ├── collector.yaml # 主配置
│ └── metrics.yaml # 指标定义
├── internal/
│ ├── collector.go # 采集器实现
│ ├── parser.go # 数据解析器
│ ├── client.go # 协议客户端
│ └── model.go # 数据模型
├── scripts/
│ ├── test_connection.sh # 连接测试脚本
│ └── mock_server.py # 模拟服务器(测试用)
├── deploy/
│ ├── docker-compose.yml # 部署配置
│ └── k8s-deployment.yaml # K8s部署
├── go.mod
├── main.go
└── README.md
Configuration Template
# collector.yaml
collector:
name: "temperature_sensor"
protocol: "mqtt"
enabled: true
interval: 10 # 秒
mqtt:
broker: "tcp://192.168.1.100:1883"
topic: "sensors/+/temperature"
client_id: "jy-aiot-collector-001"
username: "collector"
password: "${MQTT_PASSWORD}"
qos: 1
clean_session: true
metrics:
- id: "temperature"
name: "设备温度"
source: "payload.temperature"
datatype: "float"
unit: "℃"
tags:
device_type: "sensor"
parser:
format: "json"
timestamp_field: "timestamp"
timestamp_format: "2006-01-02T15:04:05Z07:00"
output:
buffer_size: 1000
batch_size: 100
flush_interval: 5
Parser Implementation
// internal/parser.go
type DataParser interface {
Parse(raw []byte) ([]Metric, error)
Validate(data interface{}) bool
Transform(value interface{}, rule string) interface{}
}
type JSONParser struct {
timestampField string
timestampFormat string
}
func (p *JSONParser) Parse(raw []byte) ([]Metric, error) {
var data map[string]interface{}
if err := json.Unmarshal(raw, &data); err != nil {
return nil, err
}
metrics := make([]Metric, 0)
// 解析逻辑
return metrics, nil
}
Integration Guide
1. Module Registration
在Plugin框架中注册采集模块:
// plugin.go
func init() {
collector.Register("mqtt", NewMQTTCollector)
collector.Register("http_server", NewHTTPServerCollector)
collector.Register("database", NewDatabaseCollector)
}
2. Configuration Loading
加载并验证配置:
// config/config.go
type Config struct {
Collector CollectorConfig `yaml:"collector"`
MQTT MQTTConfig `yaml:"mqtt"`
Metrics []MetricConfig `yaml:"metrics"`
Parser ParserConfig `yaml:"parser"`
Output OutputConfig `yaml:"output"`
}
func LoadConfig(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
// 验证配置
if err := config.Validate(); err != nil {
return nil, err
}
return &config, nil
}
3. Health Check
实现健康检查接口:
func (c *MQTTCollector) Health() HealthStatus {
status := HealthStatus{
Status: "healthy",
Checks: make(map[string]string),
}
if !c.client.IsConnected() {
status.Status = "unhealthy"
status.Checks["connection"] = "disconnected"
}
return status
}
Example Usage
User Request Example
用户请求:
请基于以下协议文档生成MQTT温度传感器采集模块:
## 通信配置
| 字段 | 值 |
|------|-----|
| protocol | mqtt |
| host | 192.168.1.100 |
| port | 1883 |
| topic | sensors/+/temperature |
## 数据点定义
| 字段名称 | 字段标识 | 数据类型 |
|----------|----------|----------|
| 温度值 | temperature | float |
| 设备ID | device_id | string |
Generated Response
根据协议文档生成完整的采集模块代码,包括:
collector.yaml配置文件internal/collector.go采集器实现internal/parser.goJSON解析器main.go程序入口README.md集成说明
Resources
scripts/
generate_collector.py- 采集模块代码生成脚本test_protocol.py- 协议解析测试工具mock_device.py- 模拟设备数据源
references/
- COMMUNICATION.md - 各通信协议详细配置
- PARSING.md - 数据解析规则和转换函数
- MODELS.md - 数据模型和接口定义
- EXAMPLES.md - 完整采集模块示例
assets/
templates/collector_template.go- 采集器代码模板templates/config_template.yaml- 配置文件模板schemas/metric.schema.json- 指标定义JSON Schema