Apache Beam Core Concepts
The Beam Model
The Beam model evolved from Google's MapReduce, FlumeJava, and MillWheel projects. It was originally called the "Dataflow Model."
Key Abstractions
Pipeline
A Pipeline encapsulates the entire data processing task, including reading, transforming, and writing data.
// Java
Pipeline p = Pipeline.create(options);
p.apply(...)
 .apply(...)
 .apply(...);
p.run().waitUntilFinish();

# Python
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText('input.txt')
     | 'Transform' >> beam.Map(process)
     | 'Write' >> beam.io.WriteToText('output'))
PCollection
A distributed dataset that can be bounded (batch) or unbounded (streaming).
Properties
- Immutable - Once created, cannot be modified
- Distributed - Elements processed in parallel
- May be bounded or unbounded
- Timestamped - Each element has an event timestamp
- Windowed - Elements assigned to windows
PTransform
A data processing operation that transforms PCollections.
// Java
PCollection<String> output = input.apply(MyTransform.create());

# Python
output = input | 'Name' >> beam.ParDo(MyDoFn())
Core Transforms
ParDo
General-purpose parallel processing.
// Java
input.apply(ParDo.of(new DoFn<String, Integer>() {
  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<Integer> out) {
    out.output(element.length());
  }
}));

# Python
class LengthFn(beam.DoFn):
    def process(self, element):
        yield len(element)

input | beam.ParDo(LengthFn())

# Or simpler:
input | beam.Map(len)
GroupByKey
Groups elements by key.
PCollection<KV<String, Integer>> input = ...;
PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.create());
CoGroupByKey
Joins multiple PCollections by key.
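To illustrate the shape of a CoGroupByKey result, here is a minimal plain-Python sketch that models the join without using Beam itself. The `co_group_by_key` helper and the sample data are hypothetical and exist only for illustration.

```python
from collections import defaultdict


def co_group_by_key(**tagged_inputs):
    """Model of a CoGroupByKey result: key -> {tag: [values]} over all inputs.

    Plain-Python illustration of the output shape, not Beam code.
    """
    tags = list(tagged_inputs)
    # Every key gets an entry for every tag, even if that input had no values.
    grouped = defaultdict(lambda: {tag: [] for tag in tags})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


emails = [("amy", "amy@example.com"), ("carl", "carl@example.com")]
phones = [("amy", "555-0100"), ("amy", "555-0101"), ("bob", "555-0102")]

joined = co_group_by_key(emails=emails, phones=phones)
for key in sorted(joined):
    print(key, joined[key])
```

In the Beam Python SDK, the corresponding call would look like `{'emails': emails_pc, 'phones': phones_pc} | beam.CoGroupByKey()`, producing one `(key, {tag: values})` pair per key; `emails_pc` and `phones_pc` are placeholder names for keyed PCollections.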
Combine
Combines elements (sum, mean, etc.).
// Global combine
input.apply(Combine.globally(Sum.ofIntegers()));

// Per-key combine
input.apply(Combine.perKey(Sum.ofIntegers()));
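A combine is defined by how partial results are accumulated and merged, which is what allows it to run in parallel. The sketch below models a mean using the four method names of the Python SDK's `beam.CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`). It is plain Python so that it runs without Beam; `MeanFn`, `combine_bundle`, and the sample numbers are illustrative assumptions.

```python
class MeanFn:
    """Mean via a (sum, count) accumulator; mirrors beam.CombineFn's method names."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return (total + element, count + 1)

    def merge_accumulators(self, accumulators):
        accumulators = list(accumulators)
        return (sum(a[0] for a in accumulators), sum(a[1] for a in accumulators))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def combine_bundle(fn, elements):
    """Hypothetical helper: fold one bundle of elements into an accumulator."""
    acc = fn.create_accumulator()
    for element in elements:
        acc = fn.add_input(acc, element)
    return acc


fn = MeanFn()
# Two bundles are combined independently, then their accumulators are merged.
partials = [combine_bundle(fn, [1, 2, 3]), combine_bundle(fn, [4, 5])]
mean = fn.extract_output(fn.merge_accumulators(partials))
print(mean)
```

Keeping the accumulator as `(sum, count)` rather than a running mean is what makes the merge step exact regardless of how elements are split across bundles.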
Flatten
Merges multiple PCollections.
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.pCollections());
Partition
Splits a PCollection into multiple PCollections.
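As a rough sketch of how a partition function decides where each element goes, the example below splits scores into bands. The function takes an element and the number of partitions and returns an index. `by_score_band`, the `partition` helper, and the 0-100 score range are assumptions made for illustration; the helper only models the behavior in plain Python.

```python
def by_score_band(score, num_partitions):
    """Return a partition index in [0, num_partitions) for a 0-100 score."""
    return min(score * num_partitions // 100, num_partitions - 1)


def partition(elements, fn, num_partitions):
    """Plain-Python model of Partition: one output list per partition index."""
    outputs = [[] for _ in range(num_partitions)]
    for element in elements:
        outputs[fn(element, num_partitions)].append(element)
    return outputs


low, mid, high = partition([5, 40, 75, 100, 33], by_score_band, 3)
print(low, mid, high)
```

With the Beam Python SDK, a function of this shape would be passed as `scores | beam.Partition(by_score_band, 3)`, where `scores` is a placeholder PCollection name.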
Windowing
Types
- Fixed Windows - Regular, non-overlapping intervals
- Sliding Windows - Overlapping intervals
- Session Windows - Gaps of inactivity define boundaries
- Global Window - All elements in one window (default)
// Java
input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));

# Python
input | beam.WindowInto(beam.window.FixedWindows(300))
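The window types above differ mainly in how an element's timestamp maps to one or more intervals. The following plain-Python sketch models that arithmetic for fixed, sliding, and session windows. It assumes integer-second timestamps and half-open `[start, end)` intervals; the function names are hypothetical, and the session logic is a simplification of the merging a runner performs.

```python
def fixed_window(ts, size):
    """The single fixed window [start, end) containing ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, period):
    """All sliding windows containing ts; windows start every `period` seconds."""
    last_start = ts - ts % period
    # A window starting at s contains ts when s <= ts < s + size.
    return [(s, s + size) for s in range(last_start, ts - size, -period)]


def session_windows(timestamps, gap):
    """Merge per-element windows [ts, ts + gap) that overlap into sessions."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Element arrived before the session timed out: extend it.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            sessions.append((ts, ts + gap))
    return sessions


print(fixed_window(310, 300))
print(sliding_windows(130, size=60, period=30))
print(session_windows([0, 20, 25, 100], gap=30))
```

Note that an element falls into exactly one fixed window but into `size / period` sliding windows, which is why sliding windows multiply the data processed downstream.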
Triggers
Control when results are emitted.
input.apply(Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.standardHours(1))
    .accumulatingFiredPanes());
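The snippet above ends with `accumulatingFiredPanes()`; the alternative is `discardingFiredPanes()`. To show what that choice means for a window that fires several times, here is a small plain-Python model that sums the elements in each pane. The `fire_panes` helper and the arrival batches are hypothetical, and each batch stands for the elements that arrived between two trigger firings.

```python
def fire_panes(batches, accumulating):
    """Return the sum emitted at each firing of a single window.

    Accumulating mode re-emits everything seen so far in the window;
    discarding mode emits only elements that arrived since the last firing.
    """
    panes, seen = [], []
    for batch in batches:
        seen.extend(batch)
        panes.append(sum(seen) if accumulating else sum(batch))
    return panes


arrivals = [[1, 2], [3], [4, 5]]
accumulated = fire_panes(arrivals, accumulating=True)
discarded = fire_panes(arrivals, accumulating=False)
print(accumulated)
print(discarded)
```

In accumulating mode each later pane supersedes the previous one, so a downstream sink should overwrite; in discarding mode the panes are deltas, so a downstream sink should add them up.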
Side Inputs
Additional inputs to ParDo.
PCollectionView<Map<String, String>> sideInput = lookupTable.apply(View.asMap());

mainInput.apply(ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Map<String, String> lookup = c.sideInput(sideInput);
    // Use lookup...
  }
}).withSideInputs(sideInput));
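For comparison, a side input in Python typically appears as an extra argument to the per-element function. The sketch below shows only that function shape in plain Python, with an ordinary dict standing in for the side input; `enrich`, the country codes, and the `'unknown'` fallback are illustrative assumptions.

```python
def enrich(element, lookup):
    """Per-element function; `lookup` plays the role of the side input."""
    return f"{element}:{lookup.get(element, 'unknown')}"


lookup_table = {"us": "United States", "de": "Germany"}
main_input = ["us", "de", "fr"]

# Every element of the main input sees the same lookup table.
enriched = [enrich(code, lookup_table) for code in main_input]
print(enriched)
```

In the Beam Python SDK, a function like this could be applied as `main | beam.Map(enrich, lookup=beam.pvalue.AsDict(lookup_pc))`, where `main` and `lookup_pc` are placeholder PCollection names.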
Pipeline Options
Configure pipeline execution.
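public interface MyOptions extends PipelineOptions {
  @Description("Input file")
  @Validation.Required
  String getInput();
  void setInput(String value);
}

// withValidation() makes the factory enforce @Validation.Required when parsing args
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);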
Schema
Strongly-typed access to structured data.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class User {
  public abstract String getName();
  public abstract int getAge();
}

PCollection<User> users = ...;
PCollection<Row> rows = users.apply(Convert.toRows());
Error Handling
Dead Letter Queue Pattern
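// Anonymous subclasses ({}) preserve the element type for coder inference
TupleTag<String> successTag = new TupleTag<String>() {};
TupleTag<String> failureTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply(ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      // process(...) is a placeholder for the real per-element logic
      c.output(process(c.element()));
    } catch (Exception e) {
      // Route the failing element to the dead letter output instead of failing the bundle
      c.output(failureTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(failureTag)));

// WriteToSuccess and WriteToDeadLetter are placeholder PTransforms
results.get(successTag).apply(new WriteToSuccess());
results.get(failureTag).apply(new WriteToDeadLetter());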
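The same routing idea can be sketched in plain Python: the processing function tags each result, and a router collects the tagged values into separate outputs. This is a model of the pattern rather than Beam code; `parse_or_tag`, `route`, the tag names, and the JSON input are assumptions chosen for illustration.

```python
import json


def parse_or_tag(raw):
    """Yield ('ok', parsed) on success or ('failed', raw) on a parse error."""
    try:
        yield ("ok", json.loads(raw))
    except json.JSONDecodeError:
        # Keep the original input so it can be inspected or replayed later.
        yield ("failed", raw)


def route(records, fn):
    """Collect tagged outputs into one list per tag."""
    outputs = {"ok": [], "failed": []}
    for record in records:
        for tag, value in fn(record):
            outputs[tag].append(value)
    return outputs


results = route(['{"id": 1}', "not json", '{"id": 2}'], parse_or_tag)
print(results["ok"])
print(results["failed"])
```

In the Beam Python SDK, the equivalent mechanism is to yield `beam.pvalue.TaggedOutput('failed', raw)` from a DoFn and to apply the ParDo with `.with_outputs('failed', main='ok')`.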
Cross-Language Pipelines
Use transforms from other SDKs.
# Use Java Kafka connector from Python
from apache_beam.io.kafka import ReadFromKafka

result = pipeline | ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my-topic'])
Best Practices
- Prefer built-in transforms over custom DoFns
- Use schemas for type-safe operations
- Minimize side inputs for performance
- Handle late data explicitly
- Test with DirectRunner before deploying
- Use TestPipeline for unit tests