Aggregate a collection
In this tutorial, you will use group-based aggregation on a tabular datastream. You will
- use the aggregate() method of PipelineBuilder() to aggregate the values of a collection datastream based on a group value,
- use the aggregate() method of PipelineBuilder() to aggregate the values of a collection datastream in total, and
- launch a solution and observe the results.
This lesson assumes that you have an empty project and asset, which you can deploy to a workspace named 03_06_04_aggregate_a_collection with the following command:
edk template deploy -ycw 03_06_04_aggregate_a_collection
Define and deploy a template
To use the aggregate() method you will first perform the following steps:
- define a datasource using a similar pattern to your previous My Source definition
- set the value of the datasource to be a Map<string, { category: string, value: bigint }>() value
- add the datasource to a template
In an asset, perform the above steps to create the resulting TypeScript code:
import { SourceBuilder, Template } from "@elaraai/core"

// a collection datasource: each row has a category and an integer value
const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { category: "a", value: 0n }],
            ["1", { category: "b", value: 15n }],
            ["2", { category: "a", value: 25n }],
            ["3", { category: "b", value: 55n }],
        ])
    })

export default Template(my_source)
Define a group-based aggregation
You can define a per-group aggregation to be output with the aggregate() method of PipelineBuilder() by taking the following steps:
- add a new pipeline "My Pipeline"
- add an aggregate() operation
- define the group value of the aggregate based on a StringType expression of the category
- define multiple aggregations within the group, being the unique category and the sum of the value
- add the new pipeline to the template
Apply the above changes in the definition of My Pipeline:
import { SourceBuilder, PipelineBuilder, Template, Sum, Unique } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { category: "a", value: 0n }],
            ["1", { category: "b", value: 15n }],
            ["2", { category: "a", value: 25n }],
            ["3", { category: "b", value: 55n }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .aggregate({
        // rows with the same category fall into the same group
        group_value: fields => fields.category,
        // one output field per aggregation, computed within each group
        aggregations: {
            category: fields => Unique(fields.category),
            sum_of_value: fields => Sum(fields.value)
        }
    })

export default Template(my_source, my_pipeline)
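To make the semantics concrete, here is a minimal sketch in plain TypeScript of what this group-based aggregation computes. It does not use the @elaraai/core API: the ExampleRow and GroupResult types and the groupAndAggregate helper are hypothetical names introduced only for illustration.

```typescript
// Plain TypeScript illustration; none of these names come from @elaraai/core.
type ExampleRow = { category: string; value: bigint };
type GroupResult = { category: string; sum_of_value: bigint };

const exampleRows = new Map<string, ExampleRow>([
    ["0", { category: "a", value: 0n }],
    ["1", { category: "b", value: 15n }],
    ["2", { category: "a", value: 25n }],
    ["3", { category: "b", value: 55n }],
]);

// Hypothetical helper: group rows by category, then aggregate each group.
function groupAndAggregate(input: Map<string, ExampleRow>): Map<string, GroupResult> {
    const result = new Map<string, GroupResult>();
    input.forEach((row) => {
        const group = result.get(row.category);
        if (group === undefined) {
            // First row of this group. Every row in a group shares the same
            // category, so the unique category is simply the group key.
            result.set(row.category, { category: row.category, sum_of_value: row.value });
        } else {
            // Later rows only add to the running sum of the group.
            group.sum_of_value += row.value;
        }
    });
    return result;
}

const grouped = groupAndAggregate(exampleRows);
```

The result is keyed by the group value, with one entry per distinct category, which is the same shape as the collection datastream the pipeline produces.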
You should notice the Unique() and Sum() standard aggregations. You can use any one of the following aggregations (depending on the type of value):
Operation | Description |
---|---|
Count | Create an Aggregation to count the number of rows where value is not null. |
DistinctCount | Create an Aggregation to count the number of distinct, non-null values of value. |
Unique | Create an Aggregation to find the unique non-null value of value. If multiple non-null values are encountered, it returns null. |
Sum | Create an Aggregation to find the sum of the non-null values of value (which defaults to field). |
Mean | Create an Aggregation to find the mean of the non-null values of value (which defaults to field). |
StdDev | Create an Aggregation to find the standard deviation of the non-null values of value (which defaults to field). |
CollectSet | Create an Aggregation to find the set of the non-null string values of value (which defaults to field). |
CollectDict | Create an Aggregation to create a dictionary of distinct key-value pairs. If multiple distinct, non-null values exist for a given key, then the value is null for that key. |
CollectDictCount | Create an Aggregation to create a dictionary of the count of non-null keys. |
CollectDictSum | Create an Aggregation to create a dictionary of the sum of non-null values for key-value pairs. |
CollectDictMean | Create an Aggregation to create a dictionary of the mean of non-null values for key-value pairs. |
Any | Create an Aggregation that returns true if one or more values are true. |
Every | Create an Aggregation that returns true if all values are true. |
Mode | Create an Aggregation to find the most common, non-null value of value (which defaults to field). If a tie is encountered, any one of the most-common values is chosen. |
Minimum | Create an Aggregation to find the smallest, non-null value of value (which defaults to field). |
Maximum | Create an Aggregation to find the largest, non-null value of value (which defaults to field). |
Span | Create an Aggregation to find the span of the non-null values of value (which defaults to field), meaning the difference between the largest and smallest values. |
FindMinimum | Create an Aggregation to find the key corresponding to the smallest, non-null value of value (which defaults to field). |
FindMaximum | Create an Aggregation to find the key corresponding to the largest, non-null value of value (which defaults to field). |
Median | Create an Aggregation to find the median of the non-null values of value (which defaults to field). In case of a tie, the greater number is returned. |
SparseDictSum | Create an Aggregation to calculate the sum of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
SparseDictMean | Create an Aggregation to calculate the average values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
SparseDictVariance | Create an Aggregation to calculate the variance of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
SparseDictCovariance | Create an Aggregation to calculate the covariance of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
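A few of the descriptions above carry edge cases that are easier to see in code. The following is a rough sketch in plain TypeScript of how Unique, Span and Median behave according to the table. The helpers unique, span and median are hypothetical stand-ins, not the @elaraai/core functions, and returning null for an input with no non-null values is an assumption made here rather than something the table states.

```typescript
// Plain TypeScript illustration; these helpers are hypothetical, not the @elaraai/core API.
type Nullable<T> = T | null;

// Unique: the single distinct non-null value, or null if several distinct values exist.
function unique<T>(values: Nullable<T>[]): Nullable<T> {
    let result: Nullable<T> = null;
    for (const v of values) {
        if (v === null) continue; // nulls are ignored
        if (result !== null && result !== v) return null; // conflicting values
        result = v;
    }
    return result; // assumption: null when there are no non-null values
}

// Span: difference between the largest and smallest non-null values.
function span(values: Nullable<number>[]): Nullable<number> {
    const present = values.filter((v): v is number => v !== null);
    if (present.length === 0) return null; // assumption: null for empty input
    return Math.max(...present) - Math.min(...present);
}

// Median: middle non-null value; with an even count the greater of the two middle values.
function median(values: Nullable<number>[]): Nullable<number> {
    const present = values
        .filter((v): v is number => v !== null)
        .sort((a, b) => a - b);
    if (present.length === 0) return null; // assumption: null for empty input
    return present[Math.floor(present.length / 2)];
}

const uniqueSame = unique(["a", null, "a"]);   // one distinct value
const uniqueMixed = unique(["a", "b"]);        // two distinct values
const valueSpan = span([0, 15, null, 55]);
const evenMedian = median([0, 15, 25, 55]);    // tie between 15 and 25
```

With these inputs, uniqueSame is "a", uniqueMixed is null, valueSpan is 55 and evenMedian is 25, the greater of the two middle values.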
Observe the group-based aggregation
Once deployed, you can test your aggregate() operation by observing the value of the Pipeline.My Pipeline datastream:
edk stream get "Pipeline.My Pipeline" -w 03_06_04_aggregate_a_collection
This will result in the logs below.
▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"a","value":{"category":"a","sum_of_value":"25"}},
{"key":"b","value":{"category":"b","sum_of_value":"70"}}]
✔ Download complete
You can observe that Pipeline.My Pipeline is a collection datastream containing the sum of value for each value of category.
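Note that the integer sums appear as JSON strings ("25", "70") in the streamed output. If you read this output in your own code, a minimal sketch of converting it back to bigint values could look like the following. The StreamedEntry type and the variable names are hypothetical, and the sketch assumes the output keeps the key/value shape shown in the log above.

```typescript
// Output text copied from the log above; integer values appear as JSON strings.
const streamedOutput = `[{"key":"a","value":{"category":"a","sum_of_value":"25"}},
{"key":"b","value":{"category":"b","sum_of_value":"70"}}]`;

// Hypothetical type describing one entry of the streamed collection.
type StreamedEntry = {
    key: string;
    value: { category: string; sum_of_value: string };
};

const streamedEntries = JSON.parse(streamedOutput) as StreamedEntry[];

// Rebuild a Map from group key to the sum, converting each string to a bigint.
const sumsByCategory = new Map<string, bigint>();
for (const entry of streamedEntries) {
    sumsByCategory.set(entry.key, BigInt(entry.value.sum_of_value));
}
```

Converting through BigInt() rather than Number() avoids losing precision for sums that exceed the safe integer range of a JavaScript number.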
Define a group-less aggregation
You can define a group-less aggregation, which aggregates over the whole collection, with the aggregate() method of PipelineBuilder() by taking the following steps:
- add another pipeline "My Other Pipeline"
- add an aggregate() operation without a group value
- define a single aggregation, being the sum of the value
- add the new pipeline to the template
Apply the above changes in the definition of My Other Pipeline:
import { SourceBuilder, PipelineBuilder, Template, Sum, Unique } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { category: "a", value: 0n }],
            ["1", { category: "b", value: 15n }],
            ["2", { category: "a", value: 25n }],
            ["3", { category: "b", value: 55n }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .aggregate({
        group_value: fields => fields.category,
        aggregations: {
            category: fields => Unique(fields.category),
            sum_of_value: fields => Sum(fields.value)
        }
    })

// no group_value: the aggregation runs over every row of the collection
const my_other_pipeline = new PipelineBuilder("My Other Pipeline")
    .from(my_source.outputStream())
    .aggregate({
        aggregations: {
            sum_of_value: fields => Sum(fields.value)
        }
    })

export default Template(my_source, my_pipeline, my_other_pipeline);
You will notice that since no group value is defined, all rows are aggregated together, so the type of the datastream Pipeline.My Other Pipeline is a compound type rather than a collection:
StructType<{
    sum_of_value: IntegerType;
}>
In later lessons you will learn how to take advantage of the compound output type.
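As a point of comparison, the group-less case can be sketched in plain TypeScript as a single reduction over every row, producing one struct-like object instead of a keyed collection. As before, this is only an illustration of the semantics: SourceRow, Total and aggregateAll are hypothetical names, not part of @elaraai/core.

```typescript
// Plain TypeScript illustration; none of these names come from @elaraai/core.
type SourceRow = { category: string; value: bigint };
type Total = { sum_of_value: bigint };

const sourceRows = new Map<string, SourceRow>([
    ["0", { category: "a", value: 0n }],
    ["1", { category: "b", value: 15n }],
    ["2", { category: "a", value: 25n }],
    ["3", { category: "b", value: 55n }],
]);

// Hypothetical helper: with no group value, every row contributes to one result.
function aggregateAll(input: Map<string, SourceRow>): Total {
    let sum = 0n;
    input.forEach((row) => {
        sum += row.value;
    });
    return { sum_of_value: sum };
}

const total = aggregateAll(sourceRows);
```

Here total.sum_of_value is 95n, the sum of 0, 15, 25 and 55 across both categories.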
Observe the group-less aggregation
Once deployed, you can test your aggregate() operation by observing the value of the Pipeline.My Other Pipeline datastream:
edk stream get "Pipeline.My Other Pipeline" -w 03_06_04_aggregate_a_collection
This will result in the value below.
▹▹▹▹▹ Attempting to stream Pipeline.My Other Pipeline to stdout
{"sum_of_value":"95"}
✔ Download complete
You can observe that Pipeline.My Other Pipeline is a compound datastream containing the sum of value across all rows.
Example solution
The code for this tutorial is available below:
Next steps
In the next tutorial, you will use the select() operation to select parts of a collection datastream.