Building a Customizable Pipeline Using a Dataflow Template

Cloud Dataflow offers a feature called Dataflow templates for building customizable and reusable pipelines. Templates are also a much cleaner way to separate the development, test and production stages of creating and running an Apache Beam data pipeline. The documentation covers templates (classic and flex) in plenty of detail and includes a tutorial on how to build and run them. However, the documented example uses GCS as both source and sink. In most of the pipelines I have helped to build, BigQuery is the sink, and I found that building templates with BigQuery as a sink involves some additional nuances.

In this blog post, I will demonstrate building and running a classic Dataflow template with BigQuery as the sink, covering those nuances.

The pipeline is pretty simple, as shown below.

The code for this pipeline is available on my GitHub page.

At a high level, the pipeline code does the following:

  • Reads a CSV file from Google Cloud Storage. The URI of the input file is a runtime parameter.
  • Creates a dictionary object from each row of the input CSV file (a sketch of such a helper follows this list).
  • Appends the rows to a BigQuery table whose name is a runtime parameter.
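
The real parsing helper lives in the repository linked above; the sketch below only illustrates the idea of the CSV-to-dict step, and the column names in it are assumptions rather than the actual ones.

class DataIngestion:
    """Minimal sketch of the CSV parsing helper; not the real implementation."""

    def parse_method(self, line):
        # Column names are illustrative; the real ones come from the input file.
        columns = ["name", "value", "timestamp"]
        values = line.split(",")
        return dict(zip(columns, values))

# The pipeline further below uses an instance of this helper.
data_ingestion = DataIngestion()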

To support runtime parameters and their values, you will need to use one of the ValueProvider classes. There are three of them: RuntimeValueProvider (the default), StaticValueProvider and NestedValueProvider. Unlike StaticValueProvider, values supplied through RuntimeValueProvider are available only at runtime, not during pipeline construction, and hence they don't influence the execution graph. The Dataflow documentation covers these specifics in detail.
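
For illustration only (none of this is part of the pipeline in this post), here is a hedged sketch of how the other two providers can be constructed:

from apache_beam.options.value_provider import NestedValueProvider, StaticValueProvider

# StaticValueProvider: the value is fixed at pipeline-construction time.
static_table_id = StaticValueProvider(str, "df_test_table")

# NestedValueProvider: derives its value from another provider at runtime
# by applying a translator function.
qualified_table_id = NestedValueProvider(
    static_table_id,
    lambda table_id: "da_batch_pipeline." + table_id,
)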

For this pipeline, RuntimeValueProvider will be used.

In the code snippet below, I am defining two runtime parameters: input_file_path and bq_table_id. Note that bq_table_id is the entire table reference in the format dataset_id.table_id. A standalone dataset_id runtime parameter is not supported and will throw an error when used in the WriteToBigQuery sink I/O call.

from apache_beam.options.pipeline_options import PipelineOptions


class RunTimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # add_value_provider_argument registers each parameter as a
        # ValueProvider so its value can be supplied when the template runs.
        parser.add_value_provider_argument(
            "--input_file_path",
            help="URI of the input file"
        )
        parser.add_value_provider_argument(
            "--bq_table_id",
            help="Table reference in the form dataset_id.table_id"
        )
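
Built-in I/O transforms such as ReadFromText and WriteToBigQuery accept a ValueProvider directly. If your own code needs the raw value, call .get() inside a DoFn while the pipeline is running, never at construction time. The DoFn below is an illustrative sketch, not part of this pipeline:

import apache_beam as beam

class TagWithInputPath(beam.DoFn):
    """Illustrative DoFn showing how a runtime parameter is resolved."""

    def __init__(self, input_file_path):
        # Store the ValueProvider itself; its value is not available yet.
        self.input_file_path = input_file_path

    def process(self, element):
        # Safe here: the pipeline is executing, so the runtime value exists.
        yield (self.input_file_path.get(), element)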

Use the RunTimeOptions class defined above:

user_options = pipeline_options.view_as(RunTimeOptions)

Then, in the pipeline, reference user_options.input_file_path and user_options.bq_table_id to pass the values of these two runtime parameters.

p = (
    pipeline
    | "Read From Input Datafile" >> beam.io.ReadFromText(user_options.input_file_path)
    | "Convert to Dict" >> beam.Map(lambda r: data_ingestion.parse_method(r))
    | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
        table=user_options.bq_table_id,
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
)

pipeline.run()
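
The schema variable passed to WriteToBigQuery above is not shown in the snippet. Among other forms, WriteToBigQuery accepts a comma-separated "field:TYPE" string; a minimal sketch, reusing the illustrative column names from the parse helper earlier (your real schema must match the CSV and the target table):

# Illustrative only: field names and types are assumptions.
schema = "name:STRING,value:STRING,timestamp:STRING"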

Build the template

Build the template using the following command:

python3 -m <template name> \
    --runner DataflowRunner \
    --project <project-name> \
    --staging_location gs://bk_dataflow_template/staging \
    --temp_location gs://bk_dataflow_template/temp \
    --template_location gs://bk_dataflow_template/templates/df_csv_gcs_to_bq \
    --experiment=use_beam_bq_sink

The option --experiment=use_beam_bq_sink is required because Dataflow currently overrides the BigQuery sink with a native version that doesn't support ValueProvider.

Run the pipeline using the template

Run the pipeline using the template by passing the URI of the input CSV file and the table reference in the form dataset_id.table_id. This command schedules the pipeline to run immediately and returns control. You can check the status of the pipeline in the Dataflow Console (or via the CLI).

gcloud dataflow jobs run template-csv-gcs-to-bq-04 \
    --gcs-location gs://bk_dataflow_template/templates/df_csv_gcs_to_bq \
    --region europe-west2 \
    --parameters input_file_path=gs://da_batch_pipeline/risk_test_data,bq_table_id=da_batch_pipeline.df_test_table
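
If you would rather launch the template programmatically (for example from a scheduler), one possible approach is the classic-template launch method of the Dataflow REST API. This sketch assumes the google-api-python-client package and application-default credentials; the job name is illustrative:

from googleapiclient.discovery import build

# Client for the Dataflow REST API (v1b3).
dataflow = build("dataflow", "v1b3")

response = dataflow.projects().locations().templates().launch(
    projectId="<project-name>",
    location="europe-west2",
    gcsPath="gs://bk_dataflow_template/templates/df_csv_gcs_to_bq",
    body={
        "jobName": "template-csv-gcs-to-bq-05",  # illustrative job name
        "parameters": {
            "input_file_path": "gs://da_batch_pipeline/risk_test_data",
            "bq_table_id": "da_batch_pipeline.df_test_table",
        },
    },
).execute()

print(response["job"]["id"])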

I hope this post was useful and saves you some time when building your first Dataflow template with BigQuery as a sink.