Join collections

In this tutorial, you will use a row-based relational join on tabular datastreams. You will

  • use the innerJoin() method of PipelineBuilder() to join two collection datastreams, and
  • launch a solution and observe the results.

This lesson assumes that you have an empty project and asset, which you can deploy to a workspace named 03_06_07_join_collections with the following command:

edk template deploy -ycw 03_06_07_join_collections

Define and deploy a template

To use the innerJoin() method you will first perform the following steps:

  1. define two datasources using a similar pattern to your previous My Source definition.
  2. for the first, set the value of the datasource to a Map<string, { value: bigint }> value.
  3. to make the situation more complex, give each key in the first collection a string prefix of key. (for example, key.0).
  4. for the second, set the value of the datasource to a Map<string, { other_value: string }> value.
  5. add both datasources to a template.

In an asset, perform the above steps to create the resulting TypeScript code:

import { SourceBuilder, Template } from "@elaraai/core"

// First collection: keys carry a "key." prefix, values are bigint
const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["key.0", { value: 1n }],
            ["key.1", { value: 15n }],
        ])
    })

// Second collection: keys have no prefix, values are strings
const my_other_source = new SourceBuilder("My Other Source")
    .value({
        value: new Map([
            ["0", { other_value: "a" }],
            ["1", { other_value: "b" }],
        ])
    })

// Add both datasources to the template
export default Template(my_source, my_other_source)

Define a relational join

You can define a per-row inner join with the innerJoin() method of PipelineBuilder(). In this exercise, the desired output combines values from both My Source and My Other Source, such that:

  • the keys of both collections match in the join (one has a prefix of key., the other doesn't)
  • the output includes the other_value from My Other Source, prefixed with category

You can perform the above by taking the following steps:

  1. add a new pipeline "My Pipeline"
  2. add My Other Source as an input
  3. add an innerJoin() operation
  4. define the right_input of the join as My Other Source
  5. define the right_key as the key of My Other Source prefixed with key.
  6. define the right_selections to prefix the values of My Other Source with category
  7. add the new pipeline to the template

Add the above changes to define My Pipeline:

import { SourceBuilder, PipelineBuilder, Template, StringJoin } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
    .value({
        value: new Map([
            ["key.0", { value: 1n }],
            ["key.1", { value: 15n }],
        ])
    })

const my_other_source = new SourceBuilder("My Other Source")
    .value({
        value: new Map([
            ["0", { other_value: "a" }],
            ["1", { other_value: "b" }],
        ])
    })

const my_pipeline = new PipelineBuilder("My Pipeline")
    .from(my_source.outputStream())
    .input({ name: "My Other Source", stream: my_other_source.outputStream() })
    .innerJoin({
        right_input: (inputs) => inputs["My Other Source"],
        right_key: (_fields, key) => StringJoin`key.${key}`,
        right_selections: {
            other_value: (fields) => StringJoin`category ${fields.other_value}`,
        }
    })

export default Template(my_source, my_other_source, my_pipeline);

Observe the relational join

Once deployed, you can test your innerJoin() by observing the value of the Pipeline.My Pipeline datastream:

edk stream get "Pipeline.My Pipeline" -w 03_06_07_join_collections

This results in the value below:

▹▹▹▹▹ Attempting to stream Pipeline.My Pipeline to stdout
[{"key":"key.0","value":{"value":"1","other_value":"category a"}},
{"key":"key.1","value":{"value":"15","other_value":"category b"}}]
✔ Download complete

You can observe that Pipeline.My Pipeline is a collection datastream, containing the joined values from both Writable.My Source and Writable.My Other Source.
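To see why each output row looks the way it does, the join can be modelled in plain TypeScript. This is only an illustrative sketch of keyed inner-join semantics, not the @elaraai/core implementation: the function innerJoinSketch and its parameter names are hypothetical, and plain number values stand in for the tutorial's bigint values to keep the sketch portable.

```typescript
// Plain-TypeScript model of a keyed inner join (hypothetical helper,
// not part of @elaraai/core).
function innerJoinSketch<L extends object, R extends object, S extends object>(
    left: Map<string, L>,
    right: Map<string, R>,
    rightKey: (fields: R, key: string) => string,
    rightSelect: (fields: R) => S,
): Map<string, L & S> {
    // Index the right rows by their computed join key.
    const rightByKey = new Map<string, R>();
    for (const [key, fields] of right) {
        rightByKey.set(rightKey(fields, key), fields);
    }
    // Keep only the left rows that have a matching right row.
    const joined = new Map<string, L & S>();
    for (const [key, fields] of left) {
        const match = rightByKey.get(key);
        if (match !== undefined) {
            joined.set(key, { ...fields, ...rightSelect(match) });
        }
    }
    return joined;
}

// Same data as the tutorial (number used in place of bigint).
const mySource = new Map<string, { value: number }>([
    ["key.0", { value: 1 }],
    ["key.1", { value: 15 }],
]);
const myOtherSource = new Map<string, { other_value: string }>([
    ["0", { other_value: "a" }],
    ["1", { other_value: "b" }],
]);

const joined = innerJoinSketch(
    mySource,
    myOtherSource,
    (_fields, key) => `key.${key}`, // mirrors right_key
    (fields) => ({ other_value: `category ${fields.other_value}` }), // mirrors right_selections
);

console.log(Array.from(joined.entries()));
```

The right-hand key "0" is rewritten to "key.0" before matching, which is why both rows pair up even though the two collections use different key formats.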

Join types

Joining isn't limited to innerJoin() of collections; other possible methods are leftJoin(), rightJoin(), and outerJoin().
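The difference between the join types comes down to which keys survive. The sketch below assumes these methods follow the standard relational definitions; the tutorial does not show their options or how a missing side is represented in the output, so only the surviving keys are modelled here. The helper joinKeys and the sample rows (including the unmatched keys key.0 and key.2) are hypothetical.

```typescript
type JoinKind = "inner" | "left" | "right" | "outer";

// Returns the keys that would appear in the output for each join kind,
// assuming both collections already share a key format.
function joinKeys<L, R>(
    left: Map<string, L>,
    right: Map<string, R>,
    kind: JoinKind,
): string[] {
    const keys: string[] = [];
    for (const key of left.keys()) {
        // Matched keys always survive; unmatched left keys only for left/outer.
        if (right.has(key) || kind === "left" || kind === "outer") {
            keys.push(key);
        }
    }
    for (const key of right.keys()) {
        // Unmatched right keys survive only for right/outer.
        if (!left.has(key) && (kind === "right" || kind === "outer")) {
            keys.push(key);
        }
    }
    return keys;
}

// Hypothetical rows: key.0 exists only on the left, key.2 only on the right.
const leftRows = new Map<string, { value: number }>([
    ["key.0", { value: 1 }],
    ["key.1", { value: 15 }],
]);
const rightRows = new Map<string, { other_value: string }>([
    ["key.1", { other_value: "b" }],
    ["key.2", { other_value: "c" }],
]);

const keysByKind = {
    inner: joinKeys(leftRows, rightRows, "inner"), // ["key.1"]
    left: joinKeys(leftRows, rightRows, "left"),   // ["key.0", "key.1"]
    right: joinKeys(leftRows, rightRows, "right"), // ["key.1", "key.2"]
    outer: joinKeys(leftRows, rightRows, "outer"), // ["key.0", "key.1", "key.2"]
};

console.log(keysByKind);
```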

Example solution

The code for this tutorial is available below:

Next steps

In the next tutorial, you will use the concatenate() operation to concatenate two collection datastreams together.