Skip to content

Commit

Permalink
docs(tutorial): add conditional logic example (#2258)
Browse files Browse the repository at this point in the history
* docs(tutorial): add conditional logic example

* docs(docker): change docker page to scripts

* update tutorial pages

* docs: add forEach example to flowable tasks

* update scripts page

* fix outputs typo

* Update 05.flowable.md

* Update content/docs/03.tutorial/06.errors.md

Co-authored-by: Will Russell <[email protected]>

* Update 07.scripts.md

---------

Co-authored-by: Barthélémy Ledoux <[email protected]>
Co-authored-by: Will Russell <[email protected]>
  • Loading branch information
3 people authored Feb 21, 2025
1 parent 7a9402e commit 90ef840
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 26 deletions.
14 changes: 9 additions & 5 deletions content/docs/03.tutorial/01.fundamentals.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ The combination of `id` and `namespace` serves as a **unique identifier** for a

### Labels

To add another layer of organization, you can use [labels](../04.workflow-components/08.labels.md), allowing you to group flows using key-value pairs.
To add another layer of organization, you can use [labels](../04.workflow-components/08.labels.md), allowing you to group flows using key-value pairs. In short, labels are customizable tags to simplify monitoring and filtering of flows and executions.


### Description(s)
Expand Down Expand Up @@ -86,15 +86,15 @@ Learn more about flows in the [Flows section](../04.workflow-components/01.flow.
## Tasks
[Tasks](../04.workflow-components/01.tasks/index.md) are atomic actions in your flows. You can design your tasks to be small and granular, such as fetching data from a REST API or running a self-contained Python script. However, tasks can also represent large and complex processes, like triggering containerized processes or long-running batch jobs (e.g. using dbt, Spark, AWS Batch, Azure Batch, etc.) and waiting for their completion.
[Tasks](../04.workflow-components/01.tasks/index.md) are atomic actions in your flows. You can design your tasks to be small and granular, such as fetching data from a REST API or running a self-contained Python script. However, tasks can also represent large and complex processes, like triggering containerized processes or long-running batch jobs (e.g., using dbt, Spark, AWS Batch, Azure Batch, etc.) and waiting for their completion.
### The order of task execution
Tasks are defined in the form of a **list**. By default, all tasks in the list will be executed **sequentially** — the second task will start as soon as the first one finishes successfully.
Kestra provides additional **customization** allowing to run tasks **in parallel**, iterating (_sequentially or in parallel_) over a list of items, or to **allow failure** of specific tasks. These kinds of actions are called [`Flowable`](./05.flowable.md) tasks because they define the flow logic.
Kestra provides additional **customization** to run tasks **in parallel**, iterating (_sequentially or in parallel_) over a list of items, or to **allow failure** of specific tasks. These kinds of actions are called [`Flowable`](./05.flowable.md) tasks because they define the flow logic.

A task in Kestra must have an `id` and a `type`. Other properties depend on the task type. You can think of a task as a step in a flow that should execute a specific action, such as running a Python or Node.js script in a Docker container, or loading data from a database.
A task in Kestra must have an `id` and a `type`. Other properties depend on the task type. You can think of a task as a step in a flow that should execute a specific action, such as running a Python or Node.js script in a Docker container or loading data from a database.

```yaml
tasks:
Expand Down Expand Up @@ -131,7 +131,7 @@ Let's look at supported task types.

Tasks from the `io.kestra.plugin.core.storage` category, along with [Outputs](../04.workflow-components/06.outputs.md), are used to interact with the **internal storage**. Kestra uses internal storage to **pass data between tasks**. You can think of internal storage as an S3 bucket. In fact, you can use your private S3 bucket as internal storage.

This storage layer helps avoid proliferation of connectors. For example, you can use the PostgreSQL plugin to extract data from a PostgreSQL database and load it to the internal storage. Other task(s) can read that data from internal storage and load it to other systems such as Snowflake, BigQuery, or Redshift, or process it using any other plugin, without requiring point to point connections between each of them.
This storage layer helps prevent connector proliferation. For example, the PostgreSQL plugin can extract data from a PostgreSQL database and load it into internal storage. Other tasks can then access this data and load it into systems like Snowflake, BigQuery, or Redshiftor process it with any other pluginwithout requiring direct point-to-point connections.

### KV Store

Expand Down Expand Up @@ -169,6 +169,10 @@ This flow has a single task that will fetch data from the [dummyjson](https://du

![New execution](/docs/tutorial/fundamentals/new_execution.png)

After executing, you'll be directed to the Gantt view to see the stages of your flow progress. In this simple example, we see the API request successfully execute. We'll continue adding more to our flow in the coming sections.

![gantt view](/docs/tutorial/fundamentals/gantt-view.png)

::next-link
[Next, let's parametrize this flow using `inputs`](./02.inputs.md)
::
Expand Down
8 changes: 2 additions & 6 deletions content/docs/03.tutorial/02.inputs.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ Instead of hardcoding values in your flow, you can use inputs to make your workf
<iframe src="https://www.youtube.com/embed/qrmEh-0BILg?si=pFClJdBpqXMZRHUT" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" referrerpolicy="strict-origin-when-cross-origin" allowfullscreen></iframe>
</div>

## How to retrieve inputs

Inputs can be accessed in any task using the following [expression](../expressions/index.md) `{{ inputs.input_name }}`.

---

## Defining inputs

Similar to `tasks`, `inputs` is a list of key-value pairs. Each input must have a `name` and a `type`. You can also set `defaults` for each input. Setting default values for an input is always recommended, especially if you want to run your flow on a schedule.

To reference an input value in your flow, use the `{{ inputs.input_name }}` syntax.
To retrieve an input value, you need to identify the input in an [expression](../expressions/index.md). In Kestra, this bracket notation, `{{ }}`, is used to wrap an expression. For an input, follow this general `{{ inputs.input_name }}` syntax. We gave our input the `id` value of `user`, so in the example below we express it in the task message as `{{ inputs.user }}`:

```yaml
id: inputs_demo
Expand All @@ -38,7 +34,7 @@ tasks:
message: Hey there, {{ inputs.user }}
```
Try running the above flow with different values for the `user` input. You can do this by clicking on the **Execute** button in the UI and then typing the desired value in the menu.
Try running the above flow with different values for the `user` input. You can do this by clicking on the **Execute** button in the UI and then typing the desired value in the menu. For any downstream task, inputs are accessible using expressions.

![Inputs](/docs/tutorial/inputs/inputs.png)

Expand Down
2 changes: 1 addition & 1 deletion content/docs/03.tutorial/03.outputs.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Tasks and flows can generate outputs, which can be passed to downstream processe

## How to retrieve outputs

Use the syntax `{{ outputs.task_id.output_property }}` to retrieve a specific output of a task.
Similar to inputs, use [expressions](../expressions/index.md) to access outputs in downstream tasks. Use the syntax `{{ outputs.task_id.output_property }}` to retrieve a specific output value of a task.

If your task id contains one or more hyphens (the `-` sign), wrap the task id in square brackets, like `{{ outputs['task-id'].output_property }}`.

Expand Down
5 changes: 2 additions & 3 deletions content/docs/03.tutorial/04.triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ triggers:
## Add a trigger to your flow
Let's look at another trigger example. This trigger starts our flow every Monday at 10 AM.
Building on our example flow from the previous pages, let's add one of the above triggers to the flow.
```yaml
triggers:
- id: every_monday_at_10_am
type: io.kestra.plugin.core.trigger.Schedule
cron: 0 10 * * 1
```
Our `getting_started` flow now runs every Monday at 10AM, starting the week with the new product data.

::collapse{title="Click here to see the full workflow example with this Schedule trigger"}
```yaml
id: getting_started
namespace: company.team
Expand Down Expand Up @@ -97,7 +97,6 @@ triggers:
type: io.kestra.plugin.core.trigger.Schedule
cron: 0 10 * * 1
```
::

To learn more about triggers, check out the full [triggers documentation](../04.workflow-components/07.triggers/index.md).

Expand Down
68 changes: 67 additions & 1 deletion content/docs/03.tutorial/05.flowable.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,73 @@ tasks:
Kestra.timer('processing_time', processing_time, {'partition': filename})
```
To learn more about flowable tasks, check out the full [flowable tasks documentation](../04.workflow-components/01.tasks/00.flowable-tasks.md).
## Add conditional logic using Flowable tasks
In addition to running independent processes in parallel, very often you will want specific tasks to run only if certain conditions are met, else some other action happens. Conditional logic can be applied to inputs, outputs, variables, and many other components of your flow, even other flows.
For example, you can use the [If task](/plugins/core/tasks/flows/io.kestra.plugin.core.flow.if) specify your conditions and what action to take whether those conditions are met or not.
The below flow is an example of the If task in action. Calling back to [dummyjson](https://dummyjson.com), an API request is made based on the product category input of either `beauty` or `notebook` (one does not exist).

The `if` task has a `condition` of `"{{ json(outputs.api.body).products | length > 0 }}"` (i.e., checking to see if the API body is not empty, and there is a product). The log message then depends on whether the actual product category exists or not.

```yaml
id: getting_started
namespace: company.team
inputs:
- id: category
type: SELECT
displayName: Select a category
values: ['beauty', 'notebooks']
defaults: 'beauty'
tasks:
- id: api
type: io.kestra.plugin.core.http.Request
uri: "https://dummyjson.com/products/category/{{inputs.category}}"
method: GET
- id: if
type: io.kestra.plugin.core.flow.If
condition: "{{ json(outputs.api.body).products | length > 0 }}"
then:
- id: when_true
type: io.kestra.plugin.core.log.Log
message: "This category has products"
else:
- id: when_false
type: io.kestra.plugin.core.log.Log
message: "The category has no products."
```

## Add a loop to a flow using Flowable tasks

It is a common process in orchestration to have a set of values you want to operate on. In Kestra, the **ForEach** flowable task executes a group of tasks for each value in the list. There are many ways to implement ForEach to conduct complex looping operations possibly incorporating conditional flowable tasks or subtasks, and you can see more examples in the [ForEach documentation](/plugins/core/flow/io.kestra.plugin.core.flow.foreach).

As an introduction to the feature, the below example demonstrates using ForEach to make an API call to [OpenLibrary](https://openlibrary.org/dev/docs/api/search) to get a list of associated titles for each Author in the list. The values are defined as a JSON string or an array, i.e., a list of string values `["value1", "value2"]` or a list of key-value pairs `[{"key": "value1"}, {"key": "value2"}]`.

You can access the current iteration value using the variable {{ taskrun.value }}:

```yaml
id: for_loop_example
namespace: tutorial
tasks:
- id: for_each
type: io.kestra.plugin.core.flow.ForEach
values: ["pynchon", "dostoyevsky", "hedayat"]
tasks:
- id: api
type: io.kestra.plugin.core.http.Request
uri: "https://openlibrary.org/search.json?author={{ taskrun.value }}&sort=new"
```

After executing, the Gantt view shows separate runs for each of the three listed authors in our task.

![forEach example](/docs/tutorial/flowable-tasks/for-each-author.png)

While simple, the examples demonstrate the flexibility provided by flowable tasks in both simple and complex workflows. To learn more about flowable tasks and see more examples, check out the full [flowable tasks documentation](../04.workflow-components/01.tasks/00.flowable-tasks.md).

::next-link
[Next, let's configure failure notifications and retries](./06.errors.md)
Expand Down
14 changes: 7 additions & 7 deletions content/docs/03.tutorial/06.errors.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: Errors and Retries
title: Error Handling
icon: /docs/icons/tutorial.svg
---

Expand Down Expand Up @@ -149,13 +149,13 @@ tasks:
type: io.kestra.plugin.core.http.Request
uri: https://dummyjson.com/products
retry:
type: constant # type: string
interval: PT20S # type: Duration
maxDuration: PT1H # type: Duration
maxAttempt: 10 # type: int
warningOnRetry: true # type: boolean, default is false
type: constant
interval: PT20S
maxDuration: PT1H
maxAttempt: 10
warningOnRetry: true
```

::next-link
[Next, let's run tasks in separate containers](./07.docker.md)
[Next, let's run script tasks in separate containers](./07.docker.md)
::
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: Docker
title: Scripts
icon: /docs/icons/tutorial.svg
---

Run custom Python, R, Julia, Node.js, and Shell scripts in isolated containers.
Kestra is language agnostic. Run custom Python, R, Julia, Node.js, Shell scripts and more in isolated Docker containers.

<div class="video-container">
<iframe src="https://www.youtube.com/embed/147apT4xGfE?si=tFGrveCwDHnf4BPX" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" referrerpolicy="strict-origin-when-cross-origin" allowfullscreen></iframe>
Expand All @@ -18,7 +18,6 @@ Many tasks in Kestra will run in dedicated Docker containers, including among ot

Kestra uses Docker for those tasks to ensure that they run in a consistent environment and to avoid dependency conflicts.


::alert{type="warning"}
The above tasks require a Docker daemon running on the host. Refer to the [Troubleshooting guide](../09.administrator-guide/16.troubleshooting.md) if you encounter any issues configuring Docker.
::
Expand Down
1 change: 1 addition & 0 deletions nuxt.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ export default defineNuxtConfig({
'/videos': {redirect: '/tutorial-videos/all'},
'/tutorial-videos': {redirect: '/tutorial-videos/all'},
'/community-guidelines': {redirect: '/docs/getting-started/community-guidelines'},
'/docs/getting-started/docker': {redirect: '/docs/getting-started/scripts'},
'/t/**': {proxy: 'https://eu.posthog.com/**'},
},

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added public/docs/tutorial/fundamentals/gantt-view.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 90ef840

Please sign in to comment.