In many analytical applications, query performance in BigQuery is better than expected. Typically these queries scan a large number of rows, apply an analytical function and return the result to the calling program in a matter of seconds. However, even an analytical application often has a mix of analytical queries and small, low-latency queries that scan a narrow range of rows based on a set of filter conditions. In relational databases, these types of queries are usually served by index scans. BigQuery doesn’t have indexes, so in theory a use case that queries a narrow set of rows may not be a good fit. BigQuery does, however, have two powerful features that reduce the volume of data scanned and improve query performance: partitioning and clustering.
In this blog post, I explore the benefits of partitioning and clustering, when to use them, and how to choose the right columns for each, using a synthetic dataset and a few hypothetical queries.
Dataset used for testing
The dataset for this performance test consists of three tables with nearly identical schemas but different partitioning and clustering schemes. The schema is simple: two categorical columns, one timestamp column and 10 numeric columns. The two categorical columns are the customer identifier (cust_identifier) and the country code.
| Table | Partition Type | Partition Column | Clustering Column | Data Volume (size) |
| demo_table_10c_date_part_cust_cluster | Date based (Daily partition) | DATE(tstamp) | cust_identifier | 656.87 GB |
| demo_table_10c_cust_part_date_cluster | Integer Range | Hash of cust_identifier using |
| demo_table_10c_date_part_no_cluster | Date based (Daily partition) | DATE(tstamp) | None | 656.87 GB |
- Number of rows: 7,197,026,476
- Number of distinct customers: 1,679,610
- Number of days of data (number of daily partitions): 102
Additionally, the demo_table_10c_cust_part_date_cluster table includes a derived column, cust_identifier_bucket, holding the value of ABS(MOD(FARM_FINGERPRINT(a.cust_identifier), 4000)), which is used to partition by customer identifier.
This derived column is computed in three steps. First, FARM_FINGERPRINT hashes the customer identifier (a STRING) into a signed 64-bit integer. Next, MOD reduces that hash modulo 4000. Finally, ABS removes the sign, so every customer identifier lands in one of 4000 buckets (0 to 3999). I chose 4000 because it is the maximum number of partitions allowed per table in BigQuery at the time of writing.
The tables partitioned on the timestamp column use the DATE part of the tstamp value as the partitioning key.
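The bucketing arithmetic can be sketched in Python. This is only an illustration: the standard library has no FarmHash implementation, so stand_in_fingerprint below is a hypothetical substitute built on SHA-256 that merely imitates the signed 64-bit range of FARM_FINGERPRINT. The bucket numbers it prints will therefore not match the ones BigQuery computes.

```python
import hashlib

NUM_BUCKETS = 4000  # bucket count used for the derived partitioning column


def stand_in_fingerprint(value: str) -> int:
    """Return a signed 64-bit hash of value.

    Assumption: this is NOT FarmHash. It only imitates the signed INT64
    range of FARM_FINGERPRINT, so results differ from BigQuery's.
    """
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], byteorder="big", signed=True)


def bucket_for(cust_identifier: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Mirror ABS(MOD(fingerprint, num_buckets)).

    BigQuery's MOD keeps the sign of the dividend, so ABS(MOD(x, n))
    is the same as abs(x) % n in Python.
    """
    return abs(stand_in_fingerprint(cust_identifier)) % num_buckets


if __name__ == "__main__":
    for cust in ("2A76", "2A77", "ZZ01"):
        print(cust, bucket_for(cust))
```

The point of the sketch is that the mapping is deterministic: the same identifier always lands in the same bucket, which is what lets a query recompute the bucket in its WHERE clause.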
To highlight the benefit of partitioning and clustering, I will be using three sets of queries to represent three different use cases.
- Search one customer’s data across the entire dataset, i.e. filter only on the customer identifier column.
- Search all customers’ data for a given day, i.e. filter only on the timestamp column.
- Search one customer’s data for a given day, i.e. filter on both the cust_identifier and timestamp columns.
Applying these to the three tables results in 9 distinct queries. Each of the 9 queries is executed multiple times with different customer identifier and date values to smooth out anomalies caused by the environment or the input data. To make the testing process easier and repeatable, I have put the queries into a Python script, which is available on my GitHub page.
A few examples of these queries are:
Usecase #1 – Query #1
SELECT cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc
FROM bq_demo_ds.demo_table_10c_date_part_cust_cluster
WHERE cust_identifier = '2A76';
Usecase #1 – Query #2
SELECT cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc
FROM bq_demo_ds.demo_table_10c_cust_part_date_cluster
WHERE cust_identifier_bucket = ABS(MOD(FARM_FINGERPRINT('2A76'), 4000))
  AND cust_identifier = '2A76';
Usecase #2 – Query #1
SELECT cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc
FROM bq_demo_ds.demo_table_10c_date_part_cust_cluster
WHERE EXTRACT(DATE FROM tstamp) = '2021-01-30';
Usecase #3 – Query #1
SELECT cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc
FROM bq_demo_ds.demo_table_10c_date_part_cust_cluster
WHERE EXTRACT(DATE FROM tstamp) = '2021-04-27'
  AND cust_identifier = '2A76';
The second query for Usecase #1 has an additional filter, cust_identifier_bucket = ABS(MOD(FARM_FINGERPRINT('2A76'), 4000)), to allow partition pruning. The expression in the filter is the same one used to populate the derived column.
The other queries are straightforward and filter on the customer identifier, the timestamp, or both.
As you can see in the Python script, each query is tagged as “cnqn” (e.g. c1q1, c2q2), representing the case number and query number. The tag makes it easy to collect job statistics from the INFORMATION_SCHEMA metadata views and compare bytes processed and query latency (elapsed time in milliseconds). Query caching is disabled in the QueryJobConfig so that every query actually executes, consuming slots (CPU and memory) and issuing I/O, rather than having its result returned from the BigQuery cache.
The code is simple and self-explanatory. It builds two arrays of input values, one of customer identifiers and one of timestamps. It then loops over the customer identifiers and executes use case #1 on all tables, loops over the tstamp values and executes use case #2, and finally picks customer identifiers and dates at random to execute use case #3.
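The test harness described above can be sketched roughly as follows. This is not the script from the repository, only a minimal illustration under stated assumptions: the job ID layout cpperf_<tag>_<suffix> is inferred from the LIKE 'cpperf%' and SUBSTR(job_id, 8, 4) expressions in the statistics query later in this post (the underscore separator and random suffix are my guesses), and the bucket filter on the use case #3 query for the hash-partitioned table is assumed by analogy with use case #1. The helper names build_queries, make_job_id and run_tagged are hypothetical.

```python
import uuid

DATASET = "bq_demo_ds"
TABLES = {
    1: "demo_table_10c_date_part_cust_cluster",
    2: "demo_table_10c_cust_part_date_cluster",
    3: "demo_table_10c_date_part_no_cluster",
}
SELECT = "SELECT cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc"


def build_queries(cust_id: str, day: str) -> dict:
    """Return {tag: sql} for the 3 use cases x 3 tables (c1q1 .. c3q3).

    Values are interpolated directly because they are trusted test inputs;
    anything user-supplied should go through query parameters instead.
    """
    cust = f"cust_identifier = '{cust_id}'"
    date = f"EXTRACT(DATE FROM tstamp) = '{day}'"
    # Extra predicate that lets the hash-partitioned table prune partitions.
    bucket = f"cust_identifier_bucket = ABS(MOD(FARM_FINGERPRINT('{cust_id}'), 4000))"
    queries = {}
    for q, table in TABLES.items():
        cust_filter = f"{bucket} AND {cust}" if q == 2 else cust
        # Assumption: c3q2 also carries the bucket predicate, as c1q2 does.
        filters = {1: cust_filter, 2: date, 3: f"{date} AND {cust_filter}"}
        for case, where in filters.items():
            queries[f"c{case}q{q}"] = f"{SELECT} FROM {DATASET}.{table} WHERE {where}"
    return queries


def make_job_id(tag: str) -> str:
    """Build a job ID whose characters 8 to 11 (1-based) hold the tag."""
    return f"cpperf_{tag}_{uuid.uuid4().hex}"


def run_tagged(client, tag: str, sql: str):
    """Run one query with caching disabled (needs google-cloud-bigquery)."""
    from google.cloud import bigquery  # imported lazily; optional dependency

    config = bigquery.QueryJobConfig(use_query_cache=False)
    job = client.query(sql, job_config=config, job_id=make_job_id(tag))
    return job.result()
```

A driver would then call build_queries once per input pair and pass each tag and SQL string to run_tagged with a bigquery.Client() instance.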
Collecting Job stats from INFORMATION_SCHEMA metadata view
The INFORMATION_SCHEMA.JOBS_BY_PROJECT metadata view collects and retains job execution statistics. It is documented on the BigQuery documentation site. For this test, I use the following query to collect and aggregate the bytes processed, bytes billed and elapsed time in milliseconds for the 9 different queries (c1q1 to c3q3) across multiple runs (shown in the num_exec column).
WITH jqr AS (
  SELECT
    SUBSTR(job_id, 8, 4) AS job_group,
    job_id,
    start_time,
    end_time,
    total_bytes_processed,
    total_bytes_billed,
    TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) AS elapsed_millisecond
  FROM `region-europe-west1`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
  WHERE creation_time BETWEEN TIMESTAMP('2021-12-19 13:41:00') AND TIMESTAMP('2021-12-19 14:41:00')
    AND job_type = "QUERY"
    AND job_id LIKE 'cpperf%'
    AND end_time BETWEEN TIMESTAMP('2021-12-19 13:41:00') AND TIMESTAMP('2021-12-19 14:41:00')
    AND project_id = 'data-analytics-bk'
)
SELECT
  job_group,
  COUNT(*) AS num_exec,
  CEILING(AVG(total_bytes_processed) / 1024 / 1024) AS avg_mb_processed_per_run,
  CEILING(AVG(total_bytes_billed) / 1024 / 1024) AS avg_mb_billed_per_run,
  CEILING(AVG(elapsed_millisecond)) AS avg_millisecond_per_run
FROM jqr
GROUP BY job_group
ORDER BY job_group;
Output of the query
The query categories are
- c1 – Search one customer’s data across the entire dataset, i.e. filter only on the customer identifier column.
- c2 – Search all customers’ data for a given day, i.e. filter only on the timestamp column.
- c3 – Search one customer’s data for a given day, i.e. filter on both the cust_identifier and timestamp columns.
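To make the grouping and unit conversion in the statistics query concrete, the same aggregation can be mirrored in plain Python. The rows in SAMPLE_JOBS are hypothetical values invented for this sketch, not results from my test runs, and the job ID layout (an underscore after cpperf) is an assumption.

```python
import math
from collections import defaultdict
from statistics import mean

# Hypothetical job rows; real values come from INFORMATION_SCHEMA.JOBS_BY_PROJECT.
SAMPLE_JOBS = [
    {"job_id": "cpperf_c1q1_a", "bytes_processed": 6_000_000_000, "elapsed_ms": 600},
    {"job_id": "cpperf_c1q1_b", "bytes_processed": 6_400_000_000, "elapsed_ms": 590},
    {"job_id": "cpperf_c1q2_a", "bytes_processed": 80_000_000, "elapsed_ms": 250},
]


def summarise(jobs):
    """Group jobs by their tag and average bytes and latency per group."""
    groups = defaultdict(list)
    for job in jobs:
        # SUBSTR(job_id, 8, 4) is 1-based, so it maps to the slice [7:11].
        groups[job["job_id"][7:11]].append(job)
    summary = {}
    for group, rows in sorted(groups.items()):
        avg_bytes = mean(r["bytes_processed"] for r in rows)
        summary[group] = {
            "num_exec": len(rows),
            # Dividing by 1024 twice means the "MB" columns are mebibytes.
            "avg_mb_processed_per_run": math.ceil(avg_bytes / 1024 / 1024),
            "avg_millisecond_per_run": math.ceil(mean(r["elapsed_ms"] for r in rows)),
        }
    return summary


if __name__ == "__main__":
    for group, stats in summarise(SAMPLE_JOBS).items():
        print(group, stats)
```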
Interpreting job statistics / query performance
Plotting the output of the above query using scatter charts highlights the positive impact of clustering in all of these use cases.
“Search one customer data across the entire dataset” queries
Searching for one customer’s data across the dataset is much quicker when the table is partitioned on a hash of the customer identifier (c1q2): on average it took 256 milliseconds and processed 80 MB of data. However, clustering improves both latency and bytes processed even when the table is partitioned on the timestamp column, as the chart above shows (c1q1): on average it took 597 milliseconds and processed 6119 MB per run to search for a customer across the entire dataset. Compared with the non-clustered table (c1q3), that is about 42 times cheaper in bytes processed and about 4x quicker.
“Search all customers’ data for a given day” queries
Searching all customers’ data for a given day is much quicker when the table is partitioned on timestamp (c2q1 and c2q3). This is expected: a query against the table partitioned on customer identifier cannot prune any partitions and has to visit all of them. Clustering on the timestamp column still reduces the bytes processed, though, so this query (c2q2) processes 172 GB rather than the full 710 GB table. So there is still some benefit.
You may have noticed that, in spite of very different scanned bytes, the elapsed time for all three queries in the above chart is nearly the same. Query c2q2 scanned 63x more bytes but took only 1.1x longer. This is because most of the time (nearly 11 seconds) is spent in the output stage of the query, and all three queries return the same number of rows. Under on-demand pricing, however, querying the timestamp-partitioned table costs 1/63rd of querying the table partitioned on customer identifier. It is cheaper with reserved slots too, as the screenshots of the two queries below show (slot time consumed: 30 min vs 5 min).
SELECT /*CASE#2 QUERY#2 */ cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc
FROM bq_demo_ds.demo_table_10c_cust_part_date_cluster
WHERE EXTRACT(DATE FROM tstamp) = '2021-01-28';
SELECT /*CASE#2 QUERY#3 */ cust_identifier, tstamp, (attr1 + attr2 - attr3) AS some_random_calc
FROM bq_demo_ds.demo_table_10c_date_part_no_cluster
WHERE EXTRACT(DATE FROM tstamp) = '2021-01-28';
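The on-demand cost comparison for these two queries comes down to simple arithmetic, sketched below. The price per TiB is a placeholder I made up for illustration, not a quoted BigQuery rate, and I am treating the 172 GB figure as binary gigabytes. Because the price cancels out, the cost ratio equals the ratio of bytes scanned whatever the actual rate is.

```python
GIB = 1024 ** 3
TIB = 1024 ** 4

# Placeholder rate for illustration only; check current BigQuery pricing.
HYPOTHETICAL_PRICE_PER_TIB = 5.0


def on_demand_cost(bytes_processed: float, price_per_tib: float) -> float:
    """Cost of one query when billing is proportional to bytes processed."""
    return bytes_processed / TIB * price_per_tib


# c2q2: table partitioned on customer hash, clustered on timestamp.
scanned_cust_part = 172 * GIB
# Date-partitioned table: implied by the 63x difference in scanned bytes.
scanned_date_part = scanned_cust_part / 63

cost_cust_part = on_demand_cost(scanned_cust_part, HYPOTHETICAL_PRICE_PER_TIB)
cost_date_part = on_demand_cost(scanned_date_part, HYPOTHETICAL_PRICE_PER_TIB)

if __name__ == "__main__":
    print(f"cost ratio: {cost_cust_part / cost_date_part:.1f}x")
```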
“Search one customer data for a given day” queries
Searching for one customer’s data on a given day is much quicker and scans fewer bytes when the table is partitioned on either timestamp or customer identifier. In both cases, partition pruning reduces the volume of data to be scanned and processed, although partitioning on the hash of the customer identifier and clustering on timestamp is slightly better.
From the tests carried out, we can see that clustering has a positive impact on query duration and bytes scanned. It can provide a single storage solution for both analytical and small/low latency type queries at no extra complexity or cost.
Clustering doesn’t cost anything extra, and the maintenance work of re-ordering data after heavy DML and load activity is not billed to the customer.
For use cases that require a mix of queries on high-cardinality and low-cardinality data, it is worth running a similar test to see whether clustering on an appropriate set of columns helps. You can cluster on up to 4 columns.
Another pattern to solve this problem is to have a pipeline write data to two sets of tables with different partitioning and clustering schemes, similar to the setup discussed in this blog post.
And if no combination of partitioning and clustering gives acceptable performance, you can pair BigQuery with a relational database (e.g. Cloud SQL or Spanner) or a NoSQL database (e.g. Bigtable), running analytical queries in BigQuery and small, low-latency queries on the relational or NoSQL database. However, this adds complexity and cost to the overall solution.
Hope you found this post useful.
The code used to set up and run the test cases for this blog post can be found in my GitHub repository.