Data Ingestion Optimization in Apache Druid

Apache Druid is a real-time analytics database built to power modern analytics applications. Druid is a good choice for analytical applications that require ad-hoc analytics, instant data visibility, or high concurrency. Druid is often used to power UIs where an interactive, consistent user experience is desired.

During the data ingestion process, Druid reads data from your source system and loads it into deep storage (for example S3 or HDFS) according to the JSON-formatted ingestion spec. The data in deep storage is then asynchronously distributed to the data nodes (Historical nodes) in small chunks (files) called segments, one set per datasource. A datasource (similar to a table in an RDBMS) in Druid can contain multiple segments, and these segments are spread across the data nodes.

Ingestion into Druid can be either batch or streaming. In this article, we will focus on batch ingestion optimization techniques. Native batch ingestion (index_parallel) in Druid can use dynamic, hashed, or single-dimension partitioning, depending on the use case.
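A minimal sketch of an index_parallel ingestion spec is shown below for orientation; the datasource name, S3 path, timestamp column, and dimension are illustrative, not part of any specific workload. The partitionsSpec inside tuningConfig is where the partition method discussed next is chosen.

  {
    "type": "index_parallel",
    "spec": {
      "dataSchema": {
        "dataSource": "transaction_detail",
        "timestampSpec": { "column": "Saledate", "format": "auto" },
        "dimensionsSpec": { "dimensions": ["customer_id"] },
        "granularitySpec": { "segmentGranularity": "DAY", "queryGranularity": "DAY", "rollup": true }
      },
      "ioConfig": {
        "type": "index_parallel",
        "inputSource": { "type": "s3", "prefixes": ["s3://my-bucket/transactions/"] },
        "inputFormat": { "type": "csv", "findColumnsFromHeader": true }
      },
      "tuningConfig": {
        "type": "index_parallel",
        "partitionsSpec": { "type": "dynamic" }
      }
    }
  }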

Partition Method - Use cases

Single-dimension partitioning

If the data has a uniformly distributed column that is frequently used in your queries, consider using the single_dim partitionsSpec to maximize query performance.

Hash partitioning

If your data doesn't have a uniformly distributed column, but is expected to have a high rollup ratio when rolled up on certain dimensions, consider using the hashed partitionsSpec.

Dynamic partitioning

If neither of the above scenarios applies, or you don't need to roll up your datasource, consider using the dynamic partitionsSpec. Minimal sketches of the three variants follow.
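For reference, minimal partitionsSpec sketches for each method; the partition dimension (customer_id) and the row targets are illustrative values, not recommendations:

  "partitionsSpec": { "type": "dynamic", "maxRowsPerSegment": 5000000 }

  "partitionsSpec": { "type": "hashed", "partitionDimensions": ["customer_id"], "targetRowsPerSegment": 5000000 }

  "partitionsSpec": { "type": "single_dim", "partitionDimension": "customer_id", "targetRowsPerSegment": 5000000 }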

Tuning parameters in an ingestion spec

Please note that some of the tuning parameters discussed in this article apply only to single-dimension partitioned ingestion.

Data rollup during ingestion

Druid can roll up data at ingestion time to reduce the amount of raw data stored on disk. Rollup is a form of pre-aggregation.

Rollup is enabled by default. Druid combines rows that have identical dimension values and identical timestamp values (after queryGranularity-based truncation, controlled by the parameter below) into a single row.

Query Granularity: the level at which metrics are aggregated on the time dimension column, for example DAY or MONTH.

  "granularitySpec": {"queryGranularity": "DAY"}
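For the example that follows, a sketch of the relevant spec fragments is given here, assuming SaleAmount is ingested as a longSum metric (the metric definition is an assumption; the source data only names the column):

  "granularitySpec": { "queryGranularity": "DAY", "rollup": true },
  "metricsSpec": [ { "type": "longSum", "name": "SaleAmount", "fieldName": "SaleAmount" } ]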

How data is rolled up during ingestion with different query granularity values is shown below.

Data in a source file (pre-ingestion) is given below.

Saledate	SaleAmount
01-Jan-21	10
02-Jan-21	8
02-Jan-21	6
03-Jan-21	3
04-Jan-21	5
04-Jan-21	6
01-Feb-21	4
02-Feb-21	8
03-Feb-21	4
03-Feb-21	6

        

Data in Druid after ingestion with query granularity 'DAY' and 'MONTH' is shown separately below.

QueryGranularity: DAY

Saledate	SaleAmount
01-Jan-21	10
02-Jan-21	14
03-Jan-21	3
04-Jan-21	11
01-Feb-21	4
02-Feb-21	8
03-Feb-21	10

QueryGranularity: MONTH

Saledate	SaleAmount
01-Jan-21	38
01-Feb-21	22

In the ingestion spec, we define a count-type metric column ("rowcount") to record how many source rows were rolled up into each row of the Druid datasource after ingestion.

  { "type": "count", "name": "rowcount" }
        

where "count" refers to a count-type metric generated at ingestion time.

We can run the following Druid SQL query to find the rollup ratio.

SELECT SUM("rowcount") / COUNT(*)  FROM datasource        

In summary, the higher the rollup ratio, the greater the disk-space savings and the greater the query response improvement from data pre-aggregation.

 10000 rows compacted into 500 ingested events. Rollup ratio = 10000 / 500 = 20
 10000 rows compacted into 250 ingested events. Rollup ratio = 10000 / 250 = 40
 10000 rows compacted into 100 ingested events. Rollup ratio = 10000 / 100 = 100

Pre-sorted data to improve ingestion performance

During ingestion with the single-dimension partition method, data is sorted to co-locate rows based on the time dimension and the specified partition dimension. Hence it is wise to pre-sort the data in the source file itself and then tell Druid that the source data is already sorted, using the parameter below in the ingestion spec, so that the data is ingested as-is.

 "assumeGrouped": true        

    

This configuration makes ingestion faster because the sort operation during ingestion is skipped, but if the assumption is violated, data will be stored in sub-optimal partitions. A sketch of the full partitionsSpec is shown below.
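A sketch of a single_dim partitionsSpec with assumeGrouped set in context; the partition dimension is a hypothetical column:

  "partitionsSpec": {
    "type": "single_dim",
    "partitionDimension": "customer_id",
    "targetRowsPerSegment": 5000000,
    "assumeGrouped": true
  }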

Optimal nodes required for ingestion

Single-dimension partitioned ingestion runs in the phases below.

index_parallel is the primary task that drives all three phases and uses the output of each phase's tasks as input to the next phase's tasks.

partial_dimension_distribution – In this phase, the primary index_parallel task splits the input data and assigns the splits to worker tasks. Each worker task reads its assigned split and builds a histogram for the partitionDimension. The parallel task collects those histograms from the worker tasks and finds the best range partitioning based on partitionDimension to evenly distribute rows across partitions.

partial_range_index_generate – Each worker task in a new set reads a split created in the previous phase, partitions the data by time chunk according to the segmentGranularity in the granularitySpec, and then by the range partitioning found in the previous phase. The partitioned data is stored in the local storage of the MiddleManager node.

partial_index_generic_merge – A new set of worker tasks merges the partitioned data created in the previous phase. Each worker task reads the segments falling in the same partition of the same range from multiple MiddleManager processes and merges them to create the final segments. Finally, they push the final segments to deep storage.

Let's take a close look at how many worker tasks were created for a specific batch ingestion from source files located in S3, and at the datasource properties in Druid after ingestion.

Datasource                   : transaction_detail
Row count                    : 862 million rows
Datasource size in Druid     : 49 GB
Number of segments created   : 113
Avg row size                 : 56 bytes
Average row count in segment : 7.73 million
Ingestion time               : 41 minutes

Number of tasks in each phase of ingestion
partial_dimension_distribution - 271
partial_range_index_generate   - 271
partial_index_generic_merge    - 460
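One way to confirm per-phase task counts like these is to query Druid's sys.tasks system table while the ingestion is running or shortly afterwards (completed tasks are retained only for a limited time); the datasource filter here simply mirrors the example above:

  SELECT "type", COUNT(*) AS task_count
  FROM sys.tasks
  WHERE "datasource" = 'transaction_detail'
  GROUP BY "type"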
        

Let's look at the math for calculating the number of EC2 nodes required to ingest the above datasource into a Druid cluster built on AWS.

Let's assume we use i3.4xlarge EC2 instances for the MiddleManagers in this ingestion use case; this instance type has 16 virtual CPUs. If we reserve one vCPU for system processes, the remaining 15 are available for ingestion worker processes.


Number of i3.4xlarge nodes required = (maximum number of worker tasks in any ingestion phase) / 15
                                    = 460 / 15 = 30.66 ≈ 31

Hence, 31 i3.4xlarge EC2 MiddleManager nodes are required to ingest the above data.

Though 465 worker processes are available (31 x 15), only 271 worker tasks run in phase 1 and phase 2. The remaining worker slots are not utilized, because phase 2 does not start until all worker tasks in phase 1 complete, and the same rule applies to phase 3 as well.

Ingestion cost = i3.4xlarge hourly price x number of nodes x ingestion time (hours)

               = $1.248 x 31 x 41/60 ≈ $26.44
        

We can plan the ingestion of other datasources in parallel to optimally leverage the idle worker processes in the first two phases.

Optimize the number of segments & average segment size 

Data in a Druid datasource is partitioned into segments. Since the segments of a datasource are stored on different data nodes, the number of segments should be optimal so that data is accessed in a massively parallel processing (MPP) fashion.

If there are too few segments, data fetches are less parallel even though many nodes are available. On the other hand, if there are many segments with only a small chunk of data in each, it is overkill to access the data from many nodes and then merge the results from the various segments.

The Druid documentation says it is optimal to maintain an average segment size of around 500 MB to 1 GB. We can use the tuning parameter below in the ingestion spec to control how many rows go into each segment so that the segment size stays optimal.

                 "targetRowsPerSegment": 5000000        

You may have noticed the peculiar fact that a table with 500k rows can be larger than a table with 10 million rows. It is quite possible that the table with 10 million rows has only a few columns or key-value pairs, whereas the other table has many columns, several of which hold values that require more bytes to store.

Hence, for a datasource with many wide columns, a smaller value for ‘targetRowsPerSegment’ may yield an optimal average segment size.

During the development stage, you can test the ingestion a few times with different values for ‘targetRowsPerSegment’ before settling on an optimal value; one way to check the result is shown below.
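A quick way to check the resulting segment count, average segment size, and average row count per segment after an ingestion run is to query Druid's sys.segments system table; the datasource name is illustrative:

  SELECT COUNT(*) AS segments,
         AVG("size") / 1024 / 1024 AS avg_size_mb,
         AVG("num_rows") AS avg_rows_per_segment
  FROM sys.segments
  WHERE "datasource" = 'transaction_detail'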

Optimal usage of worker threads

Sometimes you may want to set a quota on how much MiddleManager capacity a specific ingestion task can use. We can use the tuning parameters below to limit (or expand) the resources available to it.

             "maxNumConcurrentSubTasks": 500

                   "totalNumMergeTasks": 700        

Speed up the partial_index_generic_merge phase

You can use the tuning parameter below to allow the worker tasks on the MiddleManagers to merge more segments in a single pass.

                    "maxNumSegmentsToMerge": 100        

Conclusion:

We should understand the impact of each configuration parameter and try a few combinations to optimize the data ingestion process in Druid, so that data is distributed evenly and query response times improve. Do note that one tuning spec will not fit all of your data ingestion loads, even though they load into the same Druid cluster.

Author: Jayakumar Radhakrishnan has two decades of experience in data engineering and data platforms, MPP data warehouse appliances, cloud, and big data analytics. He is passionate about how data can be leveraged to gain insights for informed decisions.
