
spark-python-data-source

Build custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.

When to use

Use when building Spark connectors for external systems that lack native support:

  • External databases, APIs, message queues

  • Custom file formats or protocols

  • Real-time streaming data sources

  • Systems requiring specialized authentication or protocols

Triggers: "build Spark data source", "create Spark connector", "implement Spark reader/writer", "connect Spark to [system]", "streaming data source"

Instructions

You are an experienced Spark developer building custom Python data sources following the PySpark DataSource API. Follow these principles and patterns:

Core Architecture

Each data source follows a flat, single-level inheritance structure:

  • DataSource class - Entry point returning readers/writers

  • Base Reader/Writer classes - Shared logic for options and data processing

  • Batch classes - Inherit from base + DataSourceReader / DataSourceWriter

  • Stream classes - Inherit from base + DataSourceStreamReader / DataSourceStreamWriter

Critical Design Principles

SIMPLE over CLEVER - These are non-negotiable:

✅ REQUIRED:

  • Flat single-level inheritance only

  • Direct implementations, no abstractions

  • Explicit imports, explicit control flow

  • Standard library first, minimal dependencies

  • Simple classes with single responsibilities

❌ FORBIDDEN:

  • Abstract base classes or complex inheritance

  • Factory patterns or dependency injection

  • Decorators for cross-cutting concerns

  • Complex configuration classes

  • Async/await (unless absolutely necessary)

  • Connection pooling or caching (unless critical)

  • Generic "framework" code

  • Premature optimization

Implementation Pattern

from pyspark.sql.datasource import (
    DataSource,
    DataSourceReader,
    DataSourceWriter,
    DataSourceStreamReader,
    DataSourceStreamWriter,
)

1. DataSource class

class YourDataSource(DataSource):
    @classmethod
    def name(cls):
        return "your-format"

    def __init__(self, options):
        self.options = options

    def schema(self):
        return self._infer_or_return_schema()

    def reader(self, schema):
        return YourBatchReader(self.options, schema)

    def streamReader(self, schema):
        return YourStreamReader(self.options, schema)

    def writer(self, schema, overwrite):
        return YourBatchWriter(self.options, schema)

    def streamWriter(self, schema, overwrite):
        return YourStreamWriter(self.options, schema)
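
The schema() method above delegates to a helper that is not shown. In the Python DataSource API, schema() may return either a DDL string or a StructType; a minimal sketch assuming a fixed two-column schema:

def schema(self):
    # Fixed DDL schema string; a StructType from pyspark.sql.types also works.
    return "id INT, value STRING"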

2. Base Writer with shared logic

class YourWriter:
    def __init__(self, options, schema=None):
        # Validate required options
        self.url = options.get("url")
        assert self.url, "url is required"
        self.batch_size = int(options.get("batch_size", "50"))
        self.schema = schema

    def write(self, iterator):
        # Import libraries here for partition execution
        import requests
        from pyspark import TaskContext

        context = TaskContext.get()
        partition_id = context.partitionId()

        msgs = []
        cnt = 0

        for row in iterator:
            cnt += 1
            msgs.append(row.asDict())

            if len(msgs) >= self.batch_size:
                self._send_batch(msgs)
                msgs = []

        if msgs:
            self._send_batch(msgs)

        return SimpleCommitMessage(partition_id=partition_id, count=cnt)

    def _send_batch(self, msgs):
        # Implement send logic
        pass
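
write() above returns a SimpleCommitMessage that is not defined in this snippet; a minimal sketch, assuming the WriterCommitMessage base class from pyspark.sql.datasource:

from dataclasses import dataclass

from pyspark.sql.datasource import WriterCommitMessage


@dataclass
class SimpleCommitMessage(WriterCommitMessage):
    # Per-partition bookkeeping returned from write() and passed to commit()/abort().
    partition_id: int
    count: int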

3. Batch Writer

class YourBatchWriter(YourWriter, DataSourceWriter):
    pass

4. Stream Writer

class YourStreamWriter(YourWriter, DataSourceStreamWriter):
    def commit(self, messages, batchId):
        pass

    def abort(self, messages, batchId):
        pass

5. Base Reader with partitioning

class YourReader:
    def __init__(self, options, schema):
        self.url = options.get("url")
        assert self.url, "url is required"
        self.schema = schema

    def partitions(self):
        # Return a list of partitions for parallel reading
        # (start/end boundaries come from options or your splitting logic)
        return [YourPartition(0, start, end)]

    def read(self, partition):
        # Import here for executor execution
        import requests

        response = requests.get(f"{self.url}?start={partition.start}")
        for item in response.json():
            yield tuple(item.values())
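
partitions() and read() above rely on a YourPartition class that is not defined in this snippet; a minimal sketch, assuming the InputPartition base class from pyspark.sql.datasource (partition objects must be picklable so they can be shipped to executors):

from pyspark.sql.datasource import InputPartition


class YourPartition(InputPartition):
    def __init__(self, index, start, end):
        # Simple value holder describing one slice of the source to read.
        self.index = index
        self.start = start
        self.end = end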

6. Batch Reader

class YourBatchReader(YourReader, DataSourceReader):
    pass

7. Stream Reader

class YourStreamReader(YourReader, DataSourceStreamReader):
    def initialOffset(self):
        return {"offset": "0"}

    def latestOffset(self):
        return {"offset": str(self._get_latest())}

    def partitions(self, start, end):
        return [YourPartition(0, start["offset"], end["offset"])]

    def commit(self, end):
        pass

Project Setup

Create project

poetry new your-datasource
cd your-datasource
poetry add pyspark pytest pytest-spark

Development commands - CRITICAL: Always use 'poetry run'

poetry run pytest              # Run tests
poetry run ruff check src/     # Lint
poetry run ruff format src/    # Format
poetry build                   # Build wheel

Registration and Usage

Register

from your_package import YourDataSource

spark.dataSource.register(YourDataSource)

Batch read

df = spark.read.format("your-format").option("url", "...").load()

Batch write

df.write.format("your-format").option("url", "...").save()

Streaming read

df = spark.readStream.format("your-format").option("url", "...").load()

Streaming write

df.writeStream.format("your-format").option("url", "...").start()
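
A streaming write also typically needs a checkpoint location so Spark can track progress across batches; a minimal sketch using a local path (use a durable location in production):

query = (
    df.writeStream
    .format("your-format")
    .option("url", "...")
    .option("checkpointLocation", "/tmp/your-format-checkpoint")
    .start()
)
query.awaitTermination()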

Key Implementation Decisions

Partitioning Strategy: Choose based on data source characteristics (a time-based sketch follows this list)

  • Time-based: For APIs with temporal data (see partitioning-patterns.md)

  • Token-range: For distributed databases (see partitioning-patterns.md)

  • ID-range: For paginated APIs
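
A minimal time-based sketch; the option names start_ts, end_ts, and slice_seconds are illustrative, not taken from partitioning-patterns.md:

def partitions(self):
    # Split the requested time window into fixed-size slices, one partition each.
    start = int(self.options["start_ts"])
    end = int(self.options["end_ts"])
    step = int(self.options.get("slice_seconds", "3600"))
    return [
        YourPartition(i, lo, min(lo + step, end))
        for i, lo in enumerate(range(start, end, step))
    ]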

Authentication: Support multiple methods in priority order (a resolution sketch follows this list)

  • Databricks Unity Catalog credentials

  • Cloud default credentials (managed identity)

  • Explicit credentials (service principal, API key, username/password)

  • See authentication-patterns.md
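
A minimal sketch of resolving credentials in that priority order; the option names and helper are illustrative, not from authentication-patterns.md:

def _resolve_credential(self, options):
    # 1. Credential injected via Databricks Unity Catalog, if the caller passed one through
    if options.get("databricks_credential"):
        return options["databricks_credential"]
    # 2. Cloud default credentials (e.g. managed identity) resolved via your cloud SDK
    # 3. Explicit credentials supplied as options
    if options.get("api_key"):
        return options["api_key"]
    if options.get("username") and options.get("password"):
        return (options["username"], options["password"])
    raise ValueError("No credential found: set api_key, username/password, or a managed credential")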

Type Conversion: Map between Spark and external types (a normalization sketch follows this list)

  • Handle nulls, timestamps, UUIDs, collections

  • See type-conversion.md
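
A minimal sketch of normalizing external driver values into types Spark can serialize; the exact mapping depends on your system and is covered in type-conversion.md:

from datetime import datetime, timezone
from uuid import UUID


def to_spark_value(value):
    # Normalize driver-specific types before yielding rows to Spark.
    if value is None:
        return None
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime) and value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    if isinstance(value, (set, frozenset)):
        return list(value)
    return value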

Streaming Offsets: Design for exactly-once semantics (an offset-class sketch follows this list)

  • JSON-serializable offset class

  • Non-overlapping partition boundaries

  • See streaming-patterns.md
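
The stream reader above uses plain dicts as offsets; a minimal sketch of wrapping the same idea in a JSON-serializable offset class (the field name is illustrative):

import json
from dataclasses import asdict, dataclass


@dataclass
class YourOffset:
    # Position of the last record already committed (exclusive upper bound).
    position: int

    def json(self):
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, text):
        return cls(**json.loads(text))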

Error Handling: Implement retries and resilience (a backoff sketch follows this list)

  • Exponential backoff for retryable errors

  • Circuit breakers for cascading failures

  • See error-handling.md
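
A minimal exponential-backoff sketch for the send path; the retry policy values are illustrative:

import random
import time


def send_with_retry(send_fn, payload, max_attempts=5):
    # Retry transient failures with exponential backoff plus jitter.
    for attempt in range(max_attempts):
        try:
            return send_fn(payload)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(2 ** attempt + random.random())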

Testing Approach

import pytest
from unittest.mock import Mock, patch

from your_package import YourDataSource


@pytest.fixture
def spark():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    # Register the data source so the custom format can be resolved in tests
    spark.dataSource.register(YourDataSource)
    return spark


def test_data_source_name():
    assert YourDataSource.name() == "your-format"


def test_writer_sends_data(spark):
    with patch("requests.post") as mock_post:
        mock_post.return_value = Mock(status_code=200)

        df = spark.createDataFrame([(1, "test")], ["id", "value"])
        df.write.format("your-format").option("url", "http://api").save()

        assert mock_post.called

Code Review Checklist

Before implementing, ask:

  • Is this the simplest way to solve this problem?

  • Would a new developer understand this immediately?

  • Am I adding abstraction for real needs vs hypothetical flexibility?

  • Can I solve this with standard library?

  • Does this follow the established flat pattern?

Common Mistakes to Avoid

  • Creating abstract base classes for "reusability"

  • Adding configuration frameworks or dependency injection

  • Premature optimization before measuring performance

  • Complex error handling hierarchies

  • Importing heavy libraries at module level (import inside methods instead)

  • Using the python command directly (always use poetry run)

Reference Implementations

Study these for real-world patterns:

  • cyber-spark-data-connectors - Sentinel, Splunk, REST

  • spark-cassandra-data-source - Token-range partitioning

  • pyspark-hubspot - REST API pagination

  • pyspark-mqtt - Streaming with TLS

Usage

  • Create a Spark data source for reading from MongoDB with sharding support

  • Build a streaming connector for RabbitMQ with at-least-once delivery

  • Implement a batch writer for Snowflake with staged uploads

  • Write a data source for a REST API with OAuth2 authentication and pagination

Related

  • databricks-testing: Test data sources on Databricks clusters

  • databricks-spark-declarative-pipelines: Use custom sources in DLT pipelines

  • python-dev: Python development best practices

References

  • partitioning-patterns.md - Parallel reading strategies

  • authentication-patterns.md - Multi-method auth implementations

  • type-conversion.md - Bidirectional type mapping

  • streaming-patterns.md - Offset management and watermarking

  • error-handling.md - Retries, circuit breakers, resilience

  • testing-patterns.md - Unit and integration testing

  • production-patterns.md - Observability, security, validation

  • Official Databricks Documentation

  • Apache Spark Python DataSource Tutorial

  • awesome-python-datasources - Directory of available implementations
