Dataflow Pipeline to Ingest into Multiple BigQuery Tables using Dynamic Destination and Side Input

Ingesting data from transactional systems into data warehouses or data lakes for analysis is a commonly used pattern. On Google Cloud, we have seen a variety of data pipelines that bring data from transactional systems/databases into Google Cloud Storage and/or BigQuery. Some of these patterns are:

  • Batch pipeline, where data is pushed from transactional systems to Google Cloud Storage and then loaded from Storage into BigQuery.
  • Streaming pipeline, where data is pushed to PubSub or Kafka and then ingested into BigQuery using Dataflow.
  • Partner solutions such as Striim, which replicate data directly from transactional databases to BigQuery.

Out of the above-mentioned patterns, real-time streaming to BigQuery using PubSub and Dataflow is becoming increasingly popular. To make it easier for users, Google Cloud provides a set of pre-built templates to ingest data from PubSub to BigQuery. However, pipelines created from these templates can load data only into a single BigQuery table, so they are not suitable for scenarios where you need to bring hundreds of tables into BigQuery.

Dataflow has a feature called Dynamic Destination, which allows every element in a PCollection to be inspected and routed to a destination determined by an identifier in the data. In this blog post, I will delve into the details of how to build a single pipeline that ingests data from a number of source tables into a number of BigQuery tables via a single PubSub topic, using Dynamic Destination and Side Input.

However, note that Dynamic Destination is not the only way to solve this problem. The other options are:

  • One pipeline per table, using a Google-provided template such as PubSub Topic to BigQuery. This is the easiest low-code option and is suitable when you are bringing data from only a handful of tables.
  • Multiple independent pipelines, each mapping a topic to a table. The code for this option is simpler; in fact, you can take one of the Google-provided template codebases and modify it to loop over every source topic and its corresponding sink table. It works fine even for a large number of tables, although the Job Graph will not look pretty! You will find a code snippet and some further details on this approach later in this post.

Solution using Dynamic Destination and Side Input

The dynamic destination feature in Apache Beam allows you to write elements of a PCollection to different BigQuery tables with different schemas. In this example, I am using a Side Input to provide the schema of each table to the main pipeline.

An example code is available at my Github page, and a sample schema configuration file for the Side Input is here. Note that the table name in the schema configuration is in the format <projectid>:<datasetid>.<tablename>.

A high-level description of the solution is:

  • Source systems publish data to a PubSub topic.
  • A Side Input PCollection containing the schema definitions of the target tables is created from a file stored on Cloud Storage.
  • The main PCollection is created from the data arriving on the PubSub topic. The topic receives data in JSON format, which is converted to a Python dictionary.
  • Each element is inspected to determine the destination table. Note that I have used a “tablename” tag in the data stream, but it can be any identifier as long as a destination table can be determined from it.
  • For the given table, the schema is provided to the pipeline from the Side Input PCollection.
  • The table is created if it doesn’t exist.
  • Rows are appended to the table.

This approach is completely configuration driven. In fact, using the Slowly Changing Side Input pattern, you can extend it to create a pipeline that handles the addition of new tables without needing to restart the pipeline. Another advantage is that input from all tables is ingested into a single topic, which simplifies the configuration.
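For reference, the schema side input used in the main snippet below (schema_coll) can be built as a dictionary-style side input keyed by table name. The following is a minimal sketch of one way to do it; the function name load_schema_map, the variable schema_config_uri and the exact layout of the configuration file are illustrative assumptions and may differ from the code in my repository.

import json

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


def load_schema_map(path):
    # Read the schema configuration file from GCS and emit (tablename, schema)
    # pairs. Assumes entries of the form
    # {"tablename": "<projectid>:<datasetid>.<tablename>",
    #  "schema": "id:INTEGER,name:STRING,..."}  (illustrative format).
    with FileSystems.open(path) as f:
        config = json.loads(f.read().decode('utf-8'))
    for entry in config['tables']:
        yield entry['tablename'], entry['schema']


# Dictionary-style side input: maps a table name to its schema, so that
# WriteToBigQuery can look up the schema per destination table.
schema_coll = beam.pvalue.AsDict(
    pipeline
    | "Create schema config path" >> beam.Create([schema_config_uri])
    | "Load schema map" >> beam.FlatMap(load_schema_map)
)

The AsDict wrapper turns the (tablename, schema) pairs into a Python dictionary that is made available to the workers, which is what allows the schema callable in WriteToBigQuery to do a simple lookup per table.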

The key here is to identify the destination table from each element in the main PCollection and then get the schema of that table from the Side Input, as demonstrated in the following code snippet:

(
    pipeline
    | "Read Data From Input Topic" >> beam.io.ReadFromPubSub(topic=data_topic)
    # Convert each PubSub message (JSON bytes) into a Python dictionary.
    | "Get Table data from input row" >> beam.Map(lambda r: data_ingestion.getData(r))
    # Route each row to the table named in its 'tablename' field and look up
    # the schema for that table in the side input dictionary.
    | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
        table=lambda row: row['tablename'],
        schema=lambda table, schema_coll: schema_coll[table],
        schema_side_inputs=(schema_coll,),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
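The getData helper used above is not shown in the snippet; a minimal sketch of what it might look like is below (the actual implementation in my repository may differ). Its job is simply to turn the raw PubSub message bytes into a Python dictionary that carries the “tablename” identifier.

import json


def getData(message):
    # Each PubSub message is a JSON payload; decode it into a dictionary.
    # The dictionary is expected to contain a 'tablename' key in the
    # <projectid>:<datasetid>.<tablename> format used by WriteToBigQuery.
    return json.loads(message.decode('utf-8'))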

You might expect performance to be worse than with separate pipelines, since there is only one PubSub topic to read from and a single sink IO. Based on limited testing using an Airflow-based test harness to ingest between 500 and 1000 records/second across 8 different tables for over a day, I didn’t see any performance issues. The mean system latency was about 4 seconds with 10 worker nodes of type n1-standard-2. Of course, performance and latency are use-case specific, but in general the use of dynamic destination and side input doesn’t appear to cause any performance issues.

The Job Graph for this pipeline is shown below.

Solution using Multiple Parallel Pipelines inside one Job

As mentioned above, another option is to use multiple parallel pipelines in one Dataflow job, where data from each source table is ingested into a separate PubSub topic.

A code snippet for this pattern is shown below. In this example, the source data is read from a file on Cloud Storage instead of PubSub; for a streaming pipeline, simply switch the source from Cloud Storage to PubSub. The key point to note is that a config file holds a list of tables, the URIs (GCS locations) of the source data, and the schemas for the tables. The code iterates over this list, reads data from the GCS URIs and loads it into the corresponding BigQuery tables.

# Read configuration from a file on GCS. The configuration contains the source file name, the target table name and the target table schema URI in JSON format. For example:

{
    "tablelist" : [
      {"sourcefile" : "gs://bk-dataflow-experiment/data/cust_data.json", "targettable" : "da_belgium_dataset.cust_data", "schemauri" : "gs://bk-dataflow-experiment/schema/schema_cust_data.txt" },
      {"sourcefile" : "gs://bk-dataflow-experiment/data/sales_data.json", "targettable" : "da_belgium_dataset.sales_data", "schemauri" : "gs://bk-dataflow-experiment/schema/schema_sales_data.txt" }
    ]
}

…

cdata = readconfigfile(config_file_name)


# Iterate over all source files and tables in the configuration, building one
# pipeline 'leg' per table.
with beam.Pipeline(options=pipeline_options) as pipeline:
    for p in cdata['tablelist']:
        i_file_path = p['sourcefile']
        schemauri = p['schemauri']
        schema = getschema(schemauri)
        dest_table_id = p['targettable']

        (
            pipeline
            | "Read From Input Datafile" + dest_table_id >> beam.io.ReadFromText(i_file_path)
            | "Convert to Dict" + dest_table_id >> beam.Map(lambda r: data_ingestion.parse_method(r))
            | "Write to BigQuery Table" + dest_table_id >> beam.io.WriteToBigQuery(
                '{0}:{1}'.format(project_name, dest_table_id),
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
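For completeness, the helpers readconfigfile and getschema referenced above could be implemented roughly as shown below. This is a sketch under the assumption that the configuration is JSON and that each schema file holds a BigQuery schema string of the form 'field1:TYPE,field2:TYPE,…'; the actual code in my repository may differ.

import json

from apache_beam.io.filesystems import FileSystems


def readconfigfile(config_file_name):
    # Read the JSON configuration file (e.g. from GCS) into a dictionary.
    with FileSystems.open(config_file_name) as f:
        return json.loads(f.read().decode('utf-8'))


def getschema(schemauri):
    # Read the table schema from GCS. Assumes the file contains a schema
    # string such as 'cust_id:INTEGER,cust_name:STRING', which
    # WriteToBigQuery accepts directly.
    with FileSystems.open(schemauri) as f:
        return f.read().decode('utf-8').strip()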

An example code to demonstrate this option is available at my Github page along with a sample configuration file.

The Job Graph for this pipeline will look like the following. It has one ‘leg’ for each source/sink combination. My example config file has only two tables, hence only two legs in the Job Graph.

I hope this post was useful and saves you some time when deciding on a pattern for building Dataflow pipelines that ingest data from multiple source tables into multiple tables in BigQuery.