Concatenate collections

In this tutorial, you will concatenate multiple collection datastreams. You will

  • use the concatenate() method to combine two collection datastreams, and
  • launch a solution and observe the results.

This lesson assumes that you have an empty project and asset, which you can deploy to a workspace named 03_06_08_concatenate_collections with the following command:

edk template deploy -ycw 03_06_08_concatenate_collections

Define and deploy a template

To use the concatenate() method you will first perform the following steps:

  1. define two datasources, using a similar pattern to your previous My Source definition.
  2. for each datasource, set the value to a Map<string, { value: bigint }>() value.
  3. add both datasources to a template.

In an asset, perform the above steps to create the resulting TypeScript code:

import { SourceBuilder, Template } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { value: 1n }],
            ["1", { value: 15n }],
        ])
    })

const my_other_source = new SourceBuilder("My Other Source")
    .value({
        value: new Map([
            ["0", { value: 2n }],
            ["1", { value: 16n }],
        ])
    })

export default Template(my_source, my_other_source)

Define a concatenation

You can define a concatenation of values from both My Source and My Other Source with the concatenate() method of PipelineBuilder():

  1. add a new pipeline "My Pipeline", starting from My Source
  2. add My Other Source as an input
  3. add a concatenate() operation
  4. define the discriminator_value of the base stream as My Data
  5. define a stream to concatenate as My Other Source, with a discriminator_value of My Other Data
  6. add the new pipeline to the template

In the definition of My Pipeline, add the above changes:

import { SourceBuilder, PipelineBuilder, Template } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["0", { value: 1n }],
            ["1", { value: 15n }],
        ])
    })

const my_other_source = new SourceBuilder("My Other Source")
    .value({
        value: new Map([
            ["0", { value: 2n }],
            ["1", { value: 16n }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .input({ name: "My Other Source", stream: my_other_source.outputStream() })
    .concatenate({
        discriminator_value: "My Data",
        inputs: [{
            input: inputs => inputs["My Other Source"],
            discriminator_value: "My Other Data"
        }]
    })

export default Template(my_source, my_other_source, my_pipeline)

Observe the concatenation

Once deployed, you can test your concatenate() operation by observing the value of the Pipeline.My Pipeline datastream:

edk stream get "Pipeline.My Pipeline" -w 03_06_08_concatenate_collections

This will result in the value below:

▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"My Data0","value":{"value":"1","discriminator":"My Data"}},
{"key":"My Data1","value":{"value":"15","discriminator":"My Data"}},
{"key":"My Other Data0","value":{"value":"2","discriminator":"My Other Data"}},
{"key":"My Other Data1","value":{"value":"16","discriminator":"My Other Data"}}]
✔ Download complete

You can observe that Pipeline.My Pipeline is a collection datastream containing the values from both Writable.My Source and Writable.My Other Source, as well as a discriminator with each entry. Note that each output key is the discriminator_value prefixed to the original key (for example "My Data0"), which keeps keys unique across the combined collection.
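The key and value scheme shown in the output above can be sketched in plain TypeScript. This is a model of the observable behaviour only, not EDK code; the concatenateModel helper and its types are illustrative assumptions, not part of @elaraai/core:

// Illustrative model of concatenation: each input collection is tagged with a
// discriminator, and output keys are the discriminator prefixed to the
// original key, so entries from different inputs cannot collide.
type Row = { value: bigint }
type TaggedRow = { value: bigint; discriminator: string }

// Hypothetical helper, for illustration only.
function concatenateModel(
    inputs: { discriminator: string; data: Map<string, Row> }[]
): Map<string, TaggedRow> {
    const result = new Map<string, TaggedRow>()
    for (const { discriminator, data } of inputs) {
        for (const [key, row] of data) {
            // Prefix the key with the discriminator and tag the value.
            result.set(`${discriminator}${key}`, { ...row, discriminator })
        }
    }
    return result
}

// Mirrors the tutorial's two sources.
const combined = concatenateModel([
    { discriminator: "My Data", data: new Map([["0", { value: 1n }], ["1", { value: 15n }]]) },
    { discriminator: "My Other Data", data: new Map([["0", { value: 2n }], ["1", { value: 16n }]]) },
])
// combined now holds four entries, keyed "My Data0", "My Data1",
// "My Other Data0", and "My Other Data1", matching the output above.

Running this model over the tutorial's data reproduces the four keys and discriminators shown in the edk stream get output.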

Example solution

The code for this tutorial is available below:

Next steps

In the next tutorial, you will use the offset() operation to perform a row-by-row transformation based on relative rows.