
Aggregate a collection

In this tutorial, you will aggregate the values of a collection datastream, both per group and in total. You will

  • use the aggregate() method of PipelineBuilder() to aggregate the values of a collection datastream based on a group value,
  • use the aggregate() method of PipelineBuilder() to aggregate the values of a collection datastream in total, and
  • launch a solution and observe the results.

This lesson assumes that you have an empty project and asset, which you can deploy to a workspace named 03_06_04_aggregate_a_collection with the following command:

edk template deploy -ycw 03_06_04_aggregate_a_collection

Define and deploy a template

To use the aggregate() method, you will first perform the following steps:

  1. define a datasource using a similar pattern as your previous My Source definition.
  2. set the value of the datasource to be a Map<string, { category: string, value: bigint }>() value.
  3. add the datasource to a template

In an asset, perform the above steps to produce the following TypeScript code:

import { SourceBuilder, Template } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { category: "a", value: 0n }],
            ["1", { category: "b", value: 15n }],
            ["2", { category: "a", value: 25n }],
            ["3", { category: "b", value: 55n }],
        ])
    })

export default Template(my_source)

Define a group-based aggregation

You can define a per-group aggregation with the aggregate() method of PipelineBuilder() by taking the following steps:

  1. add a new pipeline "My Pipeline"
  2. add an aggregate() operation
  3. define the group value of the aggregate based on a StringType expression of the category
  4. define two aggregations within each group: the unique category, and the sum of value
  5. add the new pipeline to the template

Add the above changes to the definition of My Pipeline:

import { SourceBuilder, PipelineBuilder, Template, Sum, Unique } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { category: "a", value: 0n }],
            ["1", { category: "b", value: 15n }],
            ["2", { category: "a", value: 25n }],
            ["3", { category: "b", value: 55n }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .aggregate({
        // group the rows by their category
        group_value: fields => fields.category,
        // compute these aggregations within each group
        aggregations: {
            category: fields => Unique(fields.category),
            sum_of_value: fields => Sum(fields.value)
        }
    })

export default Template(my_source, my_pipeline)

Note the use of the standard Unique() and Sum() aggregations. You can use any of the following aggregations, depending on the type of value:

| Operation | Description |
| --- | --- |
| Count | Create an Aggregation to count the number of rows where value is not null. |
| DistinctCount | Create an Aggregation to count the number of distinct, non-null values of value. |
| Unique | Create an Aggregation to find the unique non-null value of value. If multiple non-null values are encountered, it returns null. |
| Sum | Create an Aggregation to find the sum of the non-null values of value (which defaults to field). |
| Mean | Create an Aggregation to find the mean of the non-null values of value (which defaults to field). |
| StdDev | Create an Aggregation to find the standard deviation of the non-null values of value (which defaults to field). |
| CollectSet | Create an Aggregation to find the set of the non-null string values of value (which defaults to field). |
| CollectDict | Create an Aggregation to create a dictionary of distinct key-value pairs. If multiple distinct, non-null values exist for a given key, then the value is null for that key. |
| CollectDictCount | Create an Aggregation to create a dictionary of the count of non-null keys. |
| CollectDictSum | Create an Aggregation to create a dictionary of the sum of non-null values for key-value pairs. |
| CollectDictMean | Create an Aggregation to create a dictionary of the mean of non-null values for key-value pairs. |
| Any | Create an Aggregation that returns true if one or more values are true. |
| Every | Create an Aggregation that returns true if all values are true. |
| Mode | Create an Aggregation to find the most common, non-null value of value (which defaults to field). If a tie is encountered, any one of the most-common values is chosen. |
| Minimum | Create an Aggregation to find the smallest, non-null value of value (which defaults to field). |
| Maximum | Create an Aggregation to find the largest, non-null value of value (which defaults to field). |
| Span | Create an Aggregation to find the span of the non-null values of value (which defaults to field), meaning the difference between the largest and smallest values. |
| FindMinimum | Create an Aggregation to find the key corresponding to the smallest, non-null value of value (which defaults to field). |
| FindMaximum | Create an Aggregation to find the key corresponding to the largest, non-null value of value (which defaults to field). |
| Median | Create an Aggregation to find the median of the non-null values of value (which defaults to field). In case of a tie, the greater number is returned. |
| SparseDictSum | Create an Aggregation to calculate the sum of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
| SparseDictMean | Create an Aggregation to calculate the average of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
| SparseDictVariance | Create an Aggregation to calculate the variance of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
| SparseDictCovariance | Create an Aggregation to calculate the covariance of the values of a "sparse" dictionary where missing elements are presumed to have value of zero. |
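The following minimal plain-TypeScript sketch makes a few of the table's descriptions concrete. It implements Count, DistinctCount, Unique and Span over arrays that may contain null. The helper names countNonNull, distinctCount, unique and span are hypothetical and are not part of @elaraai/core. They only mirror the table's wording. Returning null for an input with no non-null values is an assumption, because the table does not cover that case.

```typescript
// Hypothetical plain-TypeScript helpers that mirror the table's descriptions.
// They are NOT the @elaraai/core implementations of these aggregations.

type Nullable<T> = T | null;

// Count: number of entries whose value is not null.
function countNonNull<T>(values: Nullable<T>[]): number {
    return values.filter((v) => v !== null).length;
}

// DistinctCount: number of distinct, non-null values.
function distinctCount<T>(values: Nullable<T>[]): number {
    return new Set(values.filter((v) => v !== null)).size;
}

// Unique: the single distinct non-null value, or null if several distinct
// values appear (assumption: also null when there are no non-null values).
function unique<T>(values: Nullable<T>[]): Nullable<T> {
    const distinct = new Set(values.filter((v): v is T => v !== null));
    return distinct.size === 1 ? Array.from(distinct)[0] : null;
}

// Span: largest minus smallest non-null value
// (assumption: null when there are no non-null values).
function span(values: Nullable<bigint>[]): Nullable<bigint> {
    const present = values.filter((v): v is bigint => v !== null);
    if (present.length === 0) return null;
    let min = present[0];
    let max = present[0];
    for (const v of present) {
        if (v < min) min = v;
        if (v > max) max = v;
    }
    return max - min;
}

console.log(countNonNull(["a", null, "a"]));       // 2
console.log(distinctCount(["a", null, "a", "b"])); // 2
console.log(unique(["a", "b"]));                   // null
console.log(span([0n, 15n, null, 55n]));           // 55n
```

Every helper skips null entries before aggregating, which is the common rule behind the "non-null" wording throughout the table.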

Observe the group-based aggregation

Once deployed, you can test your aggregate() by observing the value of the Pipeline.My Pipeline datastream:

edk stream get "Pipeline.My Pipeline" -w 03_06_04_aggregate_a_collection

This will produce the output below.

▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"a","value":{"category":"a","sum_of_value":"25"}},
{"key":"b","value":{"category":"b","sum_of_value":"70"}}]
✔ Download complete

You can observe that Pipeline.My Pipeline is a collection datastream containing the sum of value for each value of category.
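As a cross-check of the output above, the following plain-TypeScript sketch recomputes the per-category results from the My Source rows. This is not how @elaraai/core evaluates aggregate(). The function aggregateByCategory and the types Row and GroupResult are hypothetical names used only for illustration.

```typescript
// Hypothetical plain-TypeScript reproduction of the group-based aggregate()
// in "My Pipeline". It is NOT the @elaraai/core implementation.

type Row = { category: string; value: bigint };
type GroupResult = { category: string | null; sum_of_value: bigint };

// The same rows as the My Source datasource.
const source = new Map<string, Row>([
    ["0", { category: "a", value: 0n }],
    ["1", { category: "b", value: 15n }],
    ["2", { category: "a", value: 25n }],
    ["3", { category: "b", value: 55n }],
]);

function aggregateByCategory(rows: Map<string, Row>): Map<string, GroupResult> {
    // Collect the rows of each group, keyed by the group value (the category).
    const groups = new Map<string, Row[]>();
    rows.forEach((row) => {
        const members = groups.get(row.category) ?? [];
        members.push(row);
        groups.set(row.category, members);
    });

    // Apply both aggregations to every group.
    const output = new Map<string, GroupResult>();
    groups.forEach((members, key) => {
        const categories = new Set(members.map((r) => r.category));
        output.set(key, {
            // Unique(): the single distinct category, or null if there are several.
            category: categories.size === 1 ? Array.from(categories)[0] : null,
            // Sum(): the total of the values in this group.
            sum_of_value: members.reduce((acc, r) => acc + r.value, 0n),
        });
    });
    return output;
}

const result = aggregateByCategory(source);
console.log(result.get("a")?.sum_of_value); // 25n
console.log(result.get("b")?.sum_of_value); // 70n
```

The sums 0 + 25 = 25 and 15 + 55 = 70 match the values streamed for keys a and b.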

Define a group-less aggregation

You can define a group-less aggregation, which aggregates the entire collection into a single result, with the aggregate() method of PipelineBuilder() by taking the following steps:

  1. add another pipeline "My Other Pipeline"
  2. add an aggregate() operation without a group_value
  3. define a single aggregation, being the sum of value
  4. add the new pipeline to the template

Add the above changes to define My Other Pipeline:

import { SourceBuilder, PipelineBuilder, Template, Sum, Unique } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { category: "a", value: 0n }],
            ["1", { category: "b", value: 15n }],
            ["2", { category: "a", value: 25n }],
            ["3", { category: "b", value: 55n }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .aggregate({
        group_value: fields => fields.category,
        aggregations: {
            category: fields => Unique(fields.category),
            sum_of_value: fields => Sum(fields.value)
        }
    })

const my_other_pipeline = new PipelineBuilder("My Other Pipeline")
    .from(my_source.outputStream())
    .aggregate({
        aggregations: {
            sum_of_value: fields => Sum(fields.value)
        }
    })

export default Template(my_source, my_pipeline, my_other_pipeline)

Compound datastream type

Because no group_value is defined, all rows fall into a single group. As a result, the datastream Pipeline.My Other Pipeline has a compound (struct) type instead of being a collection:

StructType<{
    sum_of_value: IntegerType;
}>

In later lessons you will learn how to take advantage of the compound output type.

Observe the group-less aggregation

Once deployed, you can test your aggregate() by observing the value of the Pipeline.My Other Pipeline datastream:

edk stream get "Pipeline.My Other Pipeline" -w 03_06_04_aggregate_a_collection

This will produce the value below.

▹▹▹▹▹ Attempting to stream Pipeline.My Other Pipeline to stdout
{"sum_of_value":"95"}
✔ Download complete

You can observe that Pipeline.My Other Pipeline is a compound datastream containing the sum of value across all rows.
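The group-less aggregation can also be mirrored in plain TypeScript. With no group_value, every row contributes to one result, so the output is a single struct rather than a keyed Map. The names aggregateTotal, Row and TotalResult are hypothetical and used only for illustration. They are not the @elaraai/core API.

```typescript
// Hypothetical plain-TypeScript reproduction of the group-less aggregate()
// in "My Other Pipeline". It is NOT the @elaraai/core implementation.

type Row = { category: string; value: bigint };
// A single struct, analogous to StructType<{ sum_of_value: IntegerType }>.
type TotalResult = { sum_of_value: bigint };

// The same rows as the My Source datasource.
const source = new Map<string, Row>([
    ["0", { category: "a", value: 0n }],
    ["1", { category: "b", value: 15n }],
    ["2", { category: "a", value: 25n }],
    ["3", { category: "b", value: 55n }],
]);

function aggregateTotal(rows: Map<string, Row>): TotalResult {
    // No grouping: Sum() runs over every row in the collection.
    let sum = 0n;
    rows.forEach((row) => {
        sum += row.value;
    });
    return { sum_of_value: sum };
}

const total = aggregateTotal(source);
console.log(total.sum_of_value); // 95n
```

The total 0 + 15 + 25 + 55 = 95 matches the streamed sum_of_value.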

Example solution

The code for this tutorial is available below:

Next steps

In the next tutorial, you will use the select() operation to select parts of a collection datastream.