Skip to content

Commit

Permalink
Features/dlt deploy airflow (#356)
Browse files Browse the repository at this point in the history
* added new deployment method = airflow, made schedule is not required

* added dags and build folder

* use enum for Deployment methods, copy templates for airflow deployment

* refactor deploy_command

* refactor deploy_command: decomposition of big fuctions

* refactor deploy_command: made some methods are static, decomposed methods

* use remote repo to clone

* created Base class for Deployment with abstractmethods, created separated GithubAction and Airflow deployment classes

* added --location cli option

* adds utils methods to convert graph representations and finding isolated components

* adds methods to generate dag of resources and decompose source

* bumps version to 0.2.9a0

* implements airflow dlt wrapper + initial tests

* enables wrapper ci tests

* Fix broken link in docs/website/docs/pipelines/google_sheets.md

* pushing experiment 3 blog post

* updating the tl;dr format

* making requested changes and adding metadata image in the HEAD

* testing featured image in the metadata block

* structured data lakes

* fix quote

* fix links

* formatting improvements

* rename

* renames helpers, activates pipeline in task, tests multiple runs per dag

* fixes lack of section context when evaluating source yielding resources

* adds activation and deactivation to pipeline

* [refactor] added missed typing

* [refactor] added missed typing, fixed linter errors

* [tests] fix tests for github-actions

* minor changes to the blog post

* [fix] del aiflow from init helpers

* adding colab screenshot at the bottom

* [fix] move COMMAND_DEPLOY_REPO_LOCATION to deploy command

* [test] added tests for airflow (same as for github actions)

* [test][fix] branch = None

* [test][fix] replace hardcode with DeployMethods

* [fix] comment fmt.confirm("Do you want ...

* makes password mandatory secret for postgres

---------

Co-authored-by: Marcin Rudolf <[email protected]>
Co-authored-by: Tung Bui Quang (Leo) <[email protected]>
Co-authored-by: Rahul Joshi <[email protected]>
Co-authored-by: Adrian <Adrian>
  • Loading branch information
4 people authored May 29, 2023
1 parent 5fc368c commit f22963c
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 275 deletions.
36 changes: 26 additions & 10 deletions dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Sequence
from typing import Any, Sequence, Optional
import yaml
import os
import argparse
Expand All @@ -13,7 +13,7 @@
import dlt.cli.echo as fmt
from dlt.cli import utils
from dlt.cli.init_command import init_command, list_pipelines_command, DLT_INIT_DOCS_URL, DEFAULT_PIPELINES_REPO
from dlt.cli.deploy_command import PipelineWasNotRun, deploy_command, DLT_DEPLOY_DOCS_URL
from dlt.cli.deploy_command import PipelineWasNotRun, deploy_command, DLT_DEPLOY_DOCS_URL, DeploymentMethods, COMMAND_DEPLOY_REPO_LOCATION
from dlt.cli.pipeline_command import pipeline_command, DLT_PIPELINE_COMMAND_DOCS_URL
from dlt.cli.telemetry_command import DLT_TELEMETRY_DOCS_URL, change_telemetry_status_command, telemetry_status_command
from dlt.pipeline.exceptions import CannotRestorePipelineException
Expand Down Expand Up @@ -42,7 +42,14 @@ def list_pipelines_command_wrapper(repo_location: str, branch: str) -> int:


@utils.track_command("deploy", False, "deployment_method")
def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, schedule: str, run_on_push: bool, run_on_dispatch: bool, branch: str) -> int:
def deploy_command_wrapper(
pipeline_script_path: str,
deployment_method: str,
schedule: Optional[str],
run_on_push: bool,
run_on_dispatch: bool,
repo_location: str,
branch: Optional[str]) -> int:
try:
utils.ensure_git_command("deploy")
except Exception as ex:
Expand All @@ -51,19 +58,21 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, sc

from git import InvalidGitRepositoryError, NoSuchPathError
try:
deploy_command(pipeline_script_path, deployment_method, schedule, run_on_push, run_on_dispatch, branch)
deploy_command(pipeline_script_path, deployment_method, schedule, run_on_push, run_on_dispatch, repo_location, branch)
except (CannotRestorePipelineException, PipelineWasNotRun) as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("You must run the pipeline locally successfully at least once in order to deploy it.")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
return -1
except InvalidGitRepositoryError:
click.secho(
"No git repository found for pipeline script %s.\nAdd your local code to Github as described here: %s" % (fmt.bold(pipeline_script_path), fmt.bold("https://docs.github.com/en/get-started/importing-your-projects-to-github/importing-source-code-to-github/adding-locally-hosted-code-to-github")),
"No git repository found for pipeline script %s." % fmt.bold(pipeline_script_path),
err=True,
fg="red"
)
fmt.note("If you do not have a repository yet, the easiest way to proceed is to create one on Github and then clone it here.")
fmt.note("If you do not have a repository yet, you can do either of:")
fmt.note("- Run the following command to initialize new repository: %s" % fmt.bold("git init"))
fmt.note("- Add your local code to Github as described here: %s" % fmt.bold("https://docs.github.com/en/get-started/importing-your-projects-to-github/importing-source-code-to-github/adding-locally-hosted-code-to-github"))
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
return -1
except NoSuchPathError as path_ex:
Expand All @@ -76,6 +85,7 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, sc
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
return -1
# TODO: display stack trace if with debug flag
return 0

Expand Down Expand Up @@ -188,10 +198,16 @@ def main() -> int:

deploy_cmd = subparsers.add_parser("deploy", help="Creates a deployment package for a selected pipeline script")
deploy_cmd.add_argument("pipeline_script_path", metavar="pipeline-script-path", help="Path to a pipeline script")
deploy_cmd.add_argument("deployment_method", metavar="deployment-method", choices=["github-action"], default="github-action", help="Deployment method")
deploy_cmd.add_argument("--schedule", required=True, help="A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *' will run the pipeline every 30 minutes.")
deploy_cmd.add_argument(
"deployment_method",
metavar="deployment-method",
choices=list(map(lambda value: value.value, DeploymentMethods.__members__.values())),
default=DeploymentMethods.github_actions.value,
help="Deployment method: %s" % ", ".join(map(lambda value: value.value, DeploymentMethods.__members__.values())))
deploy_cmd.add_argument("--schedule", required=False, help="A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *' will run the pipeline every 30 minutes.")
deploy_cmd.add_argument("--run-manually", default=True, action="store_true", help="Allows the pipeline to be run manually form Github Actions UI.")
deploy_cmd.add_argument("--run-on-push", default=False, action="store_true", help="Runs the pipeline with every push to the repository.")
deploy_cmd.add_argument("--location", default=COMMAND_DEPLOY_REPO_LOCATION, help="Advanced. Uses a specific url or local path to pipelines repository.")
deploy_cmd.add_argument("--branch", default=None, help="Advanced. Uses specific branch of the deploy repository to fetch the template.")

schema = subparsers.add_parser("schema", help="Shows, converts and upgrades schemas")
Expand All @@ -205,7 +221,7 @@ def main() -> int:
pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None)
pipe_cmd.add_argument("--verbose", "-v", action='count', default=0, help="Provides more information for certain commands.", dest="verbosity")
# pipe_cmd.add_argument("--dataset-name", help="Dataset name used to sync destination when local pipeline state is missing.")
# pipe_cmd.add_argument("--destination", help="Destination name used to to sync when local pipeline state is missing.")
# pipe_cmd.add_argument("--destination", help="Destination name used to sync when local pipeline state is missing.")

pipeline_subparsers = pipe_cmd.add_subparsers(dest="operation", required=False)

Expand Down Expand Up @@ -269,7 +285,7 @@ def main() -> int:
else:
return init_command_wrapper(args.pipeline, args.destination, args.generic, args.location, args.branch)
elif args.command == "deploy":
return deploy_command_wrapper(args.pipeline_script_path, args.deployment_method, args.schedule, args.run_on_push, args.run_manually, args.branch)
return deploy_command_wrapper(args.pipeline_script_path, args.deployment_method, args.schedule, args.run_on_push, args.run_manually, args.location, args.branch)
elif args.command == "telemetry":
return telemetry_status_command_wrapper()
else:
Expand Down
Loading

0 comments on commit f22963c

Please sign in to comment.