BigQuery – Collecting offending rows during data loading

Cloud-native data warehouse products have become quite popular in the last few years, primarily because of their low administration overhead. Some of these products are closer to an infrastructure service, i.e. the developer still needs to define CPU, memory or storage, while others are platform services. In this space, Google's BigQuery is a purely serverless offering where you do not need to specify any infrastructure or database-related attributes; you simply bring your data and queries and start using it. It's an excellent product for analytics workloads, mainly because of its simplicity, speed and extensive APIs.

Data warehouse applications involve loading huge amounts of data from various sources and in a variety of formats. It is quite likely that some of the source files (or a few records within them) will not conform to the schema and will therefore fail to load. When you are building a data pipeline, you want to know which records are failing so that you can either fix the data and reload it or amend your schema to be more forgiving.

In this post (my first one on BigQuery) we are going to look at how to collect offending records and write them out to a separate file (or standard output) for later inspection.
There are many ways of loading data into BigQuery. For this post I am using the load_table_from_file Python API, but the same principle applies to loading data from Google Cloud Storage.
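
The Cloud Storage variant differs only in the load call. Below is a minimal sketch, assuming the same client, table_ref and job_config objects that are set up later in this post (the bucket name here is made up):

# load the same newline-delimited JSON file from a GCS bucket instead of a local file
uri = 'gs://my-bucket/person_tab_data.json'
job = client.load_table_from_uri(uri, table_ref, location='US', job_config=job_config)
job.result()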

To demonstrate how the API's error tracking works and how to write errors out to standard output or to a file, I am using a simple setup: one table (t_person_info) in my dataset, a datafile with a mixture of good and bad records, and a Python script (load_t_person_info.py) to load the data. All of these are available from my github repository.
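
The code snippets below are excerpts from that script. Its setup looks roughly like this (a sketch with assumed details, since only the relevant fragments are reproduced in this post):

#!/usr/bin/env python
import re
from google.cloud import bigquery

datasetid = 'bq_learn'
tableid = 't_person_info'
fname = 'person_tab_data.json'

# BigQuery client and a reference to the target table
client = bigquery.Client()
table_ref = client.dataset(datasetid).table(tableid)

# count the records in the datafile; used later for max_bad_records
with open(fname) as f:
    num_lines = sum(1 for _ in f)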

The table has two required columns and an optional nested and repeated column as shown below:


bipulk-macbookpro:~ bipulk$ bq show bq_learn.t_person_info
Table bipul-test-001:bq_learn.t_person_info

Last modified Schema Total Rows Total Bytes Expiration Time Partitioning Labels
----------------- ---------------------------------- ------------ ------------- ------------ ------------------- --------
11 Nov 14:05:34 |- first_name: string (required) 42 1675
                |- last_name: string (required)
                +- jobs: record (repeated)
                | |- employer: string
                | |- office_postcode: string

bipulk-macbookpro:~ bipulk$
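
For reference, the same schema can be expressed with the Python client's nested SchemaField objects. This is only a sketch for illustration; the table in my repository was created beforehand:

from google.cloud import bigquery

schema = [
    bigquery.SchemaField('first_name', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('last_name', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('jobs', 'RECORD', mode='REPEATED', fields=[
        bigquery.SchemaField('employer', 'STRING'),
        bigquery.SchemaField('office_postcode', 'STRING'),
    ]),
]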

The code in load_t_person_info.py reads the datafile and inserts the records into the table. As shown below, before loading the data I set up a few configuration parameters: the datafile format is JSON (line #2), the table already exists (line #4), so the schema should not be auto-detected (line #3), and rows should be appended to the table (line #6). Another configuration parameter, max_bad_records (line #5), is set to the total number of records in the file so that the whole datafile is tried before the loader gives up.

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.autodetect = False
job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_NEVER
job_config.max_bad_records = num_lines
job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_APPEND

Once the job is configured, open the datafile and load it using the BQ client object created earlier in the script.

with open(fname, 'rb') as source_file:
    job = client.load_table_from_file(source_file, table_ref, location='US', job_config=job_config)
try:
    job.result()
    print('Loaded {} rows into {}:{}'.format(job.output_rows, datasetid, tableid))
    print('Job id {}'.format(job.job_id))
    # if the file loading encountered errors, open the source file to print the bad records
    if job.errors:
        print('number of errors {}'.format(len(job.errors)))
        fp = open(fname, 'r')

        for i in range(0, len(job.errors)):
            print('Error # - {}  Error Message - {}'.format(i, job.errors[i]['message']))
            # get the position of the bad row from the error message
            err_loc = re.findall(r'\d+', job.errors[i]['message'])
            # seek to the error location and read the offending record
            try:
                fp.seek(int(err_loc[0]))
                print('Input data - {}'.format(fp.readline()))
            except:
                raise

In the above code snippet, job.result() (line #4) attempts to load the JSON objects from the datafile into the table and, if it encounters any errors (e.g. bad records), collects them in the job.errors list. The next part of the code checks whether the errors list has any elements (line #8), gets the error location from each error message (line #15), reads the offending record (line #18) and prints it.

A sample run of this script produces the following output, printing out the offending records to standard output.

bipulk-macbookpro:bq_learn bipulk$ ./load_t_person_info.py
Loading from file person_tab_data.json into table t_person_info
Loaded 3 rows into bq_learn:t_person_info
Job id 6b8caa79-d869-4197-9f77-28f0cca1c9a9
number of errors 3
Error # - 0 Error Message - Error while reading data, error message: JSON parsing error in row starting at position 489: Missing required field: first_name.
Input data - {"last_name": "Edward", "jobs": [{"employer": "Mega Retailer", "office_postcode" : "NW1"},{"employer": "CorenerStore", "office_postcode" : "HA1"}]}

Error # - 1 Error Message - Error while reading data, error message: JSON parsing error in row starting at position 637: Missing required field: last_name.
Input data - {"first_name": "Steve", "jobs": [{"employer": "ABigInsurer", "office_postcode" : "SW5"},{"employer": "ASmallInsurer", "office_postcode" : "HA5"}]}

Error # - 2 Error Message - Error while reading data, error message: JSON parsing error in row starting at position 784: Missing required field: last_name.
Input data - {"first_name": "Richard", "jobs": [{"employer": "Earth Media", "office_postcode" : "SW15"},{"employer": "Moon Media", "office_postcode" : "EC5"}]}
bipulk-macbookpro:bq_learn bipulk$

The ability to collect bad records from within the same loading code can be used very effectively when building data pipelines where you need to divert errors into a separate bin.
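
For example, the printing loop above could instead append each offending record to a rejects file that a downstream stage picks up. A minimal sketch (the rejects.json filename is just illustrative):

# divert offending records to a separate file instead of standard output
if job.errors:
    with open(fname, 'r') as src, open('rejects.json', 'a') as rejects:
        for err in job.errors:
            # the position of the bad row is the first number in the error message
            bad_row_pos = re.findall(r'\d+', err['message'])[0]
            src.seek(int(bad_row_pos))
            rejects.write(src.readline())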

Hope this helps with your data engineering when using BigQuery!