Validate a collection

In this tutorial, you will apply row-based validation to a tabular datastream. You will:

  • use the errorEvery(), warnEvery(), logEvery() methods of PipelineBuilder() to define validation to be performed on each row of a collection datastream, and
  • launch a solution and observe the results.

This lesson assumes that you have an empty project and asset, which you can deploy to a workspace named 03_06_02_validate_a_collection with the following command:

edk template deploy -ycw 03_06_02_validate_a_collection

Define and deploy a template

To use the errorEvery(), warnEvery() and logEvery() methods, you will first perform the following steps:

  1. define a datasource using a similar pattern to your previous My Source definition
  2. set the value of the datasource to a Map<string, { value: bigint }> value
  3. add the datasource to a template

In an asset, perform the above steps to create the following TypeScript code:

import { SourceBuilder, Template } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
  .value({
    value: new Map([
      ["0", { value: 0n }],
      ["1", { value: 15n }],
      ["2", { value: 25n }],
      ["3", { value: 55n }],
    ])
  })

export default Template(my_source)

Define an error

You can define a per-row error to be output with the errorEvery() method of PipelineBuilder(). Just like the error() method, you provide a TypeScript object with the properties:

  • if where you define the BooleanType expression to be evaluated as the error logic, and
  • message where you define a custom message (as a StringType expression) to return should the if expression evaluate to true for a row.
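
To make the per-row behaviour concrete, here is a minimal sketch in plain TypeScript that does not use the EDK at all. The names Row, Check and checkEvery are hypothetical and exist only for this illustration; the sketch assumes, as the logs later in this tutorial suggest, that a message is produced for each row that satisfies the if condition. The real errorEvery() takes EDK expressions rather than plain callbacks.

```typescript
// Plain-TypeScript illustration only; `Row`, `Check` and `checkEvery` are
// hypothetical names and are not part of @elaraai/core.
type Row = { value: bigint };

type Check = {
  if: (fields: Row, key: string) => boolean;
  message: (fields: Row, key: string) => string;
};

// Evaluate the check against every entry and collect the messages
// for the rows where the `if` condition holds.
function checkEvery(rows: Map<string, Row>, check: Check): string[] {
  const messages: string[] = [];
  for (const [key, fields] of rows) {
    if (check.if(fields, key)) {
      messages.push(check.message(fields, key));
    }
  }
  return messages;
}

const sampleRows = new Map<string, Row>([
  ["0", { value: 0n }],
  ["1", { value: 15n }],
  ["2", { value: 25n }],
  ["3", { value: 55n }],
]);

const errors = checkEvery(sampleRows, {
  if: (fields) => fields.value > 50n,
  message: (fields, key) => `Require value less than 50, got ${fields.value} at ${key}`,
});

console.log(errors);
```

Only the entry with key "3" exceeds 50, so the sketch collects a single message for that row.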

You can use errorEvery() in your pipeline by taking the following steps:

  1. add a new pipeline "My Pipeline"
  2. add an errorEvery() operation
  3. define the if condition, to create an error if the value property in a row is greater than 50
  4. define the message based on the value, and key
  5. add the new pipeline to the template

Apply the above changes to add the My Pipeline definition:

import { SourceBuilder, PipelineBuilder, Template, Const, StringJoin, Greater } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
  .value({
    value: new Map([
      ["0", { value: 0n }],
      ["1", { value: 15n }],
      ["2", { value: 25n }],
      ["3", { value: 55n }],
    ])
  })

const my_pipeline = new PipelineBuilder("My Pipeline")
  .from(my_source.outputStream())
  .errorEvery({
    if: fields => Greater(fields.value, Const(50n)),
    message: (fields, key) => StringJoin`Require value less than 50, got ${fields.value} at ${key}`
  })

export default Template(my_source, my_pipeline)

Tabular operations are strongly-typed

You will notice that the errorEvery() method has a strongly typed fields object argument. In this example, fields has the following type:

fields: {
value: Variable<IntegerType>;
}

When an EDK builder method operates on a tabular stream, its arguments will often include, in addition to the stream itself, a variable containing the fields of the tabular type. These fields are the equivalent of the tabular columns, or in other words the properties of the compound object within the collection.

You will also notice a second argument, key, which is also generated in tabular type operations and has the type:

key: Variable<StringType>

In this case, the EDK libraries create a variable containing the key of each entry in the tabular datastream, which can be used in an expression.
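
As a rough analogy in plain TypeScript, the same kind of inference can be sketched with a generic callback type. RowFunction and describeRows are hypothetical names used only for illustration; the EDK's own types wrap each field in Variable<...> rather than exposing raw values, so this is an analogy and not the library's definition.

```typescript
// Hypothetical stand-in types; not the EDK's actual definitions.
// A per-row callback receives the row's fields and the entry's string key.
type RowFunction<Fields, Result> = (fields: Fields, key: string) => Result;

// The field types are inferred from the collection's value type, so
// `fields.value` below is known to be a bigint without any annotation.
function describeRows<Fields>(
  rows: Map<string, Fields>,
  describe: RowFunction<Fields, string>,
): string[] {
  return Array.from(rows, ([key, fields]) => describe(fields, key));
}

const table = new Map([
  ["0", { value: 0n }],
  ["1", { value: 15n }],
]);

// `fields.value + 1n` only type-checks because `value` is inferred as bigint.
const descriptions = describeRows(table, (fields, key) => `${key}: ${fields.value + 1n}`);

console.log(descriptions);
```

Misspelling a field name inside the callback, or treating value as a string, would be rejected by the compiler, which is the practical benefit of strongly typed row arguments.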

Observe the error

Once deployed, you can test your errorEvery() operation by observing the task using the edk task logs command:

edk task logs "Pipeline.My Pipeline" -w 03_06_02_validate_a_collection

This will produce the logs below. Notice the error message, which corresponds to the entry in the datasource whose value is greater than 50.

Listing task logs for Pipeline.My Pipeline available in tenant with identifier: 1b17e111-99a9-450e-b436-969beb705005 and workspace 03_06_02_validate_a_collection.


UUID REASON STATUS LEVEL MESSAGE LOGGEDAT
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Worker 810cf250-4296-46de-9278-5de2161142b5 starting task instance d8780807-da82-4d60-afd1-a4f0bb12a026 attempt 1 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Loading inputs ["input"] YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Executing pipeline with 1 operations YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Performing assert operation YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error error Require value less than 50, got 55 at 3 YYYY-MM-DDTHH:MM:SS.MSZ

Define a warning and log

You can also define warnings and logs to be output with the warnEvery() and logEvery() methods of PipelineBuilder(). Just like errorEvery(), you provide a TypeScript object with the properties:

  • if where you define the BooleanType expression to be evaluated as the warning or log logic for each entry of the collection, and
  • message where you define a custom message (as a StringType expression) to return should the if expression evaluate to true.

You can use warnEvery() and logEvery() in your pipeline by taking the following steps:

  1. add another pipeline "My Other Pipeline"
  2. add warnEvery(), and logEvery() operations
  3. define the if condition, based on the value property
  4. define the message based on the value, and key
  5. add the new pipeline to the template

Apply the above changes to add the new My Other Pipeline definition:

import { SourceBuilder, PipelineBuilder, Template, Const, StringJoin, Greater } from "@elaraai/core"

const my_source = new SourceBuilder("My Source")
  .value({
    value: new Map([
      ["0", { value: 0n }],
      ["1", { value: 15n }],
      ["2", { value: 25n }],
      ["3", { value: 55n }],
    ])
  })

const my_pipeline = new PipelineBuilder("My Pipeline")
  .from(my_source.outputStream())
  .errorEvery({
    if: fields => Greater(fields.value, Const(50n)),
    message: (fields, key) => StringJoin`Require value less than 50, got ${fields.value} at ${key}`
  })

const my_other_pipeline = new PipelineBuilder("My Other Pipeline")
  .from(my_source.outputStream())
  .warnEvery({
    if: (fields) => Greater(fields.value, Const(20n)),
    message: (fields, key) => StringJoin`Prefer value less than 20, got ${fields.value} at ${key}`,
  })
  .logEvery({
    if: (fields) => Greater(fields.value, Const(10n)),
    message: (fields, key) => StringJoin`Noticed value greater than 10, got ${fields.value} at ${key}`,
  });

export default Template(my_source, my_pipeline, my_other_pipeline);

Observe the warnings and logs

Once deployed, you can test your warnEvery() and logEvery() operations by observing the task using the edk task logs command:

edk task logs "Pipeline.My Other Pipeline" -w 03_06_02_validate_a_collection

This will produce the logs below. Notice the warning and log messages, which correspond to the first matching entries in the datasource.

Listing task logs for Pipeline.My Other Pipeline available in tenant with identifier: 1b17e111-99a9-450e-b436-969beb705005 and workspace 03_06_02_validate_a_collection.


UUID REASON STATUS LEVEL MESSAGE LOGGEDAT
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Worker 810cf250-4296-46de-9278-5de2161142b5 starting task instance c7018cc0-9ecd-4d61-a8cd-8ae83eabd6e2 attempt 1 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Loading inputs ["input"] YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Executing pipeline with 2 operations YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Performing assert operation YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn warn Prefer value less than 20, got 25 at 2 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Performing assert operation YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Noticed value greater than 10, got 15 at 1 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Saving outputs ["output"] YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Task c7018cc0-9ecd-4d61-a8cd-8ae83eabd6e2 complete YYYY-MM-DDTHH:MM:SS.MSZ

Limited to first entries

Given that tabular operations are designed for potentially very large collections (the technical hard limit is 566,935,682,544 rows, although exercising this is not recommended!), the warnEvery() and logEvery() operations are currently limited to reporting the first occurrence.
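
The effect of this limit on the logs above can be sketched in plain TypeScript. The helper firstMatch is hypothetical and is not an EDK function; it also assumes that entries are visited in insertion order, which is an assumption for this illustration rather than documented EDK behaviour.

```typescript
// Illustration only; `Entry` and `firstMatch` are hypothetical names,
// not part of @elaraai/core.
type Entry = { value: bigint };

// Return the message for the first entry whose condition holds, or
// undefined when no entry matches. Later matches are ignored.
function firstMatch(
  rows: Map<string, Entry>,
  condition: (fields: Entry) => boolean,
  message: (fields: Entry, key: string) => string,
): string | undefined {
  for (const [key, fields] of rows) {
    if (condition(fields)) {
      return message(fields, key);
    }
  }
  return undefined;
}

const source = new Map<string, Entry>([
  ["0", { value: 0n }],
  ["1", { value: 15n }],
  ["2", { value: 25n }],
  ["3", { value: 55n }],
]);

const firstWarning = firstMatch(
  source,
  (fields) => fields.value > 20n,
  (fields, key) => `Prefer value less than 20, got ${fields.value} at ${key}`,
);

const firstLog = firstMatch(
  source,
  (fields) => fields.value > 10n,
  (fields, key) => `Noticed value greater than 10, got ${fields.value} at ${key}`,
);

console.log(firstWarning);
console.log(firstLog);
```

Entries "2" and "3" both exceed 20, and entries "1", "2" and "3" all exceed 10, yet only the first match of each condition is reported. This is consistent with the single warning (25 at 2) and single log message (15 at 1) in the task logs.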

Example solution

The code for this tutorial is available below:

Next steps

Continue to the next tutorial to perform a filter() operation in a pipeline, to filter a collection to only include specific entries.