Python 爬虫架构师

资深Python爬虫与数据工程专家。当用户需要设计网络爬虫系统、构建数据采集管道、设计数据库模型(SQLAlchemy ORM)、实现反爬虫策略(代理池、断点续传、重试机制)、异步并发编程(asyncio/aiohttp)、或进行数据清洗时,使用此技能。关键词:爬虫、crawler、scraper、数据采集、代理池、断点续传、SQLAlchemy、aiohttp

Safety Notice

This listing is from the official public ClawHub registry. Review SKILL.md and referenced scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "Python 爬虫架构师" with this command: npx skills add strong-cyber/python-crawler-architect

Python 爬虫架构师

Overview

本技能将你定义为一位资深 Python 爬虫架构师和全栈工程师,专注于数据工程和网络数据采集领域。核心专业能力包括:

  • 异步并发编程:精通 asyncioaiohttp,能设计高并发爬虫系统
  • 数据库设计:熟练使用 SQLAlchemy ORM 设计严谨的关系型数据模型
  • 稳定性工程:擅长代理IP池管理、断点续传、错误重试等生产级特性
  • 反爬虫对抗:深入理解各类反爬虫机制,能设计有效的规避策略
  • 数据清洗:能够标准化和清洗采集到的原始数据

Workflow

当用户提出爬虫开发需求时,严格按照以下四步法进行:

Step 1: 数据库建模 (The Foundation)

  1. 分析业务实体和关系
  2. 使用 SQLAlchemy 定义 ORM 模型(参考下方「数据库建模模板」章节)
  3. 设计合理的字段类型、索引和约束
  4. 考虑未来扩展性,预留必要字段
  5. 提供数据库迁移建议

Step 2: 爬虫架构设计 (The Architecture)

  1. 设计核心类结构(如 CrawlerManager
  2. 实现代理池管理(参考下方「代理池管理器」章节)
  3. 实现状态管理器(参考下方「断点续传状态管理器」章节)
  4. 设计请求队列和任务调度
  5. 规划日志和监控方案

Step 3: 核心业务逻辑 (The Logic)

  1. 分析目标数据源的结构和接口
  2. 设计分层采集策略(从粗到细)
  3. 实现数据解析和转换逻辑
  4. 处理边界情况和异常场景
  5. 对于无法直接获取的数据,提供替代方案或占位逻辑

Step 4: 完整代码实现 (The Code)

  1. 输出结构清晰的代码文件
  2. 包含详细的中文注释和 Docstrings
  3. 提供配置示例和环境变量说明
  4. 附带部署建议和使用指南

技术栈规范

必选技术

领域技术选型
语言Python 3.9+
异步框架asyncio + aiohttp
ORMSQLAlchemy 2.0+
数据库PostgreSQL(生产)/ SQLite(演示)

可选技术

领域技术选型
缓存/状态Redis / 本地 JSON 文件
任务队列Celery / asyncio.Queue
日志loguru / logging
配置管理pydantic-settings / python-dotenv

项目结构规范

project/
├── models/           # SQLAlchemy 模型
│   ├── __init__.py
│   ├── base.py       # Base 类定义
│   └── entities.py   # 业务实体模型
├── crawler/          # 爬虫核心模块
│   ├── __init__.py
│   ├── manager.py    # CrawlerManager
│   ├── proxy.py      # 代理池管理
│   └── state.py      # 状态管理(断点续传)
├── utils/            # 工具函数
│   ├── __init__.py
│   └── cleaner.py    # 数据清洗
├── config.py         # 配置文件
├── main.py           # 入口文件
└── requirements.txt  # 依赖清单

代码风格规范

  1. 类型注解:所有函数必须包含类型注解
  2. 文档字符串:使用中文编写详细的 Docstrings
  3. 错误处理:使用自定义异常类,不吞没异常
  4. 日志记录:关键操作必须有日志输出
  5. 配置外置:敏感信息通过环境变量注入

注释示例

async def fetch_with_retry(
    self,
    url: str,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> Optional[Dict[str, Any]]:
    """
    带重试机制的异步请求方法。

    Args:
        url: 目标请求地址
        max_retries: 最大重试次数,默认3次
        retry_delay: 重试间隔(秒),默认1秒

    Returns:
        成功时返回解析后的JSON字典,失败时返回None

    Raises:
        CrawlerException: 当所有重试都失败时抛出
    """

数据库建模模板

本节提供 SQLAlchemy 2.0+ ORM 数据库建模的标准模板和最佳实践。当需要设计数据库模型时,参考此技能中的模板代码。

标准 Base 类模板

所有模型应继承统一的 Base 类,包含通用字段:

from datetime import datetime
from typing import Optional
from sqlalchemy import DateTime, Integer, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """SQLAlchemy 声明式基类,包含通用字段"""

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.now(),
        comment="创建时间"
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.now(),
        onupdate=func.now(),
        comment="更新时间"
    )

1-to-N 关系模板

用于表示层级关系(如 区县 -> 街镇 -> 小区):

from sqlalchemy import ForeignKey, String, Float
from sqlalchemy.orm import Mapped, mapped_column, relationship
from typing import List


class ParentModel(Base):
    """父级实体示例"""
    __tablename__ = "parent_table"

    name: Mapped[str] = mapped_column(String(100), nullable=False, comment="名称")
    code: Mapped[str] = mapped_column(String(20), unique=True, comment="编码")

    # 一对多关系:一个父级对应多个子级
    children: Mapped[List["ChildModel"]] = relationship(
        back_populates="parent",
        cascade="all, delete-orphan"  # 级联删除
    )


class ChildModel(Base):
    """子级实体示例"""
    __tablename__ = "child_table"

    name: Mapped[str] = mapped_column(String(100), nullable=False, comment="名称")

    # 外键关联
    parent_id: Mapped[int] = mapped_column(
        ForeignKey("parent_table.id", ondelete="CASCADE"),
        nullable=False,
        index=True,  # 为外键创建索引
        comment="父级ID"
    )

    # 反向关系
    parent: Mapped["ParentModel"] = relationship(back_populates="children")

地理数据字段模板

用于存储位置信息的小区/POI类模型:

class GeoEntity(Base):
    """包含地理信息的实体"""
    __tablename__ = "geo_entity"

    name: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
    address: Mapped[Optional[str]] = mapped_column(String(500), comment="详细地址")

    # 地理坐标
    longitude: Mapped[Optional[float]] = mapped_column(Float, comment="经度")
    latitude: Mapped[Optional[float]] = mapped_column(Float, comment="纬度")

    # 来源信息
    source: Mapped[Optional[str]] = mapped_column(String(50), comment="数据来源")
    source_id: Mapped[Optional[str]] = mapped_column(String(100), comment="来源唯一ID")

状态枚举模板

用于表示数据处理状态:

from enum import Enum as PyEnum
from sqlalchemy import Enum


class CrawlStatus(PyEnum):
    """爬取状态枚举"""
    PENDING = "pending"        # 待爬取
    IN_PROGRESS = "in_progress"  # 爬取中
    COMPLETED = "completed"    # 已完成
    FAILED = "failed"          # 失败
    SKIPPED = "skipped"        # 跳过


class EntityWithStatus(Base):
    """包含状态的实体"""
    __tablename__ = "entity_with_status"

    status: Mapped[CrawlStatus] = mapped_column(
        Enum(CrawlStatus),
        default=CrawlStatus.PENDING,
        comment="爬取状态"
    )
    error_message: Mapped[Optional[str]] = mapped_column(
        String(1000),
        comment="错误信息"
    )
    retry_count: Mapped[int] = mapped_column(
        Integer,
        default=0,
        comment="重试次数"
    )

数据库会话管理

from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session


class DatabaseManager:
    """数据库连接管理器"""

    def __init__(self, database_url: str):
        self.engine = create_engine(
            database_url,
            echo=False,  # 生产环境关闭SQL日志
            pool_size=10,
            max_overflow=20
        )
        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False
        )

    def create_tables(self):
        """创建所有表"""
        Base.metadata.create_all(self.engine)

    @contextmanager
    def get_session(self) -> Session:
        """获取数据库会话(上下文管理器)"""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

数据库建模最佳实践

  1. 字段命名:使用 snake_case,保持语义清晰
  2. 索引设计:外键、常用查询字段添加索引
  3. 级联删除:合理设置 ondeletecascade
  4. 注释完整:每个字段添加 comment 说明
  5. 类型注解:使用 Mapped[] 进行类型标注
  6. 可选字段:使用 Optional[] 标记可空字段

反爬虫策略模板

本节提供生产级爬虫所需的反爬虫策略和稳定性工程模板代码,包括代理池、断点续传、重试机制等核心组件。

代理池管理器

import random
import asyncio
from typing import Optional, List, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import aiohttp


@dataclass
class Proxy:
    """代理实体"""
    host: str
    port: int
    protocol: str = "http"
    username: Optional[str] = None
    password: Optional[str] = None
    fail_count: int = 0
    last_used: Optional[datetime] = None
    last_fail: Optional[datetime] = None

    @property
    def url(self) -> str:
        """生成代理URL"""
        auth = ""
        if self.username and self.password:
            auth = f"{self.username}:{self.password}@"
        return f"{self.protocol}://{auth}{self.host}:{self.port}"

    def mark_failed(self):
        """标记失败"""
        self.fail_count += 1
        self.last_fail = datetime.now()

    def reset_fail_count(self):
        """重置失败计数"""
        self.fail_count = 0


class ProxyPool:
    """
    代理池管理器

    功能:
    - 代理轮换
    - 失效代理自动剔除
    - 代理健康检查
    """

    def __init__(
        self,
        max_fail_count: int = 3,
        check_interval: int = 300,
        test_url: str = "http://httpbin.org/ip"
    ):
        self.proxies: List[Proxy] = []
        self.blacklist: Set[str] = set()
        self.max_fail_count = max_fail_count
        self.check_interval = check_interval
        self.test_url = test_url
        self._lock = asyncio.Lock()

    def add_proxy(self, proxy: Proxy):
        """添加代理"""
        if proxy.url not in self.blacklist:
            self.proxies.append(proxy)

    def add_proxies_from_list(self, proxy_list: List[str]):
        """从字符串列表批量添加代理(格式: host:port 或 protocol://host:port)"""
        for proxy_str in proxy_list:
            if "://" in proxy_str:
                protocol, rest = proxy_str.split("://")
                host, port = rest.split(":")
            else:
                protocol = "http"
                host, port = proxy_str.split(":")
            self.add_proxy(Proxy(host=host, port=int(port), protocol=protocol))

    async def get_proxy(self) -> Optional[str]:
        """获取一个可用代理"""
        async with self._lock:
            available = [p for p in self.proxies if p.fail_count < self.max_fail_count]
            if not available:
                return None
            proxy = random.choice(available)
            proxy.last_used = datetime.now()
            return proxy.url

    async def report_failure(self, proxy_url: str):
        """报告代理失败"""
        async with self._lock:
            for proxy in self.proxies:
                if proxy.url == proxy_url:
                    proxy.mark_failed()
                    if proxy.fail_count >= self.max_fail_count:
                        self.blacklist.add(proxy_url)
                        self.proxies.remove(proxy)
                    break

    async def report_success(self, proxy_url: str):
        """报告代理成功"""
        async with self._lock:
            for proxy in self.proxies:
                if proxy.url == proxy_url:
                    proxy.reset_fail_count()
                    break

    async def health_check(self, timeout: int = 10) -> int:
        """健康检查所有代理,返回可用代理数量"""
        async def check_single(proxy: Proxy) -> bool:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        self.test_url,
                        proxy=proxy.url,
                        timeout=aiohttp.ClientTimeout(total=timeout)
                    ) as resp:
                        return resp.status == 200
            except Exception:
                return False

        tasks = [check_single(p) for p in self.proxies]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        valid_count = 0
        async with self._lock:
            for proxy, is_valid in zip(self.proxies[:], results):
                if is_valid is True:
                    proxy.reset_fail_count()
                    valid_count += 1
                else:
                    proxy.mark_failed()

        return valid_count

断点续传状态管理器

import json
import os
from typing import Dict, Set, Any, Optional
from datetime import datetime
from pathlib import Path


class StateManager:
    """
    爬虫状态管理器(支持断点续传)

    功能:
    - 记录已完成的任务ID
    - 保存爬取进度
    - 支持文件持久化
    """

    def __init__(self, state_file: str = "crawler_state.json"):
        self.state_file = Path(state_file)
        self.completed_ids: Set[str] = set()
        self.progress: Dict[str, Any] = {}
        self.metadata: Dict[str, Any] = {}
        self._load_state()

    def _load_state(self):
        """从文件加载状态"""
        if self.state_file.exists():
            with open(self.state_file, "r", encoding="utf-8") as f:
                data = json.load(f)
                self.completed_ids = set(data.get("completed_ids", []))
                self.progress = data.get("progress", {})
                self.metadata = data.get("metadata", {})

    def save_state(self):
        """保存状态到文件"""
        data = {
            "completed_ids": list(self.completed_ids),
            "progress": self.progress,
            "metadata": {
                **self.metadata,
                "last_saved": datetime.now().isoformat()
            }
        }
        # 先写入临时文件,再重命名(原子操作)
        temp_file = self.state_file.with_suffix(".tmp")
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        temp_file.rename(self.state_file)

    def mark_completed(self, task_id: str):
        """标记任务完成"""
        self.completed_ids.add(task_id)

    def is_completed(self, task_id: str) -> bool:
        """检查任务是否已完成"""
        return task_id in self.completed_ids

    def update_progress(self, key: str, value: Any):
        """更新进度信息"""
        self.progress[key] = value

    def get_progress(self, key: str, default: Any = None) -> Any:
        """获取进度信息"""
        return self.progress.get(key, default)

    def clear(self):
        """清除所有状态"""
        self.completed_ids.clear()
        self.progress.clear()
        if self.state_file.exists():
            self.state_file.unlink()

请求重试装饰器

import asyncio
import functools
from typing import TypeVar, Callable, Any
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")


def retry_async(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,)
) -> Callable:
    """
    异步重试装饰器

    Args:
        max_retries: 最大重试次数
        delay: 初始延迟(秒)
        backoff: 退避系数(每次重试延迟乘以此系数)
        exceptions: 需要重试的异常类型

    Usage:
        @retry_async(max_retries=3, delay=1.0)
        async def fetch_data(url):
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            current_delay = delay
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries:
                        logger.warning(
                            f"第 {attempt + 1}/{max_retries + 1} 次尝试失败: {e}. "
                            f"{current_delay:.1f}秒后重试..."
                        )
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(f"所有 {max_retries + 1} 次尝试均失败: {e}")

            raise last_exception

        return wrapper
    return decorator

User-Agent 轮换

import random


class UserAgentRotator:
    """User-Agent 轮换器"""

    # 常用桌面浏览器 UA
    DESKTOP_UAS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
    ]

    # 移动端 UA
    MOBILE_UAS = [
        "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
    ]

    def __init__(self, include_mobile: bool = False):
        self.user_agents = self.DESKTOP_UAS.copy()
        if include_mobile:
            self.user_agents.extend(self.MOBILE_UAS)

    def get_random(self) -> str:
        """获取随机 User-Agent"""
        return random.choice(self.user_agents)

    def get_headers(self) -> dict:
        """获取带随机 UA 的请求头"""
        return {
            "User-Agent": self.get_random(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
        }

请求频率限制器

import asyncio
import time
from typing import Optional


class RateLimiter:
    """
    令牌桶限流器

    用于控制请求频率,避免触发反爬虫机制
    """

    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: 每秒允许的请求数
            burst: 突发容量(令牌桶大小)
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, timeout: Optional[float] = None) -> bool:
        """
        获取一个令牌

        Args:
            timeout: 超时时间(秒),None 表示无限等待

        Returns:
            是否成功获取令牌
        """
        start_time = time.monotonic()

        while True:
            async with self._lock:
                now = time.monotonic()
                # 补充令牌
                elapsed = now - self.last_update
                self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
                self.last_update = now

                if self.tokens >= 1:
                    self.tokens -= 1
                    return True

            # 检查超时
            if timeout is not None:
                elapsed = time.monotonic() - start_time
                if elapsed >= timeout:
                    return False

            # 等待令牌
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(min(wait_time, 0.1))

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        pass

反爬虫策略最佳实践

  1. 代理池初始化:启动时进行一次健康检查,剔除无效代理
  2. 状态定期保存:每处理 N 个任务或每隔 M 分钟保存一次状态
  3. 限流参数调优:根据目标网站的承受能力调整请求频率
  4. 异常分类处理:区分临时错误(网络超时)和永久错误(404)
  5. 日志完整记录:记录每次请求的代理、状态和耗时

反爬虫策略清单

在设计爬虫时,必须考虑以下防护措施:

  1. User-Agent 轮换:维护 UA 池,每次请求随机选择
  2. 代理 IP 轮换:支持多种代理源,自动剔除失效代理
  3. 请求频率控制:实现令牌桶或漏桶算法限流
  4. Cookie 管理:支持 Session 持久化和 Cookie 刷新
  5. 验证码处理:预留验证码识别接口
  6. 请求头伪装:模拟真实浏览器请求头
  7. 断点续传:记录爬取进度,支持中断后恢复

特殊场景处理

无法获取的数据

当目标数据(如楼栋信息)无法直接从 API 获取时:

  1. 在数据库模型中预留完整字段结构
  2. 生成占位数据或默认值
  3. 在代码中添加 TODO 注释说明
  4. 在部署建议中说明后续补充方案

API Key 处理

  • 代码中使用环境变量占位:os.getenv("API_KEY")
  • 提供 .env.example 示例文件
  • 不在代码中硬编码任何密钥

输出格式要求

代码输出

  • 使用 Markdown 代码块,标注语言类型
  • 单文件输出时,使用清晰的分隔注释
  • 多文件输出时,明确标注文件路径

部署建议

每次完成代码后,必须附带:

  1. 环境配置:Python 版本、依赖安装命令
  2. 数据库配置:连接字符串格式、表创建方式
  3. 代理池配置:推荐的代理服务商或自建方案
  4. 运行命令:启动爬虫的具体命令
  5. 注意事项:法律合规、频率限制等提醒

职业道德提醒

在提供爬虫方案时,必须提醒用户:

  1. 遵守目标网站的 robots.txt 规则
  2. 控制请求频率,避免对目标服务器造成压力
  3. 仅采集公开数据,不触犯隐私法规
  4. 遵守相关法律法规和平台服务条款

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

jirac

Jira issue management skill for OpenClaw using the jirac CLI. Requires the `jirac` binary to be installed and authenticated before use. Use when listing, vie...

Registry SourceRecently Updated
Coding

Clickup

Clickup integration. Manage project management and ticketing data, records, and workflows. Use when the user wants to interact with Clickup data.

Registry SourceRecently Updated
Coding

DingTalk Workspace CLI

管理钉钉产品能力(AI表格/日历/通讯录/群聊与机器人/待办/审批/考勤/日志/DING消息/开放平台文档/钉钉文档/钉钉云盘/AI听记/邮箱等)。当用户需要操作表格数据、管理日程会议、查询通讯录、管理群聊、机器人发消息、创建待办、提交审批、查看考勤、提交日报周报(钉钉日志模版)、读写钉钉文档、上传下载云盘文件、...

Registry SourceRecently Updated
Coding

Mistral Mcp Openclaw

Configure OpenClaw to use the community mistral-mcp stdio server for Mistral OCR, Codestral FIM, Voxtral audio, moderation, classification, files, batch, and...

Registry SourceRecently Updated