
Runtime Arguments Tutorial

Often with Dataflow you will need to define custom inputs when you run the workflow. This example defines a custom options class that stores the runtime values we need while constructing the pipeline.

Launch this in Cloud Shell

cloudshell launch-tutorial docs/java/tutorials/custom_options.md

Open the project folder and cd into the tutorial folder:

cd tutorials/java/custom-options

Code Walkthrough

AppOptions Class

This class has one runtime value, with a default. When you invoke the pipeline, you will set this value with the argument:

--minimumValue=5

Open AppOptions.java

  • line 9:

    Description of the property

    @Description("Minimum Value") 
    


  • line 10:

    Default value; if no default is provided, the property must be set at runtime.

    @Default.Integer(5) 
    


  • lines 11-12:

    Getter and setter methods for the property.

    Integer getMinimumValue();
    void setMinimumValue(Integer value);
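
Putting these pieces together, the whole file is a standard Beam PipelineOptions extension. A minimal sketch (the package name and imports are assumptions based on the tutorial's paths, not a verbatim copy of the file):

    package com.gcp.cookbook;

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface AppOptions extends PipelineOptions {
      @Description("Minimum Value")
      @Default.Integer(5)
      Integer getMinimumValue();
      void setMinimumValue(Integer value);
    }

Beam generates the implementation at runtime, so the interface needs no backing class; the getter/setter pair is what maps the --minimumValue flag onto the option.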
    


Pipeline Class

This class defines the Apache Beam pipeline DAG and contains the Java main method.

Open StarterPipeline.java

  • lines 38-39:

    Initializes the Pipeline with our custom AppOptions class. Now we can use the getter methods as needed.

    AppOptions appOptions = PipelineOptionsFactory.fromArgs(args).as(com.gcp.cookbook.AppOptions.class);
    Pipeline p = Pipeline.create(appOptions);
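
    A common variant appends withValidation(), which fails fast if a required option (one with no default) is missing; this is a sketch of that variant, not what the tutorial file does:

    AppOptions appOptions = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(AppOptions.class);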
    


  • line 41:

    Populate the pipeline with a list of Integers, so we have some elements to send through the pipeline.

    p.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    


  • line 43:

    Keep only the numbers strictly greater than minimumValue; with the default of 5, the integers 6 through 10 pass.

    .apply(Filter.greaterThan(appOptions.getMinimumValue()))
    


  • lines 46-51:

    Log the integers that passed the filter.

    .apply(ParDo.of(new DoFn<Integer, Integer>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
          LOG.info(c.element().toString());
          //output the value downstream
          c.output(c.element());
      }
    }));
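
Putting the walkthrough together, the body of StarterPipeline plausibly looks like the sketch below. The imports, the SLF4J LOG field, and the final p.run() call are assumptions based on the standard Beam starter layout, not quoted from the file:

    package com.gcp.cookbook;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class StarterPipeline {
      private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

      public static void main(String[] args) {
        // Parse --minimumValue (plus the standard Beam flags) into our options class.
        AppOptions appOptions = PipelineOptionsFactory.fromArgs(args).as(AppOptions.class);
        Pipeline p = Pipeline.create(appOptions);

        p.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
         .apply(Filter.greaterThan(appOptions.getMinimumValue()))
         .apply(ParDo.of(new DoFn<Integer, Integer>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             LOG.info(c.element().toString());
             //output the value downstream
             c.output(c.element());
           }
         }));

        p.run();
      }
    }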
    


Run Pipeline

Run Locally

mvn compile exec:java \
    -Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
    -Dexec.args="--runner=DirectRunner"
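
To override the default minimum, pass the custom option in the same argument string, for example:

mvn compile exec:java \
    -Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
    -Dexec.args="--runner=DirectRunner --minimumValue=7"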

Run with Dataflow

export project=<project id>
mvn compile exec:java \
    -Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
    -Dexec.args="--runner=DataflowRunner --project=${project}"
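
Depending on your project defaults and SDK version, Dataflow may also require a region and a GCS temp location. A typical invocation (the region and bucket values here are placeholders) looks like:

mvn compile exec:java \
    -Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
    -Dexec.args="--runner=DataflowRunner --project=${project} --region=us-central1 --tempLocation=gs://<bucket>/temp"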

Deploy Template

To deploy a template we will need a GCS bucket that Dataflow can stage files in. Create a GCS bucket with a "/staging" subfolder.

export project=<project id>
export bucket=<bucket>
mvn compile exec:java \
    -Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
    -Dexec.args="--runner=DataflowRunner --project=${project} --stagingLocation=gs://${bucket}/staging"
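
To write out a reusable classic template file rather than launching a run, Dataflow also accepts a --templateLocation flag; a sketch, assuming classic templates (the template path is a placeholder):

mvn compile exec:java \
    -Dexec.mainClass=com.gcp.cookbook.StarterPipeline \
    -Dexec.args="--runner=DataflowRunner --project=${project} --stagingLocation=gs://${bucket}/staging --templateLocation=gs://${bucket}/templates/custom-options"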