TL;DR
With intelligent data optimization based on usage patterns and on high-priority columns identified by domain experts, columnar formats such as ORC and Parquet can produce a much smaller data footprint. The optimization is idempotent with respect to the data: the rows themselves are unchanged, only how they are laid out on disk changes. Depending on how the data is currently being stored, the savings can be significant. In the case laid out below, we achieved as much as a 70% reduction in both storage size and query duration.
Introduction
So you have created a table. After the hours and days spent setting up the streaming and/or batch pipeline, defining the logic, and working through bugs and edge cases, you have made it, congratulations! Your production-ready code is deployed and your table partitions are being populated with data. All that is left is to monitor the pipeline, collect statistics, and let the users build their models and report any issues, right? In this blog post, we will be working under the assumptions that
- we are dealing with large data lakes in the petabytes: partitions with many gigabytes and directories in the terabytes,
- the underlying problem is small, non-optimal leaf files and nothing else related to cluster setup, architecture, etc.
Small Leaf Files
After some time, our table has written many hours, days, months, etc. of data and our users are reporting that the table is sluggish even when accessing a single partition. Upon investigation, we notice that the pipeline is writing many small leaf files in the partition.
What is the small leaf file problem?
A small file is a file that is much smaller than the HDFS block size. For both Parquet and ORC files, the optimal file size for large data is between 512MB and 1024MB. Since we are dealing with large data, our HDFS block size will be 256MB. That is, optimal leaf files would be 2-4x the block size.
Why are we seeing non-optimal partition sizes?
By default, Spark will write 200 files per partition because spark.sql.shuffle.partitions=200 applies to any write preceded by an operation that produces a shuffle. In a map-only write, the number of files is instead related to the number of initial partitions. In the example that follows, the assumption is that data is being written from a data engineering/model pipeline that applies some logic before writing, which introduces a shuffle (common to most pipeline logic producing business-use tables).
However, this number could be higher or lower depending on your cluster setup or user-specified settings. Let's say our table is partitioned by day and hour. For a given day, we have 24 hourly partitions, each between 10GB and 25GB. With 200 files per partition, our leaf files would be between 51.2MB and 128MB.
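To make the mechanism concrete, here is a minimal PySpark sketch of such a pipeline; the paths, column names, and aggregation are hypothetical, not our production job. The groupBy forces a shuffle, so the write runs with 200 output tasks, and each task writes its own leaf file into every day/hour directory it touches.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("leaf-file-demo").getOrCreate()

events = spark.read.parquet("/data/raw/events")  # hypothetical input path

# The aggregation introduces a shuffle, so the stage that writes the result
# runs with spark.sql.shuffle.partitions (200 by default) tasks.
hourly = (events
          .groupBy("day", "hour", "user_id")
          .agg(F.count("*").alias("event_count")))

# Each of the 200 tasks writes one leaf file per (day, hour) value it holds,
# so every partition directory can end up with up to 200 small files.
(hourly.write
       .partitionBy("day", "hour")
       .mode("overwrite")
       .parquet("/data/curated/events_hourly"))
```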
Naive Approach
For this table, we keep a rolling 90 days of data. The table therefore contains 432,000 leaf files (90 days * 24 hours * 200 files) no bigger than 128MB, and since an average day of data is roughly 2TB, the directory is about 180TB. With 3-factor replication, we are looking at a data footprint of 540TB. We know from our colleagues and/or previous experience that we need to set up a data compaction job. To do this, we could use coalesce or repartition, but repartition involves a shuffle. Which one should we use? Coalesce, because it doesn't require a shuffle? Consider a partition holding 8GB:
| | Coalesce | Repartition |
|---|---|---|
| Shuffle | No | Yes |
| Roughly equal size leaf files | Not guaranteed | Yes |
| Number of partitions to set | Could set to 8, but coalesce may create a file > 1GB | 8 ideally, but repartition may shuffle the data poorly |
| Data size | Unchanged | Could change |
In the naive approach, we set a single global coalesce or repartition value, not a value per partition.
Coalesce
With coalesce, our runtime will be faster since it does not require a shuffle, and the data size won't change. However, coalesce does not guarantee that the data is distributed equally across the resulting files. Because of this, we cannot use the value of 8, which would otherwise be the most optimal global value; it could lead to files anywhere between 128MB and well over 1024MB depending on the partition.
Repartition
With repartition, we introduce a shuffle. The benefit of the shuffle, though, is that the leaf file sizes will be relatively equal when Spark writes the data. An interesting question is what number of partitions to use. If repartition produces equal-sized leaf files, why should we not use 8? Because a repartition reshuffles the rows, columnar compression benefits can be disrupted if the data had a natural ordering. This means a repartition can yield 3 potential results: it could shuffle the data into a better natural ordering so the data shrinks, it could have no immediate effect on the data size, or it could break the ordering and actually cause the data to grow.
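As a sketch of the naive approach (the paths and the global file count are illustrative only, not values from our pipeline), the two options look like this:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("naive-compaction").getOrCreate()

# One hourly partition of the table; the same global value is applied to every
# partition regardless of its actual size.
df = spark.read.parquet("/data/curated/events_hourly/day=2021-06-17/hour=3")
num_files = 16  # a single global value chosen for the whole table

# Option 1: coalesce -- no shuffle, but the files are not guaranteed to be
# evenly sized.
df.coalesce(num_files).write.mode("overwrite").parquet("/data/compacted/coalesce")

# Option 2: repartition -- full shuffle, roughly equal file sizes, but any
# natural ordering (and its compression benefit) can be lost.
df.repartition(num_files).write.mode("overwrite").parquet("/data/compacted/repartition")
```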
Optimizing Approach
How can we do a better job? First, we need to understand our data: which columns are most important for model building, data analysis, filtering, etc.? Are some columns more important than others when using this table? In the case of filtering, we are not considering partition columns, since these are taken care of by partition pruning when a user specifies where partition_col = some_partition.
Linear Ordering
With linear ordering, we want to write our data ordered by the most influential columns in hierarchical order. To determine the columns of interest, it is best to consult with the table owner(s), the power users of this table, and the subject matter experts (SMEs) for these data sources. Also, when linear ordering multiple columns, priority is based on the order in which they are specified in the clause; that is, the first column has a higher impact than the last column in the list. Because of this, picking only a handful of columns (no more than 10, but I would suggest fewer) is all that is needed.
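One way to implement this in PySpark is sketched below; it is an illustration under assumed column names and file counts, not the exact production job we run.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("linear-ordering").getOrCreate()

df = spark.read.parquet("/data/curated/events_hourly/day=2021-06-17/hour=3")

# Columns chosen with the table owners and SMEs, highest priority first
# (hypothetical names).
sort_cols = ["customer_id", "event_type", "device_id"]

# Target file count for this partition, e.g. partition size / ~1GB.
num_files = 16

(df.repartitionByRange(num_files, *sort_cols)   # range-partition on the key columns
   .sortWithinPartitions(*sort_cols)            # linear order inside each output file
   .write.mode("overwrite")
   .parquet("/data/compacted/linear/day=2021-06-17/hour=3"))
```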
Around March to April of 2020, we, True Analytics Tech, demonstrated the benefits of linear ordering on our largest data set. We looked at 2 weeks of data. The run times in the table below are for running a count; in each case, we collected the results of 10 runs after the JVM was warmed up. In a more complex run time test, not shown here, we aggregated some data and joined it back to the original 2 weeks of data. On the optimized data, this ran in around 5 to 10 minutes. We then tested it against cloned data (2 weeks) and against the data in the table; both took 1 hour to get 50% complete. We cloned out the 2 weeks of data so we could confirm that the longer run times were not related to the complete size of the table.
| | Naive | Linear Optimization | Reduction |
|---|---|---|---|
| Data Size | 14.9TB | 3.4TB | 77% |
| Number of Leaf Files | 147,258 | 28,483 | 81% |
| Count Avg, Min, Max (sec) | 274, 172, 377 | 26, 23, 32 | 91%, 87%, 92% |
Our data footprint reduction was 77%, with run times seeing reductions of around 90% on the 2 weeks of sampled data. As a result of this outcome, we decided to apply this across the board on our largest data sets and spent 2020 optimizing both our incoming data and our older data.
Our data for this table has grown to approximately 2TB per day, and by applying linear ordering we are able to reduce our data footprint by 1.5TB a day; with 3-factor replication, that is 4.5TB a day in data savings. Below, you will see our data monitoring produced by the data model team, who run compaction on one of our bigger datasets. We wait 8 days before removing the non-optimal data, so the days not yet cleaned up show around 2.5TB since both the naive and optimized copies are present.
Our retention policy for this base layer is 90 days, and from it we derive 3 aggregate layers by hour, day, and month, which we currently retain for about 1-2 years. The savings we have been able to demonstrate on this single table with its aggregate rollups have been 1.322PB. Additionally, our Audience team was able to optimize their data storage, producing roughly another 1PB of savings. That is, we have currently saved 2.331PB of data as of 17 June 2021.
At True Analytics, we store our data on premise. However, to illustrate the benefits of optimizing your compaction process, consider Amazon's S3 pricing tiers. We will only look at storage savings, but keep in mind that end users also experience run time reductions. For the Singapore region, S3 Standard costs 0.025 USD per GB per month for the first 50TB, 0.024 USD per GB per month for the next 450TB, and 0.023 USD per GB per month thereafter. For the Amazon pricing, we base the calculation on a single replica; that is, 2.331PB * 1024 / 3 = 795TB. By reducing our data footprint, we would achieve 19,287.04 USD (623,164 THB) per month in cost savings.
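For readers who want to reproduce the arithmetic, here is a small Python sketch of the tiered calculation, using the per-GB-per-month rates quoted above:

```python
def s3_standard_sgp_monthly_cost(tb):
    """Tiered S3 Standard (Singapore) storage cost per month, in USD."""
    gb_left = tb * 1024
    tiers = [(50 * 1024, 0.025),        # first 50TB
             (450 * 1024, 0.024),       # next 450TB
             (float("inf"), 0.023)]     # everything thereafter
    cost = 0.0
    for tier_gb, usd_per_gb in tiers:
        used = min(gb_left, tier_gb)
        cost += used * usd_per_gb
        gb_left -= used
        if gb_left <= 0:
            break
    return cost

saved_tb = 795  # 2.331PB * 1024 / 3, single replica
print(f"{s3_standard_sgp_monthly_cost(saved_tb):,.2f} USD per month")  # 19,287.04
```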
Space Filling Curves
Space filling curves are another widely used method we could use to optimize our data. Again, we want to determine the most influential columns, and the rule about limiting the number of columns still applies here as well.
Space filling curves come in many flavors:
- Peano
- Koch
- Hilbert
- Morton (Z)
to name a few of the big ones.
Why would we want to use a space filling curve and not just linear ordering? Linear ordering as presented above builds a multidimensional index in which each successive column has less and less weight on query results. If we are looking for the most optimal way to store the data, we want to reduce the data footprint while mapping a multi-dimensional index down to one dimension. This is where space filling curves come into play.
Here we look at an example of using the Morton curve (Z ordering) on a set of (x, y) coordinates. Consider the set {(1, 1), (1, 8), (2, 1), (3, 1)}. This set is currently ordered linearly by x, y. In Z ordering, what would happen?
| Integer | Binary | Bit Interleave |
|---|---|---|
| (1, 1) | (0001, 0001) | 00000011 |
| (1, 8) | (0001, 1000) | 01000010 |
| (2, 1) | (0010, 0001) | 00001001 |
| (3, 1) | (0011, 0001) | 00001011 |
If we ordered the data based on the results of bit interleaving, we would have {(1, 1), (2, 1), (3, 1), (1, 8)}. The y column played a role in the sort before x was completely sorted, allowing the other columns to have an effect on the sorted data.
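A short Python sketch of the interleaving used above (taking each x bit before the matching y bit) reproduces the table and the resulting order:

```python
def morton_2d(x, y, bits=4):
    """Interleave the bits of x and y (x bit first) into a single Z index."""
    z = 0
    for i in range(bits - 1, -1, -1):   # from most to least significant bit
        z = (z << 1) | ((x >> i) & 1)
        z = (z << 1) | ((y >> i) & 1)
    return z

points = [(1, 1), (1, 8), (2, 1), (3, 1)]
for p in sorted(points, key=lambda p: morton_2d(*p)):
    print(p, format(morton_2d(*p), "08b"))
# (1, 1) 00000011
# (2, 1) 00001001
# (3, 1) 00001011
# (1, 8) 01000010
```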
Space filling curves can allow faster data retrieval times when used with table column statistics: the query can skip larger chunks of data, minimizing the number of files that need to be traversed. Here at Analytics Platform, we ran Z ordering and compared it with our naive and linear results. We saw the data footprint decrease by a further 16% compared to linear ordering: our naive data was 1.5TB, 398GB with linear ordering, and 334GB when Z ordering was applied.
We have not done a full performance test on large data sets with table statistics, so we are unable to show the query-time benefits of Z ordering on our own data. However, both Amazon and Databricks have articles outlining the performance gains when Z ordering is used. For our use case, we used neither an Amazon cluster nor the Databricks optimization in Delta Lake; everything was built in house for use on our on-premise cluster.
Validation
We ran a validation suite to verify that the data optimization was an idempotent transformation with respect to the data itself. First, we ran through standard metrics such as counts, min, and max on columns of both the naive and optimized data, and finally we hashed both data sets to generate checksums for comparison.
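A minimal PySpark sketch of such a check is below; the paths are hypothetical, and the specific hash and metrics are illustrative rather than the exact suite we run.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("compaction-validation").getOrCreate()

naive = spark.read.parquet("/data/curated/events_hourly/day=2021-06-17")
optimized = spark.read.parquet("/data/compacted/linear/day=2021-06-17")

def summarize(df):
    # Simple column statistics: row count plus per-column min and max.
    stats = df.agg(F.count(F.lit(1)).alias("rows"),
                   *[F.min(c).alias(f"min_{c}") for c in df.columns],
                   *[F.max(c).alias(f"max_{c}") for c in df.columns]).first()
    # Order-independent checksum: hash each row, then sum the hashes.
    checksum = (df.select(F.hash(*df.columns).cast("decimal(38,0)").alias("h"))
                  .agg(F.sum("h"))
                  .first()[0])
    return stats, checksum

assert summarize(naive) == summarize(optimized), "optimization changed the data"
```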
Conclusion
Over the past year and a half since we started using this optimized compaction, we have generated data savings of between 50-70% on our largest data sets. This has improved our cluster utilization, slowed our rate of server expansion, freed our operations team from constantly needing to rebalance nodes, and delivered performance gains that allow our data model and data science teams to run their pipelines more quickly with fewer resources. However, we are not done yet. We are expanding data compaction to most of the datasets in our on-premise cluster.