Validate a collection
In this tutorial, you will use a row-based validation in tabular datastream. You will
- use the
errorEvery()
,warnEvery()
,logEvery()
methods ofPipelineBuilder()
to define validation to be performed on each row of a collection datastream, and - launch a solution and observe the results.
This lesson will assume that you have an empty project and asset which you can to deploy to a workspace named 03_06_02_validate_a_collection
with the following command:
edk template deploy -ycw 03_06_02_validate_a_collection
Define and deploy a template
To use the errorEvery()
, warnEvery()
, logEvery()
methods you will first perform the following steps:
- define a datasource using a similar pattern as your previous
My Source
definition. - set the value of the datasource to be a
Map<string, { value: bigint }>()
value. - add the datasource to a template
In an asset, perform the above steps to create the resulting Typescript code:
import { SourceBuilder, Template } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["0", { value: 0n }],
["1", { value: 15n }],
["2", { value: 25n }],
["3", { value: 55n }],
])
})
export default Template(my_source)
Define an error
You can define a per-row error to be output with the errorEvery()
method of PipelineBuilder()
. Just like the error()
method, you provide a Typescript object with the properties
if
where you define theBooleanType
expression to be evaluated as the error logic, andmessage
where you define a custom message (as aStringType
expression) to return should theif
expression evaluate toFalse
for a row.
You can use errorEvery()
in your pipeline by taking the following steps:
- add a new pipeline "My Pipeline"
- add an
errorEvery()
operation - define the
if
condition, to create an error if thevalue
property in a row is greater than 50 - define the message based on the value, and key
- add the new pipeline to the template
In the definition My Pipeline
add the above changes:
import { SourceBuilder, PipelineBuilder, Template, Const, StringJoin, Greater } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["0", { value: 0n }],
["1", { value: 15n }],
["2", { value: 25n }],
["3", { value: 55n }],
])
})
const my_pipeline = new PipelineBuilder("My Pipeline")
.from(my_source.outputStream())
.errorEvery({
if: fields => Greater(fields.value, Const(50n)),
message: (fields, key) => StringJoin`Require value less than 50, got ${fields.value} at ${key}`
})
export default Template(my_source, my_pipeline)
You will notice that errorEvery()
method has a strongly typed fields
object argument. In the case of this example, fields
will be the following type:
fields: {
value: Variable<IntegerType>;
}
When the EDK libraries provide methods in builders that relate to a tabular stream, aside from providing an argument of the stream, the argument will often be a variable with the fields
of the tabular type. These fields
are the equivalent of the tabular columns, or in other words the properties of the compound object within the collection.
You will also notice another property key
, this is another argument generated in tabular type operations, of type:
key: Variable<StringType>
In this case, the EDK libraries are creating a variable which contains the key
or each entry in the tabular datastream , which can be used in an expression.
Observe the error
Once deployed, you can test your error()
by observing the task using the edk task logs
command:
edk task logs "Pipeline.My Pipeline" -w 03_06_02_validate_a_collection
Which will result in the logs below, notice the error message which correspond to the relevant entries in the logs.
Listing task logs for Pipeline.My Pipeline available in tenant with identifier: 1b17e111-99a9-450e-b436-969beb705005 and workspace 03_06_02_validate_a_collection.
UUID REASON STATUS LEVEL MESSAGE LOGGEDAT
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Worker 810cf250-4296-46de-9278-5de2161142b5 starting task instance d8780807-da82-4d60-afd1-a4f0bb12a026 attempt 1 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Loading inputs ["input"] YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Executing pipeline with 1 operations YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error info Performing assert operation YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition error error Require value less than 50, got 55 at 3 YYYY-MM-DDTHH:MM:SS.MSZ
Define a warning and log
You can also define warnings and logs to be output with the warnEvery()
and logEvery()
methods of PipelineBuilder()
. Just like the errorEvery()
, you provide a Typescript object with the properties
if
where you define theBooleanType
expression to be evaluated as the error logic for each entry of the collection, andmessage
where you define a custom message (as aStringType
expression) to return should theif
expression evaluate toFalse
.
You can use log()
in your pipeline by taking the following steps:
- add another pipeline "My Other Pipeline"
- add
warnEvery()
, andlogEvery()
operations - define the
if
condition, based on thevalue
property - define the message based on the value, and key
- add the new pipeline to the template
In the definition in a new pipeline My Other Pipeline
add the above changes:
import { SourceBuilder, PipelineBuilder, Template, Const, StringJoin, Greater } from "@elaraai/core"
const my_source = new SourceBuilder("My Source")
.value({
value: new Map([
["0", { value: 0n }],
["1", { value: 15n }],
["2", { value: 25n }],
["3", { value: 55n }],
])
})
const my_pipeline = new PipelineBuilder("My Pipeline")
.from(my_source.outputStream())
.errorEvery({
if: fields => Greater(fields.value, Const(50n)),
message: (fields, key) => StringJoin`Require value less than 50, got ${fields.value} at ${key}`
})
const my_other_pipeline = new PipelineBuilder("My Other Pipeline")
.from(my_source.outputStream())
.warnEvery({
if: (fields) => Greater(fields.value, Const(20n)),
message: (fields, key) => StringJoin`Prefer value less than 20, got ${fields.value} at ${key}`,
})
.logEvery({
if: (fields) => Greater(fields.value, Const(10n)),
message: (fields, key) => StringJoin`Noticed value greater than 10, got ${fields.value} at ${key}`,
});
export default Template(my_source, my_pipeline, my_other_pipeline);
Observe the warnings and logs
Once deployed, you can test your warning()
and log()
by observing the task using the edk task logs
command:
edk task logs "Pipeline.My Other Pipeline" -w 03_06_02_validate_a_collection
Which will result in the logs below, notice warning and log messages, which correspond to the first entries in the datasource.
Listing task logs for Pipeline.My Other Pipeline available in tenant with identifier: 1b17e111-99a9-450e-b436-969beb705005 and workspace 03_06_02_validate_a_collection.
UUID REASON STATUS LEVEL MESSAGE LOGGEDAT
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Worker 810cf250-4296-46de-9278-5de2161142b5 starting task instance c7018cc0-9ecd-4d61-a8cd-8ae83eabd6e2 attempt 1 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Loading inputs ["input"] YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Executing pipeline with 2 operations YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Performing assert operation YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn warn Prefer value less than 20, got 25 at 2 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Performing assert operation YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Noticed value greater than 10, got 15 at 1 YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Saving outputs ["output"] YYYY-MM-DDTHH:MM:SS.MSZ
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX new_task_definition warn info Task c7018cc0-9ecd-4d61-a8cd-8ae83eabd6e2 complete YYYY-MM-DDTHH:MM:SS.MSZ
Given that a tabular operations are designed for potentially very large collections (the technical hard limit is 566,935,682,544 rows, although exercising this is not recommended!), currently the warnEvery()
and logEvery()
operations are limited to the first occurrence.
Example solution
The code for this tutorial is available below:
Next steps
Continue to the next tutorial to perform a filter()
operation in a pipeline, to filter a collection to only include specific entries.