Apache Beam Runners
Overview
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
Available Runners
| Runner | Location | Description |
|--------|----------|-------------|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
Direct Runner
For local development and testing.
Java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
Python
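import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)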
Command Line
--runner=DirectRunner
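In a Python pipeline script, the runner flag is usually only one of several command-line arguments. The following is a minimal sketch, assuming a script that also accepts its own `--input` argument (a hypothetical name, not taken from this document). It separates the script's arguments from the pipeline flags using only the standard library.

```python
import argparse


def split_args(argv):
    """Separate script-specific arguments from Beam pipeline flags."""
    parser = argparse.ArgumentParser()
    # Hypothetical script argument, used only for illustration.
    parser.add_argument("--input", default="input.txt")
    # Flags argparse does not recognise (such as --runner) are returned untouched.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known_args, pipeline_args = split_args(["--input=data.txt", "--runner=DirectRunner"])
print(known_args.input)
print(pipeline_args)
```

The leftover `pipeline_args` list can then be passed to `PipelineOptions(pipeline_args)`, in the same way as the flag lists used in the Python examples in this document.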
Dataflow Runner
Prerequisites
- GCP project with Dataflow API enabled
- Service account with Dataflow Admin role
- GCS bucket for staging
Java Usage
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
Python Usage
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
])
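When the same Dataflow flags are needed in several scripts, it may help to build the list in one place. This is a rough sketch, not part of Beam: `dataflow_args` is a hypothetical helper, and the `temp` folder name simply mirrors the example values used in this document. Optional experiments, such as the `use_runner_v2` experiment mentioned in this document, are appended as repeated `--experiments` flags.

```python
def dataflow_args(project, region, bucket, experiments=()):
    """Build a Dataflow flag list (hypothetical helper, not a Beam API)."""
    if bucket.startswith("gs://"):
        raise ValueError("pass the bare bucket name, without the gs:// prefix")
    args = [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        # The "temp" folder mirrors the example path used in this document.
        f"--temp_location=gs://{bucket}/temp",
    ]
    # Each experiment becomes its own --experiments flag.
    args += [f"--experiments={name}" for name in experiments]
    return args


flags = dataflow_args("my-project", "us-central1", "my-bucket",
                      experiments=["use_runner_v2"])
print(flags)
```

The resulting list is intended to be passed to `PipelineOptions(flags)`.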
Runner v2
--experiments=use_runner_v2
Custom SDK Container
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
Flink Runner
Embedded Mode
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
Cluster Mode
options.setFlinkMaster("host:port");
Portable Mode (Python)
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK',  # or DOCKER, EXTERNAL
])
Spark Runner
Java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // or spark://host:port
Python (Portable)
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]',
])
Testing with Runners
ValidatesRunner Tests
Tests that validate runner correctness:
Direct Runner
./gradlew :runners:direct-java:validatesRunner
Flink Runner
./gradlew :runners:flink:1.18:validatesRunner
Spark Runner
./gradlew :runners:spark:3:validatesRunner
Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
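The task paths above follow a visible pattern: `:runners:<runner>[:<version>]:validatesRunner`. As an illustration only, the sketch below assembles such a command for a script that runs several suites. The helper names are hypothetical, and the pattern is inferred from the commands listed here rather than from the build files, so a given runner or version combination may not exist.

```python
def validates_runner_task(runner, version=None):
    """Return the Gradle task path for a runner's ValidatesRunner suite."""
    parts = ["runners", runner]
    if version:
        # Flink and Spark tasks in this document include a version segment.
        parts.append(version)
    parts.append("validatesRunner")
    return ":" + ":".join(parts)


def gradle_command(runner, version=None):
    """Return the argument list for invoking the task via the Gradle wrapper."""
    return ["./gradlew", validates_runner_task(runner, version)]


print(" ".join(gradle_command("flink", "1.18")))
print(" ".join(gradle_command("direct-java")))
```

The returned list has the shape expected by `subprocess.run`, which would need to be called from the repository root.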
TestPipeline with Runners
@Rule
public TestPipeline pipeline = TestPipeline.create();

// Set the runner via a system property:
// -DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
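The value of `beamTestPipelineOptions` in the example above is a JSON array of flag strings, which is easy to misquote by hand. A minimal sketch of generating it, assuming a POSIX shell and a hypothetical helper name `beam_test_options_property`:

```python
import json
import shlex


def beam_test_options_property(flags):
    """Format pipeline flags as a -DbeamTestPipelineOptions system property."""
    # The property value is a JSON array of strings.
    value = json.dumps(list(flags))
    # Quote the value so a POSIX shell passes it through as a single argument.
    return "-DbeamTestPipelineOptions=" + shlex.quote(value)


print(beam_test_options_property(["--runner=TestDataflowRunner"]))
```

For the single flag shown, this reproduces the property string from the example above.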
Portable Runners
Concept
- SDK-independent execution via Fn API
- SDK runs in container, communicates via gRPC
Environment Types
- DOCKER: SDK in Docker container
- LOOPBACK: SDK in same process (testing)
- EXTERNAL: SDK at specified address
- PROCESS: SDK in subprocess
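The environment type is passed as a pipeline flag, as in the Flink portable example in this document. The sketch below validates the type against the four values listed above before building the flags. The `environment_flags` helper is hypothetical. The use of `--environment_config` to carry the address or command, and the rule that EXTERNAL and PROCESS require one, are assumptions to check against the options of your SDK version.

```python
# Environment types listed in this document.
ENVIRONMENT_TYPES = {"DOCKER", "LOOPBACK", "EXTERNAL", "PROCESS"}

# Assumption: these types need an extra setting (an address or a command).
NEEDS_CONFIG = {"EXTERNAL", "PROCESS"}


def environment_flags(env_type, config=None):
    """Build portable-runner environment flags (hypothetical helper)."""
    env_type = env_type.upper()
    if env_type not in ENVIRONMENT_TYPES:
        raise ValueError(f"unknown environment type: {env_type}")
    if env_type in NEEDS_CONFIG and not config:
        raise ValueError(f"{env_type} requires a config value")
    flags = [f"--environment_type={env_type}"]
    if config:
        # Assumed flag name for the address, image, or command.
        flags.append(f"--environment_config={config}")
    return flags


print(environment_flags("loopback"))
print(environment_flags("EXTERNAL", "localhost:50000"))
```

The address `localhost:50000` is a placeholder; the returned flags would be appended to the runner flags before constructing `PipelineOptions`.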
Job Server
Start Flink job server:
./gradlew :runners:flink:1.18:job-server:runShadow
Start Spark job server:
./gradlew :runners:spark:3:job-server:runShadow
Runner-Specific Options
Dataflow
| Option | Description |
|--------|-------------|
| --project | GCP project |
| --region | GCP region |
| --tempLocation | GCS temp location |
| --stagingLocation | GCS staging |
| --numWorkers | Initial workers |
| --maxNumWorkers | Max workers |
| --workerMachineType | VM type |
Flink
| Option | Description |
|--------|-------------|
| --flinkMaster | Flink master address |
| --parallelism | Default parallelism |
| --checkpointingInterval | Checkpoint interval |
Spark
| Option | Description |
|--------|-------------|
| --sparkMaster | Spark master URL |
| --sparkConf | Additional Spark config |
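The options above are written in the camelCase form used by the Java examples, while the Python examples in this document use snake_case (`--temp_location`, `--flink_master`). The mapping is not always mechanical: the Java `--sparkMaster` corresponds to `--spark_master_url` in the Python example. The sketch below converts names under that assumption, with an override table for the one exception visible in this document. It is not a complete or authoritative mapping, so each option should be verified against the SDK in use.

```python
import re

# Names that do not follow the mechanical rule; taken from this document.
OVERRIDES = {"sparkMaster": "spark_master_url"}


def to_python_flag(java_flag):
    """Convert a camelCase flag such as --tempLocation=x to snake_case."""
    name, sep, value = java_flag.lstrip("-").partition("=")
    # Insert an underscore before every capital letter except a leading one.
    snake = OVERRIDES.get(name) or re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
    return f"--{snake}{sep}{value}"


print(to_python_flag("--tempLocation=gs://my-bucket/temp"))
print(to_python_flag("--sparkMaster=local[*]"))
```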
Building Runner Artifacts
Dataflow Worker Jar
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
Flink Job Server
./gradlew :runners:flink:1.18:job-server:shadowJar
Spark Job Server
./gradlew :runners:spark:3:job-server:shadowJar
Debugging
Direct Runner
- Enable logging: -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
- Use --targetParallelism=1 for deterministic execution
Dataflow
- Check the Dataflow UI: console.cloud.google.com/dataflow
- Use --experiments=upload_graph when job submission fails because the pipeline graph is too large
- Worker logs are available in Cloud Logging
Portable Runners
- Enable debug logging on job server
- Check SDK harness logs in worker containers