Skip to content

Commit

Permalink
fix: normalization docker op (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
gane5hvarma authored Jun 26, 2024
1 parent 51a8a30 commit 129e1b0
Showing 1 changed file with 139 additions and 40 deletions.
179 changes: 139 additions & 40 deletions src/orchestrator/templates/shopify_template.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,117 @@ import os

from dagster_docker import docker_container_op, execute_docker_container

from dagster import DefaultScheduleStatus, ScheduleDefinition, graph, op, RetryPolicy, Backoff, Jitter
from dagster import DefaultScheduleStatus, ScheduleDefinition, graph, op, In, Nothing, Any, RetryPolicy, Backoff, Jitter
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus, JobSelector
from dagster import (
Array,
Field,
Permissive,
StringSource
)



# Credentials sub-schema used by the "registry" entry below; all three values
# are declared without `is_required=False`.
_DOCKER_REGISTRY_SCHEMA = {
    "url": Field(StringSource),
    "username": Field(StringSource),
    "password": Field(StringSource),
}

# Base config schema for launching a Docker container. Every entry here is
# optional; DOCKER_CONTAINER_OP_CONFIG layers the op-specific keys on top.
DOCKER_CONFIG_SCHEMA = {
    # Fallback image name.
    "image": Field(
        StringSource,
        is_required=False,
        description="The docker image to be used if the repository does not specify one.",
    ),
    # Single network to attach at creation time (see also "networks").
    "network": Field(
        StringSource,
        is_required=False,
        description=(
            "Name of the network to which to connect the launched container at creation time"
        ),
    ),
    # Credentials for a private registry.
    "registry": Field(
        _DOCKER_REGISTRY_SCHEMA,
        is_required=False,
        description="Information for using a non local/public docker registry",
    ),
    # Environment variables, as KEY=VALUE pairs or bare KEY names.
    "env_vars": Field(
        [str],
        is_required=False,
        description=(
            "The list of environment variables names to include in the docker container. "
            "Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled "
            "from the local environment)"
        ),
    ),
    # Extra keyword arguments for docker-py's containers.create.
    "container_kwargs": Field(
        Permissive(),
        is_required=False,
        description=(
            "key-value pairs that can be passed into containers.create. See "
            "https://docker-py.readthedocs.io/en/stable/containers.html for the full list "
            "of available options."
        ),
    ),
    # Several networks to attach at creation time.
    "networks": Field(
        Array(StringSource),
        is_required=False,
        description=(
            "Names of the networks to which to connect the launched container at creation time"
        ),
    ),
}

# Config schema for ops that run a single Docker container. Start from a copy
# of the base schema, then override "image" (required here) and add the
# optional "entrypoint" and "command" entries. Updating an existing key keeps
# its original position, so the key order matches a spread-and-override literal.
DOCKER_CONTAINER_OP_CONFIG = dict(DOCKER_CONFIG_SCHEMA)
DOCKER_CONTAINER_OP_CONFIG.update(
    {
        "image": Field(
            StringSource,
            is_required=True,
            description="The image in which to run the Docker container.",
        ),
        "entrypoint": Field(
            [str],
            is_required=False,
            description="The ENTRYPOINT for the Docker container",
        ),
        "command": Field(
            [str],
            is_required=False,
            description="The command to run in the container within the launched Docker container.",
        ),
    }
)

# No-op op: performs no work and yields None. The op name is derived from the
# sync id with dashes replaced by underscores.
# NOTE(review): presumably the entry node that downstream ops in the job hang
# off of -- confirm against the graph definition.
@op(name="initialise_{{ sync['id'].replace('-','_') }}")
def initialise():
    return None

@op(ins={"src": In(Nothing), "dst": In(Nothing)}, config_schema=DOCKER_CONTAINER_OP_CONFIG)
def my_docker_container_op(context):
    """Run a Docker container described by this op's config.

    This op declares two ``Nothing``-typed inputs, ``src`` and ``dst``. They
    carry no data; they only let the op be wired after two upstream ops so
    that it starts once both have finished.

    Contrast with the ``docker_executor``, which runs each Dagster op in a
    Dagster job in its own Docker container. This op may be useful when:

    - You need to orchestrate a command that isn't a Dagster op (or isn't
      written in Python).
    - You want to run the rest of a Dagster job using a specific executor,
      and only a single op in Docker.

    Args:
        context: The op execution context. Its ``op_config`` must conform to
            ``DOCKER_CONTAINER_OP_CONFIG``; every configured key is forwarded
            as a keyword argument to ``execute_docker_container``.
    """
    # Config keys map one-to-one onto execute_docker_container's keyword
    # arguments, so the validated config is unpacked straight into the call.
    execute_docker_container(context, **context.op_config)


source_op = docker_container_op.configured(
{
"image": "{{ sync['source']['credential']['docker_image'] }}:{{ sync['source']['credential']['docker_tag'] }}",
Expand Down Expand Up @@ -74,20 +174,12 @@ destination_op = docker_container_op.configured(
name="destination_op_{{ sync['id'].replace('-','_') }}"
)

@op(
name="normalization_op_{{ sync['id'].replace('-','_') }}",
retry_policy=RetryPolicy(
max_retries=100,
delay=2, # 2s
backoff=Backoff.EXPONENTIAL,
jitter=Jitter.PLUS_MINUS,
),
)
def normalization_op(context, a, b):
execute_docker_container(
context,
image = "airbyte/normalization:latest",
command = [


normalization_op = my_docker_container_op.configured(
{
"image": "airbyte/normalization:latest",
"command": [
"run",
"--integration-type",
"postgres",
Expand All @@ -96,40 +188,33 @@ def normalization_op(context, a, b):
"--catalog",
"/tmp/configured_catalog.json"
],
container_kwargs = { # keyword args to be passed to the container. example:
"container_kwargs" : { # keyword args to be passed to the container. example:
"volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" , "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['source']['id'] }}.json:/tmp/configured_source_catalog.json", "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/configured_catalog.json"],
},
env_vars= list({**os.environ}.keys()),
)

@op(
name="transformation_po_op_{{ sync['id'].replace('-','_') }}",
retry_policy=RetryPolicy(
max_retries=100,
delay=2, # 2s
backoff=Backoff.EXPONENTIAL,
jitter=Jitter.PLUS_MINUS,
),
"env_vars" : list({**os.environ}.keys()),
},
name = "normalization_op_{{ sync['id'].replace('-','_') }}"
)
def transformation_po_op(context, c):
execute_docker_container(
context,
image = "valmiio/transform-po:latest",
command = [


transformation_po_op = docker_container_op.configured(
{
"image":"valmiio/transform-po:latest",
"command": [
"--config-file",
"/tmp/config.json",
],
container_kwargs = { # keyword args to be passed to the container. example:
"container_kwargs" : { # keyword args to be passed to the container. example:
"volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" ],
},
env_vars= list({**os.environ}.keys()),
)


"env_vars": list({**os.environ}.keys()),

},
name="transformation_po_op_{{ sync['id'].replace('-','_') }}"
)

@op(name="finalizer_{{ sync['id'].replace('-','_') }}" , retry_policy=RetryPolicy(
max_retries=100,
max_retries=5,
delay=2, # 2s
backoff=Backoff.EXPONENTIAL,
jitter=Jitter.PLUS_MINUS,
Expand Down Expand Up @@ -160,8 +245,22 @@ def job():
jitter=Jitter.PLUS_MINUS,
)
)(i)
c = normalization_op(a,b)
d = transformation_po_op(c)
c = normalization_op.with_retry_policy(
RetryPolicy(
max_retries=100,
delay=2, # 2s
backoff=Backoff.EXPONENTIAL,
jitter=Jitter.PLUS_MINUS,
)
)(src=a,dst=b)
d = transformation_po_op.with_retry_policy(
RetryPolicy(
max_retries=100,
delay=2, # 2s
backoff=Backoff.EXPONENTIAL,
jitter=Jitter.PLUS_MINUS,
)
)(c)
finalizer(d)

return sync_graph.to_job(name="{{ sync['id'].replace('-','_') }}")
Expand Down

0 comments on commit 129e1b0

Please sign in to comment.