Concatenate collections
In this tutorial, you will concatenate multiple collection datastreams together. You will:

- use the `concatenate()` method to combine two collection datastreams, and
- launch a solution and observe the results.
This lesson assumes that you have an empty project and asset, which you can deploy to a workspace named `03_06_08_concatenate_collections` with the following command:
edk template deploy -ycw 03_06_08_concatenate_collections
Define and deploy a template
To use the `concatenate()` method, you will first perform the following steps:

- define two datasources using a similar pattern to your previous `My Source` definition,
- for the first, set the value of the datasource to be a `Map<string, { value: bigint }>()` value,
- for the second, set the value of the datasource to be a `Map<string, { value: bigint }>()` value, and
- add both datasources to a template.
In an asset, perform the above steps to create the resulting TypeScript code:
import { SourceBuilder, Template } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["0", { value: 1n }],
["1", { value: 15n }],
])
})
const my_other_source = new SourceBuilder("My Other Source")
.value({
value: new Map([
["0", { value: 2n }],
["1", { value: 16n }],
])
})
export default Template(my_source, my_other_source)
Define a concatenation
You can define a concatenation of values from both `My Source` and `My Other Source` with the `concatenate()` method of `PipelineBuilder()`:
- add a new pipeline `"My Pipeline"` from `My Source`,
- add `My Other Source` as an input,
- add a `concatenate()` operation,
- define the `discriminator_value` as `My Data`,
- define a stream to concatenate as `My Other Source` with a `discriminator_value` of `My Other Data`, and
- add the new pipeline to the template.
In the definition of `My Pipeline`, make the above changes:
import { SourceBuilder, PipelineBuilder, Template } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["key.0", { value: 1n }],
["key.1", { value: 15n }],
])
})
const my_other_source = new SourceBuilder("My Other Source")
.value({
value: new Map([
["0", { value: 2n }],
["1", { value: 16n }],
])
})
const my_pipeline = new PipelineBuilder("My Pipeline")
.from(my_source.outputStream())
.input({ name: "My Other Source", stream: my_other_source.outputStream() })
.concatenate({
discriminator_value: "My Data",
inputs: [{
input: inputs => inputs["My Other Source"],
discriminator_value: "My Other Data"
}]
});
export default Template(my_source, my_other_source, my_pipeline);
Observe the concatenation
Once deployed, you can test your `concatenate()` operation by observing the value of the `Pipeline.My Pipeline` datastream:
edk stream get "Pipeline.My Pipeline" -w 03_06_08_concatenate_collections
This will result in the value below:
▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"My Data0","value":{"value":"1","discriminator":"My Data"}},
{"key":"My Data1","value":{"value":"15","discriminator":"My Data"}},
{"key":"My Other Data0","value":{"value":"2","discriminator":"My Other Data"}},
{"key":"My Other Data1","value":{"value":"16","discriminator":"My Other Data"}}]
✔ Download complete
You can observe that `Pipeline.My Pipeline` is a collection datastream containing the values from both `Writable.My Source` and `Writable.My Other Source`, as well as a `discriminator` with each entry.
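The key and discriminator behaviour above can be sketched in plain TypeScript. This is an illustrative model of the result only, not the EDK implementation: judging from the output, each entry's new key is the stream's discriminator value joined with its original key, and the discriminator is recorded alongside each value.

```typescript
// Illustrative sketch of the concatenation result (not the EDK implementation).
type Entry = { value: bigint };
type Discriminated = Entry & { discriminator: string };

function concatenateSketch(
    streams: { discriminator_value: string; data: Map<string, Entry> }[]
): Map<string, Discriminated> {
    const result = new Map<string, Discriminated>();
    for (const { discriminator_value, data } of streams) {
        for (const [key, entry] of data) {
            // New key: discriminator value joined with the original key,
            // so entries from different streams cannot collide.
            result.set(discriminator_value + key, {
                ...entry,
                discriminator: discriminator_value,
            });
        }
    }
    return result;
}

const combined = concatenateSketch([
    { discriminator_value: "My Data", data: new Map([["0", { value: 1n }], ["1", { value: 15n }]]) },
    { discriminator_value: "My Other Data", data: new Map([["0", { value: 2n }], ["1", { value: 16n }]]) },
]);
// combined.get("My Data0") → { value: 1n, discriminator: "My Data" }
```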
Example solution
The code for this tutorial is available below:
Next steps
In the next tutorial, you will use the offset()
operation to perform a row-by-row transformation based on relative rows.