Join collections
In this tutorial, you will use a row-based relational join in a tabular datastream. You will
- use the innerJoin() method of PipelineBuilder() to join two collection datastreams, and
- launch a solution and observe the results.
This lesson assumes that you have an empty project and asset which you can deploy to a workspace named 03_06_07_join_collections with the following command:
edk template deploy -ycw 03_06_07_join_collections
Define and deploy a template
To use the innerJoin() method you will first perform the following steps:
- define two datasources using a similar pattern to your previous My Source definition.
- for the first, set the value of the datasource to be a Map<string, { value: bigint }>() value.
- to make the situation more complex, prefix each key in the first collection with the string key. (including the trailing dot).
- for the second, set the value of the datasource to be a Map<string, { other_value: string }>() value.
- add both datasources to a template
In an asset, perform the above steps to create the resulting TypeScript code:
import { SourceBuilder, Template } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["key.0", { value: 1n }],
["key.1", { value: 15n }],
])
})
const my_other_source = new SourceBuilder("My Other Source")
.value({
value: new Map([
["0", { other_value: "a" }],
["1", { other_value: "b" }],
])
})
export default Template(my_source, my_other_source)
Define a relational join
You can define a per-row inner join with the innerJoin() method of PipelineBuilder(). In this exercise, the desired output will combine values from both My Source and My Other Source, such that:
- the keys for both match in the join (one has a prefix of key., one doesn't)
- the output value corresponding to other_value is given a prefix of category
You can perform the above by taking the following steps:
- add a new pipeline "My Pipeline"
- add My Other Source as an input
- add an innerJoin() operation
- define the right_input to join in as My Other Source
- define the right_key as the key of My Other Source prefixed with key.
- define the right_selections to prefix the values of My Other Source with category
- add the new pipeline to the template
In the definition of My Pipeline, add the above changes:
import { SourceBuilder, PipelineBuilder, Template, StringJoin } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["key.0", { value: 1n }],
["key.1", { value: 15n }],
])
})
const my_other_source = new SourceBuilder("My Other Source")
.value({
value: new Map([
["0", { other_value: "a" }],
["1", { other_value: "b" }],
])
})
const my_pipeline = new PipelineBuilder("My Pipeline")
.from(my_source.outputStream())
.input({ name: "My Other Source", stream: my_other_source.outputStream() })
.innerJoin({
right_input: (inputs) => inputs["My Other Source"],
right_key: (_fields, key) => StringJoin`key.${key}`,
right_selections: {
other_value: (fields) => StringJoin`category ${fields.other_value}`,
}
})
export default Template(my_source, my_other_source, my_pipeline);
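To make the matching rule concrete, the following is a minimal sketch in plain TypeScript that mimics what the join above is expected to do. It does not use @elaraai/core and is not how the library implements innerJoin(); the helper name innerJoinSketch is hypothetical, and the bigint literals assume an ES2020 or later compile target.

```typescript
// Plain TypeScript sketch of inner-join semantics; it does not use @elaraai/core.
type Left = { value: bigint };
type Right = { other_value: string };
type Joined = { value: bigint; other_value: string };

const left = new Map<string, Left>([
    ["key.0", { value: 1n }],
    ["key.1", { value: 15n }],
]);
const right = new Map<string, Right>([
    ["0", { other_value: "a" }],
    ["1", { other_value: "b" }],
]);

// Hypothetical helper: keeps only the left rows whose key has a matching right row.
function innerJoinSketch(
    leftRows: Map<string, Left>,
    rightRows: Map<string, Right>,
): Map<string, Joined> {
    // Re-key the right collection so its keys line up with the left ones
    // (the role played by right_key in the pipeline).
    const rekeyed = new Map<string, Right>();
    for (const [key, fields] of rightRows) {
        rekeyed.set(`key.${key}`, fields);
    }

    const result = new Map<string, Joined>();
    for (const [key, fields] of leftRows) {
        const match = rekeyed.get(key);
        if (match === undefined) continue; // rows without a match are dropped
        // Prefix the selected right-hand value (the role played by right_selections).
        result.set(key, {
            value: fields.value,
            other_value: `category ${match.other_value}`,
        });
    }
    return result;
}

const joined = innerJoinSketch(left, right);
```

Both rows survive here because every re-keyed right key (key.0, key.1) has a matching left key. A right row keyed 2 would be dropped, since My Source has no key.2.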
Observe the relation join
Once deployed, you can test your innerJoin() by observing the value of the Pipeline.My Pipeline datastream:
edk stream get "Pipeline.My Pipeline" -w 03_06_07_join_collections
This results in the value below:
▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"key.0","value":{"value":"1","other_value":"category a"}},
{"key":"key.1","value":{"value":"15","other_value":"category b"}}]
✔ Download complete
You can observe that Pipeline.My Pipeline is a collection datastream, containing the joined values from both Writable.My Source and Writable.My Other Source.
Joining isn't limited to innerJoin() of collections; other possible methods are leftJoin(), rightJoin(), and outerJoin().
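The sketch below illustrates which keys survive under the conventional relational meaning of each join kind. It is plain TypeScript written for illustration, not the @elaraai/core API: the helper joinedKeys and the sample keys are hypothetical, and it is an assumption that leftJoin(), rightJoin(), and outerJoin() follow these conventional semantics.

```typescript
type JoinKind = "inner" | "left" | "right" | "outer";

// Hypothetical helper: returns the keys retained by each conventional join kind.
function joinedKeys(leftKeys: string[], rightKeys: string[], kind: JoinKind): string[] {
    const left = new Set(leftKeys);
    const right = new Set(rightKeys);
    // Every distinct key that appears on either side.
    const all = Array.from(new Set(leftKeys.concat(rightKeys)));
    return all
        .filter((key) => {
            const inLeft = left.has(key);
            const inRight = right.has(key);
            if (kind === "inner") return inLeft && inRight; // present on both sides
            if (kind === "left") return inLeft;             // every left key is kept
            if (kind === "right") return inRight;           // every right key is kept
            return inLeft || inRight;                       // "outer": keys from either side
        })
        .sort();
}

// Sample keys, already aligned to the same prefix.
const sampleLeftKeys = ["key.0", "key.1", "key.2"];
const sampleRightKeys = ["key.1", "key.2", "key.3"];

const innerKeys = joinedKeys(sampleLeftKeys, sampleRightKeys, "inner");
const leftOnlyJoinKeys = joinedKeys(sampleLeftKeys, sampleRightKeys, "left");
const rightOnlyJoinKeys = joinedKeys(sampleLeftKeys, sampleRightKeys, "right");
const outerKeys = joinedKeys(sampleLeftKeys, sampleRightKeys, "outer");
```

With these sample keys, the inner join keeps only key.1 and key.2, while the outer join keeps all four keys. In a left, right, or outer join, the fields coming from the missing side have no value for unmatched rows, so the output type normally has to allow for that.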
Example solution
The code for this tutorial is available below:
Next steps
In the next tutorial, you will use the concatenate() operation to concatenate two collection datastreams together.