Python Development in Apache Beam
Project Structure
Key Directories
- sdks/python/: Python SDK root
  - apache_beam/: Main Beam package
    - transforms/: Core transforms (ParDo, GroupByKey, etc.)
    - io/: I/O connectors
    - ml/: Beam ML code (RunInference, etc.)
    - runners/: Runner implementations and wrappers
    - runners/worker/: SDK worker harness
  - container/: Docker container configuration
  - test-suites/: Test configurations
  - scripts/: Utility scripts
Configuration Files
- setup.py: Package configuration
- pyproject.toml: Build configuration
- tox.ini: Test automation
- pytest.ini: Pytest configuration
- .pylintrc: Linting rules
- .isort.cfg: Import sorting
- mypy.ini: Type checking
Environment Setup
Using pyenv (Recommended)
Install Python
pyenv install 3.X  # use a supported version listed in gradle.properties
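To check which versions are supported without reading the file by hand, a small parser can help. This is a sketch that assumes gradle.properties lists the versions under a key named python_versions as a comma-separated value; the key name and both helper functions are assumptions to verify against your checkout.

```python
from pathlib import Path


def parse_python_versions(properties_text, key="python_versions"):
    """Return the comma-separated values stored under `key`, or [].

    Assumption: the file contains a line such as
    `python_versions=3.9,3.10,3.11`; the key name is not guaranteed.
    """
    for raw in properties_text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments ('#' or '!' in .properties files).
        if not line or line.startswith(("#", "!")):
            continue
        name, sep, value = line.partition("=")
        if sep and name.strip() == key:
            return [v.strip() for v in value.split(",") if v.strip()]
    return []


def supported_python_versions(path="gradle.properties", key="python_versions"):
    """Read gradle.properties (run from the repository root) and parse it."""
    return parse_python_versions(Path(path).read_text(), key)
```

Calling supported_python_versions() from the repository root returns the list, from which you can pick the 3.X to install.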
Create virtual environment
pyenv virtualenv 3.X beam-dev
pyenv activate beam-dev
Install in Editable Mode
cd sdks/python
pip install -e '.[gcp,test]'  # quotes stop shells such as zsh from expanding the brackets
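To confirm that the editable install took effect, check where Python resolves apache_beam from: it should point into your sdks/python checkout rather than into a site-packages copy. The helper below is a hypothetical convenience built on importlib.util.find_spec, which locates a top-level module without importing it.

```python
import importlib.util


def module_origin(name):
    """Return the file a top-level module would be loaded from, or None.

    find_spec() locates the module without executing it, so this is safe to
    call even for packages with heavy import-time work.
    """
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


# After an editable install, expect a path ending in
# sdks/python/apache_beam/__init__.py inside your clone (None if not installed).
print(module_origin("apache_beam"))
```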
Enable Pre-commit Hooks
pip install pre-commit
pre-commit install
To disable
pre-commit uninstall
Running Tests
Unit Tests (filename: *_test.py)
Run all tests in a file
pytest -v apache_beam/io/textio_test.py
Run tests in a class
pytest -v apache_beam/io/textio_test.py::TextSourceTest
Run a specific test
pytest -v apache_beam/io/textio_test.py::TextSourceTest::test_progress
Integration Tests (filename: *_it_test.py)
On the Direct Runner
python -m pytest -o log_cli=True -o log_level=INFO \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDirectRunner'
On the Dataflow Runner
First build the SDK tarball (run from sdks/python)
pip install build && python -m build --sdist
Then run the integration test
python -m pytest -o log_cli=True -o log_level=INFO \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project>
  --temp_location=gs://<bucket>/tmp
  --sdk_location=dist/apache-beam-2.XX.0.dev0.tar.gz
  --region=us-central1'
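The value of --test-pipeline-options is a single string that TestPipeline splits into individual flags using shell-like rules (shlex.split), so line breaks inside the quotes are harmless. The snippet below reproduces that split on the Dataflow example, which is a cheap way to sanity-check a long option string before launching a job; <project> and <bucket> remain placeholders.

```python
import shlex

# Same option string as in the Dataflow command above, line breaks included.
TEST_PIPELINE_OPTIONS = """--runner=TestDataflowRunner --project=<project>
  --temp_location=gs://<bucket>/tmp
  --sdk_location=dist/apache-beam-2.XX.0.dev0.tar.gz
  --region=us-central1"""

# shlex.split treats newlines and indentation as ordinary whitespace.
flags = shlex.split(TEST_PIPELINE_OPTIONS)
for flag in flags:
    print(flag)
```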
Building the Python SDK
Build Source Distribution
cd sdks/python
pip install build && python -m build --sdist
Output: sdks/python/dist/apache-beam-2.XX.0.dev0.tar.gz
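Because the version in the file name changes over time, scripts that pass --sdk_location can look the tarball up instead of hard-coding it. A minimal sketch, assuming it runs from sdks/python: it accepts both apache-beam-*.tar.gz and apache_beam-*.tar.gz, since newer build tooling may normalize the project name to use an underscore. The function name is hypothetical.

```python
from pathlib import Path


def find_sdist(dist_dir="dist"):
    """Return the most recently modified Beam source tarball in dist_dir."""
    candidates = [
        path
        for pattern in ("apache-beam-*.tar.gz", "apache_beam-*.tar.gz")
        for path in Path(dist_dir).glob(pattern)
    ]
    if not candidates:
        raise FileNotFoundError(f"no Beam sdist found in {dist_dir!r}")
    # Pick the newest build by modification time.
    return max(candidates, key=lambda path: path.stat().st_mtime)
```

The result can then be passed on as f"--sdk_location={find_sdist()}".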
Build Wheel (faster installation)
./gradlew :sdks:python:bdistPy311linux # For Python 3.11 on Linux
Build and Push SDK Container Image
./gradlew :sdks:python:container:py311:docker \
  -Pdocker-repository-root=gcr.io/your-project/your-name \
  -Pdocker-tag=custom \
  -Ppush-containers
The image is pushed to: gcr.io/your-project/your-name/beam_python3.11_sdk:custom
To use this image, pass it to your pipeline with --sdk_container_image.
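The Gradle task, the resulting image URI, and the pipeline flag all follow from the Python version, the repository root, and the tag. The hypothetical helper below derives them from the Python 3.11 example; that other versions follow the same naming scheme (for example py312 and beam_python3.12_sdk) is an assumption worth checking against sdks/python/container/ in your checkout.

```python
def container_build_info(python_version, repository_root, tag):
    """Derive the Gradle command, image URI and pipeline flag for an image.

    Generalizes the Python 3.11 example (task ...:py311:docker, image
    beam_python3.11_sdk); other versions are assumed to follow suit.
    """
    major, minor = python_version.split(".")[:2]
    task = f":sdks:python:container:py{major}{minor}:docker"
    image = f"{repository_root}/beam_python{major}.{minor}_sdk:{tag}"
    gradle_command = [
        "./gradlew",
        task,
        f"-Pdocker-repository-root={repository_root}",
        f"-Pdocker-tag={tag}",
        "-Ppush-containers",
    ]
    return {
        "gradle_command": gradle_command,
        "image": image,
        "pipeline_flag": f"--sdk_container_image={image}",
    }
```

For example, " ".join(container_build_info("3.11", "gcr.io/your-project/your-name", "custom")["gradle_command"]) rebuilds the command shown above.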
Running Pipelines with Modified Code
Install the modified SDK
pip install '/path/to/apache-beam.tar.gz[gcp]'
Run the pipeline
python my_pipeline.py \
  --runner=DataflowRunner \
  --sdk_location=/path/to/apache-beam.tar.gz \
  --project=my_project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp
Common Issues
NameError when running a DoFn
By default, global imports, functions, and variables defined in the main pipeline module are not serialized, so a DoFn that uses them can fail with NameError on the workers. To include them, pass:
--save_main_session
Specifying Additional Dependencies
Use --requirements_file=requirements.txt, or build a custom container image and pass it with --sdk_container_image.
Test Markers
- @pytest.mark.it_postcommit: include the test in the PostCommit test suite
Gradle Commands for Python
Run WordCount
./gradlew :sdks:python:wordCount
Check environment
./gradlew :checkSetup
Code Quality Tools
Linting
pylint apache_beam/
Type checking
mypy apache_beam/
Formatting (via yapf)
yapf -i apache_beam/file.py
Import sorting
isort apache_beam/file.py
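To run all four tools on the files you changed in one go, a small wrapper like the hypothetical one below can help. It assumes the tools are installed in the active environment and that it runs from sdks/python, so the configuration files listed earlier (.pylintrc, .isort.cfg, mypy.ini) are picked up. Note that isort and yapf -i rewrite files in place.

```python
import subprocess


def quality_commands(paths):
    """Return the formatting and checking commands for the given files.

    isort and yapf -i rewrite the files in place; pylint and mypy only report.
    """
    paths = list(paths)
    return [
        ["isort", *paths],
        ["yapf", "-i", *paths],
        ["pylint", *paths],
        ["mypy", *paths],
    ]


def run_quality_checks(paths):
    """Run each command in order and return those that exited non-zero.

    Raises FileNotFoundError if one of the tools is not installed.
    """
    failed = []
    for cmd in quality_commands(paths):
        if subprocess.run(cmd).returncode != 0:
            failed.append(cmd)
    return failed
```

For example, run_quality_checks(["apache_beam/io/textio.py"]) returns an empty list when every tool succeeds.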