Runtime Arguments Tutorial
Often with Dataflow you will need to pass custom inputs to a workflow at run time. This example defines a custom options class that stores the runtime variables we need while building our pipeline.
Launch this in Cloud Shell
cloudshell launch-tutorial docs/java/tutorials/custom_options.md
Open the project folder by changing into the tutorial directory:
cd tutorials/java/custom-options
Code Walkthrough
AppOptions Class
This class has one runtime value, with a default. When you invoke the pipeline, you set it with this argument:
--minimumValue=5
- Line 9: Description of the property.
  @Description("Minimum Value")
- Line 10: Default value; if no default is set, the property is required.
  @Default.Integer(5)
- Lines 11-12: Getter and setter methods for the property.
  Integer getMinimumValue();
  void setMinimumValue(Integer value);
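The annotations above control how PipelineOptionsFactory resolves the flag. As a plain-Java illustration of the semantics only (this is not Beam's implementation): --minimumValue=7 overrides the default, and omitting the flag falls back to the @Default.Integer(5) value.

```java
import java.util.Arrays;

public class OptionParsingDemo {
    // Illustration only: mimics how a flag like --minimumValue=7 overrides
    // the @Default.Integer(5) fallback. Beam's PipelineOptionsFactory does
    // this through annotated proxies; this sketch just shows the effect.
    static int minimumValue(String[] args) {
        return Arrays.stream(args)
                .filter(a -> a.startsWith("--minimumValue="))
                .map(a -> Integer.parseInt(a.substring("--minimumValue=".length())))
                .findFirst()
                .orElse(5); // the @Default.Integer(5) fallback
    }

    public static void main(String[] args) {
        System.out.println(minimumValue(new String[]{"--minimumValue=7"})); // 7
        System.out.println(minimumValue(new String[]{}));                   // 5
    }
}
```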
Pipeline Class
This class defines the Apache Beam pipeline DAG; it also contains the Java main method.
- Lines 38-39: Initialize the pipeline with our custom AppOptions class. Now we can use the getter methods as needed.
  AppOptions appOptions = PipelineOptionsFactory.fromArgs(args).as(com.gcp.cookbook.AppOptions.class);
  Pipeline p = Pipeline.create(appOptions);
- Line 41: Populate the pipeline with a list of integers, so we have some elements to send through the pipeline.
  p.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
- Line 43: Filter out all numbers less than or equal to minimumValue.
  .apply(Filter.greaterThan(appOptions.getMinimumValue()))
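Filter.greaterThan keeps only elements strictly greater than its argument. The keep/drop rule can be sketched in plain Java (illustrative only; Beam applies it per element across the PCollection):

```java
import java.util.List;
import java.util.stream.Collectors;

public class GreaterThanDemo {
    // Same rule as Filter.greaterThan(min): retain elements strictly
    // greater than min; min itself is dropped.
    static List<Integer> greaterThan(List<Integer> in, int min) {
        return in.stream().filter(x -> x > min).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(greaterThan(
                List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 5)); // [6, 7, 8, 9, 10]
    }
}
```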
- Lines 46-51: Log the integers that passed the filter.
  .apply(ParDo.of(new DoFn<Integer, Integer>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      LOG.info(c.element().toString());
      // return the value
      c.output(c.element());
    }
  }));
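The DoFn logs each element and then re-emits it unchanged through c.output, so downstream steps still see every element. That pass-through shape can be sketched in plain Java (illustration only):

```java
import java.util.List;
import java.util.stream.Collectors;

public class LogPassThroughDemo {
    // Analogue of the logging DoFn: print each element (LOG.info in the
    // pipeline), then pass it through unchanged.
    static List<Integer> logAndPass(List<Integer> in) {
        return in.stream()
                .peek(x -> System.out.println(x)) // side effect: logging
                .collect(Collectors.toList());    // elements pass through
    }

    public static void main(String[] args) {
        System.out.println(logAndPass(List.of(6, 7, 8, 9, 10))); // [6, 7, 8, 9, 10]
    }
}
```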
Run Pipeline
Run Locally
mvn compile exec:java \
-Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
-Dexec.args="--runner=DirectRunner"
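To exercise the custom option from the walkthrough, the flag can be appended to exec.args (a sketch; --minimumValue=7 is just an example value, and omitting the flag uses the default of 5):

```shell
mvn compile exec:java \
-Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
-Dexec.args="--runner=DirectRunner --minimumValue=7"
```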
Run with Dataflow
export project=<project id>
mvn compile exec:java \
-Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
-Dexec.args="--runner=DataflowRunner --project=${project}"
Deploy Template
To deploy a template we will need a GCS bucket where Dataflow can stage files. Create a GCS bucket with a "/staging" subfolder.
export project=<project id>
export bucket=<bucket>
mvn compile exec:java \
-Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
-Dexec.args="--runner=DataflowRunner --project=${project} --stagingLocation=gs://${bucket}/staging"