Aggregate data with rollup
Apache Druid can summarize raw data at ingestion time using a process we refer to as "rollup". Rollup is a first-level aggregation operation over a selected set of columns that reduces the size of stored data.
This tutorial will demonstrate the effects of rollup on an example dataset.
For this tutorial, we'll assume you've already downloaded Druid as described in the single-machine quickstart and have it running on your local machine.
It will also be helpful to have finished Load a file and Query data tutorials.
Example data
For this tutorial, we'll use a small sample of network flow event data, representing packet and byte counts for traffic from a source to a destination IP address that occurred within a particular second.
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
{"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}
{"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}
{"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":38,"bytes":6289}
{"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":123,"bytes":93999}
{"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":12,"bytes":2818}
A file containing this sample input data is located at quickstart/tutorial/rollup-data.json
.
We'll ingest this data using the following ingestion task spec, located at quickstart/tutorial/rollup-index.json
.
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "rollup-tutorial",
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
"dstIP"
]
},
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "week",
"queryGranularity" : "minute",
"intervals" : ["2018-01-01/2018-01-03"],
"rollup" : true
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial",
"filter" : "rollup-data.json"
},
"inputFormat" : {
"type" : "json"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index_parallel",
"partitionsSpec": {
"type": "dynamic"
},
"maxRowsInMemory" : 25000
}
}
}
Rollup has been enabled by setting "rollup" : true
in the granularitySpec
.
Note that we have srcIP
and dstIP
defined as dimensions, a longSum metric is defined for the packets
and bytes
columns, and the queryGranularity
has been defined as minute
.
We will see how these definitions are used after we load this data.
Load the example data
From the apache-druid-{{DRUIDVERSION}} package root, run the following command:
bin/post-index-task --file quickstart/tutorial/rollup-index.json --url http://localhost:8081
After the script completes, we will query the data.
Query the example data
Let's run bin/dsql
and issue a select * from "rollup-tutorial";
query to see what data was ingested.
$ bin/dsql
Welcome to dsql, the command-line client for Druid SQL.
Type "\h" for help.
dsql> select * from "rollup-tutorial";
┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
│ __time │ bytes │ count │ dstIP │ packets │ srcIP │
├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │ 35937 │ 3 │ 2.2.2.2 │ 286 │ 1.1.1.1 │
│ 2018-01-01T01:02:00.000Z │ 366260 │ 2 │ 2.2.2.2 │ 415 │ 1.1.1.1 │
│ 2018-01-01T01:03:00.000Z │ 10204 │ 1 │ 2.2.2.2 │ 49 │ 1.1.1.1 │
│ 2018-01-02T21:33:00.000Z │ 100288 │ 2 │ 8.8.8.8 │ 161 │ 7.7.7.7 │
│ 2018-01-02T21:35:00.000Z │ 2818 │ 1 │ 8.8.8.8 │ 12 │ 7.7.7.7 │
└──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘
Retrieved 5 rows in 1.18s.
dsql>
Let's look at the three events in the original input data that occurred during 2018-01-01T01:01
:
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}
These three rows have been "rolled up" into the following row:
┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
│ __time │ bytes │ count │ dstIP │ packets │ srcIP │
├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │ 35937 │ 3 │ 2.2.2.2 │ 286 │ 1.1.1.1 │
└──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘
The input rows have been grouped by the timestamp and dimension columns {timestamp, srcIP, dstIP}
with sum aggregations on the metric columns packets
and bytes
.
Before the grouping occurs, the timestamps of the original input data are bucketed/floored by minute, due to the "queryGranularity":"minute"
setting in the ingestion spec.
Likewise, these two events that occurred during 2018-01-01T01:02
have been rolled up:
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
{"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}
┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
│ __time │ bytes │ count │ dstIP │ packets │ srcIP │
├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
│ 2018-01-01T01:02:00.000Z │ 366260 │ 2 │ 2.2.2.2 │ 415 │ 1.1.1.1 │
└──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘
For the last event recording traffic between 1.1.1.1 and 2.2.2.2, no rollup took place, because this was the only event that occurred during 2018-01-01T01:03
:
{"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}
┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
│ __time │ bytes │ count │ dstIP │ packets │ srcIP │
├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
│ 2018-01-01T01:03:00.000Z │ 10204 │ 1 │ 2.2.2.2 │ 49 │ 1.1.1.1 │
└──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘
Note that the count
metric shows how many rows in the original input data contributed to the final "rolled up" row.