From 129e1b042bb9cd86f98e5849222a5fcc97ded659 Mon Sep 17 00:00:00 2001 From: Ganesh varma Date: Wed, 26 Jun 2024 16:22:39 +0530 Subject: [PATCH] fix: normalization docker op (#84) --- .../templates/shopify_template.jinja | 179 ++++++++++++++---- 1 file changed, 139 insertions(+), 40 deletions(-) diff --git a/src/orchestrator/templates/shopify_template.jinja b/src/orchestrator/templates/shopify_template.jinja index 3077820..9dba009 100644 --- a/src/orchestrator/templates/shopify_template.jinja +++ b/src/orchestrator/templates/shopify_template.jinja @@ -27,17 +27,117 @@ import os from dagster_docker import docker_container_op, execute_docker_container -from dagster import DefaultScheduleStatus, ScheduleDefinition, graph, op, RetryPolicy, Backoff, Jitter +from dagster import DefaultScheduleStatus, ScheduleDefinition, graph, op, In, Nothing, Any, RetryPolicy, Backoff, Jitter import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus, JobSelector +from dagster import ( + Array, + Field, + Permissive, + StringSource +) + + +DOCKER_CONFIG_SCHEMA = { + "image": Field( + StringSource, + is_required=False, + description="The docker image to be used if the repository does not specify one.", + ), + "network": Field( + StringSource, + is_required=False, + description=( + "Name of the network to which to connect the launched container at creation time" + ), + ), + "registry": Field( + { + "url": Field(StringSource), + "username": Field(StringSource), + "password": Field(StringSource), + }, + is_required=False, + description="Information for using a non local/public docker registry", + ), + "env_vars": Field( + [str], + is_required=False, + description=( + "The list of environment variables names to include in the docker container. " + "Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled " + "from the local environment)" + ), + ), + "container_kwargs": Field( + Permissive(), + is_required=False, + description=( + "key-value pairs that can be passed into containers.create. See " + "https://docker-py.readthedocs.io/en/stable/containers.html for the full list " + "of available options." + ), + ), + "networks": Field( + Array(StringSource), + is_required=False, + description=( + "Names of the networks to which to connect the launched container at creation time" + ), + ) +} + +DOCKER_CONTAINER_OP_CONFIG = { + **DOCKER_CONFIG_SCHEMA, + "image": Field( + StringSource, + is_required=True, + description="The image in which to run the Docker container.", + ), + "entrypoint": Field( + [str], + is_required=False, + description="The ENTRYPOINT for the Docker container", + ), + "command": Field( + [str], + is_required=False, + description="The command to run in the container within the launched Docker container.", + ), +} @op(name="initialise_{{ sync['id'].replace('-','_') }}") def initialise(): pass +@op(ins={"src": In(Nothing), "dst": In(Nothing)}, config_schema=DOCKER_CONTAINER_OP_CONFIG) +def my_docker_container_op(context): + """An op that runs a Docker container using the docker Python API. + + Contrast with the `docker_executor`, which runs each Dagster op in a Dagster job in its + own Docker container. + + This op may be useful when: + - You need to orchestrate a command that isn't a Dagster op (or isn't written in Python) + - You want to run the rest of a Dagster job using a specific executor, and only a single + op in docker. + + For example: + + .. literalinclude:: ../../../../../../python_modules/libraries/dagster-docker/dagster_docker_tests/test_example_docker_container_op.py + :start-after: start_marker + :end-before: end_marker + :language: python + + You can create your own op with the same implementation by calling the `execute_docker_container` function + inside your own op. + """ + execute_docker_container(context, **context.op_config) + + source_op = docker_container_op.configured( { "image": "{{ sync['source']['credential']['docker_image'] }}:{{ sync['source']['credential']['docker_tag'] }}", @@ -74,20 +174,12 @@ destination_op = docker_container_op.configured( name="destination_op_{{ sync['id'].replace('-','_') }}" ) -@op( - name="normalization_op_{{ sync['id'].replace('-','_') }}", - retry_policy=RetryPolicy( - max_retries=100, - delay=2, # 2s - backoff=Backoff.EXPONENTIAL, - jitter=Jitter.PLUS_MINUS, - ), -) -def normalization_op(context, a, b): - execute_docker_container( - context, - image = "airbyte/normalization:latest", - command = [ + + +normalization_op = my_docker_container_op.configured( + { + "image": "airbyte/normalization:latest", + "command": [ "run", "--integration-type", "postgres", @@ -96,40 +188,33 @@ def normalization_op(context, a, b): "--catalog", "/tmp/configured_catalog.json" ], - container_kwargs = { # keyword args to be passed to the container. example: + "container_kwargs" : { # keyword args to be passed to the container. example: "volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" , "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['source']['id'] }}.json:/tmp/configured_source_catalog.json", "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/configured_catalog.json"], }, - env_vars= list({**os.environ}.keys()), - ) - -@op( - name="transformation_po_op_{{ sync['id'].replace('-','_') }}", - retry_policy=RetryPolicy( - max_retries=100, - delay=2, # 2s - backoff=Backoff.EXPONENTIAL, - jitter=Jitter.PLUS_MINUS, - ), + "env_vars" : list({**os.environ}.keys()), + }, + name = "normalization_op_{{ sync['id'].replace('-','_') }}" ) -def transformation_po_op(context, c): - execute_docker_container( - context, - image = "valmiio/transform-po:latest", - command = [ + + +transformation_po_op = docker_container_op.configured( + { + "image":"valmiio/transform-po:latest", + "command": [ "--config-file", "/tmp/config.json", ], - container_kwargs = { # keyword args to be passed to the container. example: + "container_kwargs" : { # keyword args to be passed to the container. example: "volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" ], }, - env_vars= list({**os.environ}.keys()), - ) - - + "env_vars": list({**os.environ}.keys()), + }, + name="transformation_po_op_{{ sync['id'].replace('-','_') }}" +) @op(name="finalizer_{{ sync['id'].replace('-','_') }}" , retry_policy=RetryPolicy( - max_retries=100, + max_retries=5, delay=2, # 2s backoff=Backoff.EXPONENTIAL, jitter=Jitter.PLUS_MINUS, @@ -160,8 +245,22 @@ def job(): jitter=Jitter.PLUS_MINUS, ) )(i) - c = normalization_op(a,b) - d = transformation_po_op(c) + c = normalization_op.with_retry_policy( + RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + ) + )(src=a,dst=b) + d = transformation_po_op.with_retry_policy( + RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + ) + )(c) finalizer(d) return sync_graph.to_job(name="{{ sync['id'].replace('-','_') }}")