Join collections
In this tutorial, you will perform a row-based relational join on tabular datastreams. You will:
- use the innerJoin() method of PipelineBuilder() to join two collection datastreams, and
- launch a solution and observe the results.
This lesson assumes that you have an empty project and asset which you can deploy to a workspace named 03_06_07_join_collections with the following command:
edk template deploy -ycw 03_06_07_join_collections
Define and deploy a template
To use the innerJoin() method you will first perform the following steps:
- define two datasources using a similar pattern to your previous My Source definition.
- for the first, set the value of the datasource to be a Map<string, { value: bigint }>() value.
- to make the situation more complex, give the keys in the first collection the prefix string "key.".
- for the second, set the value of the datasource to be a Map<string, { other_value: string }>() value.
- add both datasources to a template
In an asset, perform the above steps to create the resulting TypeScript code:
import { SourceBuilder, Template } from "@elaraai/core"

// First datasource: keys carry the "key." prefix
const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["key.0", { value: 1n }],
            ["key.1", { value: 15n }],
        ])
    })

// Second datasource: keys have no prefix
const my_other_source = new SourceBuilder("My Other Source")
    .value({
        value: new Map([
            ["0", { other_value: "a" }],
            ["1", { other_value: "b" }],
        ])
    })

// Add both datasources to the template
export default Template(my_source, my_other_source)
Define a relational join
You can define a per-row inner join with the innerJoin() method of PipelineBuilder(). In this exercise, the desired output will combine values from both My Source and My Other Source, such that:
- the keys for both match in the join (one has a prefix of key., one doesn't)
- the output contains the other_value of My Other Source, prefixed with category
You can perform the above by taking the following steps:
- add a new pipeline "My Pipeline"
- add My Other Source as an input
- add an innerJoin() operation
- define the right_input to join in as My Other Source
- define the right_key as the key of My Other Source prefixed with key.
- define the right_selections to prefix the values of My Other Source with category
- add the new pipeline to the template
In the definition of My Pipeline, add the above changes:
import { SourceBuilder, PipelineBuilder, Template, StringJoin } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["key.0", { value: 1n }],
            ["key.1", { value: 15n }],
        ])
    })

const my_other_source = new SourceBuilder("My Other Source")
    .value({
        value: new Map([
            ["0", { other_value: "a" }],
            ["1", { other_value: "b" }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .input({ name: "My Other Source", stream: my_other_source.outputStream() })
    .innerJoin({
        // the collection to join in
        right_input: (inputs) => inputs["My Other Source"],
        // prefix the right-hand keys with "key." so they match the keys of My Source
        right_key: (_fields, key) => StringJoin`key.${key}`,
        // prefix the joined-in values with "category"
        right_selections: {
            other_value: (fields) => StringJoin`category ${fields.other_value}`,
        }
    })

export default Template(my_source, my_other_source, my_pipeline);
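To make the join semantics concrete, the following is a minimal sketch in plain TypeScript of what the pipeline above is expected to compute. It does not use @elaraai/core and is not how the library is implemented; the helper name innerJoinSketch and the row types are hypothetical, introduced only for illustration.

```typescript
// Plain TypeScript illustration only; this does not use @elaraai/core.
type SourceRow = { value: bigint };
type OtherRow = { other_value: string };
type JoinedRow = { value: bigint; other_value: string };

// Hypothetical helper: inner-join two keyed collections.
function innerJoinSketch(
    left: Map<string, SourceRow>,
    right: Map<string, OtherRow>,
): Map<string, JoinedRow> {
    // Re-key the right collection with the "key." prefix (the role of right_key).
    const rightByJoinKey = new Map<string, OtherRow>();
    right.forEach((fields, key) => rightByJoinKey.set(`key.${key}`, fields));

    // Keep only the left rows whose key has a match on the right.
    const joined = new Map<string, JoinedRow>();
    left.forEach((fields, key) => {
        const match = rightByJoinKey.get(key);
        if (match !== undefined) {
            joined.set(key, {
                value: fields.value,
                // Prefix the joined-in value (the role of right_selections).
                other_value: `category ${match.other_value}`,
            });
        }
    });
    return joined;
}

const joinedSketch = innerJoinSketch(
    new Map([["key.0", { value: 1n }], ["key.1", { value: 15n }]]),
    new Map([["0", { other_value: "a" }], ["1", { other_value: "b" }]]),
);
```

Both keys match once the right-hand keys are prefixed, so joinedSketch holds two rows, key.0 and key.1, each carrying the original value and the prefixed other_value.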
Observe the relation join
Once deployed, you can test your innerJoin() by observing the value of the Pipeline.My Pipeline datastream:
edk stream get "Pipeline.My Pipeline" -w 03_06_07_join_collections
This will result in the value below.
▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"key.0","value":{"value":"1","other_value":"category a"}},
{"key":"key.1","value":{"value":"15","other_value":"category b"}}]
✔ Download complete
You can observe that Pipeline.My Pipeline is a collection datastream, containing the joined values from both Writable.My Source and Writable.My Other Source.
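In the output above, the bigint field value appears as a JSON string ("1", "15"). If you want to work with the result in your own script, a sketch like the following could turn the JSON text back into a typed Map. It assumes you have copied only the JSON array (not the progress lines) into a string; the names streamOutput, OutputEntry and parsedRows are hypothetical.

```typescript
// The JSON array from the output above, pasted in as text.
const streamOutput = `[{"key":"key.0","value":{"value":"1","other_value":"category a"}},
{"key":"key.1","value":{"value":"15","other_value":"category b"}}]`;

// Shape of each entry as it appears in the output.
type OutputEntry = { key: string; value: { value: string; other_value: string } };

const entries = JSON.parse(streamOutput) as OutputEntry[];

// Rebuild a keyed collection, converting the string-encoded integers to bigint.
const parsedRows = new Map<string, { value: bigint; other_value: string }>();
entries.forEach((entry) => {
    parsedRows.set(entry.key, {
        value: BigInt(entry.value.value),
        other_value: entry.value.other_value,
    });
});
```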
Joining isn't limited to innerJoin() of collections; other possible methods are leftJoin(), rightJoin(), and outerJoin().
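The join variants differ in which keys survive. The sketch below, in plain TypeScript, shows the standard relational meaning of each kind. It assumes the Elara methods follow these conventional semantics, which this tutorial does not confirm, and it says nothing about how unmatched rows are filled in. The names JoinKind and joinedKeys are hypothetical.

```typescript
type JoinKind = "inner" | "left" | "right" | "outer";

// Hypothetical helper: which keys appear in the result of each kind of join.
function joinedKeys(leftKeys: string[], rightKeys: string[], kind: JoinKind): string[] {
    const inLeft = new Set(leftKeys);
    const inRight = new Set(rightKeys);
    // Every distinct key, left keys first, then right-only keys.
    const allKeys = leftKeys.concat(rightKeys.filter((key) => !inLeft.has(key)));
    return allKeys.filter((key) => {
        const onLeft = inLeft.has(key);
        const onRight = inRight.has(key);
        if (kind === "inner") return onLeft && onRight; // present on both sides
        if (kind === "left") return onLeft;              // every left key
        if (kind === "right") return onRight;            // every right key
        return true;                                     // outer: every key
    });
}

const exampleLeftKeys = ["key.0", "key.1", "key.2"];
const exampleRightKeys = ["key.1", "key.2", "key.3"];

console.log(joinedKeys(exampleLeftKeys, exampleRightKeys, "inner")); // key.1, key.2
console.log(joinedKeys(exampleLeftKeys, exampleRightKeys, "left"));  // key.0, key.1, key.2
console.log(joinedKeys(exampleLeftKeys, exampleRightKeys, "right")); // key.1, key.2, key.3
console.log(joinedKeys(exampleLeftKeys, exampleRightKeys, "outer")); // key.0, key.1, key.2, key.3
```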
Example solution
The code for this tutorial is available below:
Next steps
In the next tutorial, you will use the concatenate() operation to concatenate two collection datastreams together.