From 0f1f5cb949100429261d3be8de3b47bfed438c34 Mon Sep 17 00:00:00 2001 From: Sergey Lyapustin Date: Fri, 24 Jun 2022 21:21:16 +0200 Subject: [PATCH] Simplified demo, updated documentation. --- README.md | 5 + dags/README.md | 1 + dags/etl.py | 79 -------- dags/sql/employees_schema.sql | 7 - dags/sql/employees_temp_schema.sql | 8 - docker-compose.yml | 278 ----------------------------- 6 files changed, 6 insertions(+), 372 deletions(-) create mode 100644 dags/README.md delete mode 100644 dags/etl.py delete mode 100644 dags/sql/employees_schema.sql delete mode 100644 dags/sql/employees_temp_schema.sql delete mode 100644 docker-compose.yml diff --git a/README.md b/README.md index 43ea358..25b89f4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,11 @@ # airflow Airflow Demo on Heroku +This create a demo instance of Apache Airflow with basic configuration, including: + +- PostgreSQL DB +- Redis DB +- CeleryExecutor ## Deploy diff --git a/dags/README.md b/dags/README.md new file mode 100644 index 0000000..38cfcaf --- /dev/null +++ b/dags/README.md @@ -0,0 +1 @@ +Place your custom DAGs here. diff --git a/dags/etl.py b/dags/etl.py deleted file mode 100644 index 510d3ba..0000000 --- a/dags/etl.py +++ /dev/null @@ -1,79 +0,0 @@ -import datetime -import pendulum -import os - -import requests -from airflow.decorators import dag, task -from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.providers.postgres.operators.postgres import PostgresOperator - - -@dag( - #schedule_interval="0 0 * * *", - schedule_interval="*/5 * * * *", - start_date=pendulum.datetime(2022, 6, 19, tz="UTC"), - catchup=False, - dagrun_timeout=datetime.timedelta(minutes=60), -) -def Etl(): - create_employees_table = PostgresOperator( - task_id="create_employees_table", - postgres_conn_id="tutorial_pg_conn", - sql="sql/employees_schema.sql", - ) - - create_employees_temp_table = PostgresOperator( - task_id="create_employees_temp_table", - postgres_conn_id="tutorial_pg_conn", - sql="sql/employees_temp_schema.sql", - ) - - @task - def get_data(): - # NOTE: configure this as appropriate for your airflow environment - data_path = "/opt/airflow/dags/files/employees.csv" - os.makedirs(os.path.dirname(data_path), exist_ok=True) - - url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv" - - response = requests.request("GET", url) - - with open(data_path, "w") as file: - file.write(response.text) - - postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") - conn = postgres_hook.get_conn() - cur = conn.cursor() - with open(data_path, "r") as file: - cur.copy_expert( - "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", - file, - ) - conn.commit() - - @task - def merge_data(): - query = """ - INSERT INTO employees - SELECT * - FROM ( - SELECT DISTINCT * - FROM employees_temp - ) - ON CONFLICT ("Serial Number") DO UPDATE - SET "Serial Number" = excluded."Serial Number"; - """ - try: - postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") - conn = postgres_hook.get_conn() - cur = conn.cursor() - cur.execute(query) - conn.commit() - return 0 - except Exception as e: - return 1 - - [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data() - - -dag = Etl() diff --git a/dags/sql/employees_schema.sql b/dags/sql/employees_schema.sql deleted file mode 100644 index fa6363c..0000000 --- a/dags/sql/employees_schema.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE TABLE IF NOT EXISTS employees ( - "Serial Number" NUMERIC PRIMARY KEY, - "Company Name" TEXT, - "Employee Markme" TEXT, - "Description" TEXT, - "Leave" INTEGER - ); \ No newline at end of file diff --git a/dags/sql/employees_temp_schema.sql b/dags/sql/employees_temp_schema.sql deleted file mode 100644 index 01bb0da..0000000 --- a/dags/sql/employees_temp_schema.sql +++ /dev/null @@ -1,8 +0,0 @@ -DROP TABLE IF EXISTS employees_temp; - CREATE TABLE employees_temp ( - "Serial Number" NUMERIC PRIMARY KEY, - "Company Name" TEXT, - "Employee Markme" TEXT, - "Description" TEXT, - "Leave" INTEGER - ); \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 98c733f..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,278 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. -# -# WARNING: This configuration is for local development. Do not use it in a production deployment. -# -# This configuration supports basic configuration using environment variables or an .env file -# The following variables are supported: -# -# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. -# Default: apache/airflow:2.3.2 -# AIRFLOW_UID - User ID in Airflow containers -# Default: 50000 -# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode -# -# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). -# Default: airflow -# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). -# Default: airflow -# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. -# Default: '' -# -# Feel free to modify this file to suit your needs. ---- -version: '3' -x-airflow-common: - &airflow-common - # In order to add custom dependencies or upgrade provider packages you can use your extended image. - # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml - # and uncomment the "build" line below, Then run `docker-compose build` to build the images. - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.3.2} - # build: . - environment: - &airflow-common-env - AIRFLOW__CORE__EXECUTOR: CeleryExecutor - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow - # For backward compatibility, with Airflow <2.3 - AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow - AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow - AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 - AIRFLOW__CORE__FERNET_KEY: '' - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' - AIRFLOW__CORE__LOAD_EXAMPLES: 'false' - AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth' - _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} - volumes: - - ./dags:/opt/airflow/dags - - ./logs:/opt/airflow/logs - - ./plugins:/opt/airflow/plugins - user: "${AIRFLOW_UID:-50000}:0" - depends_on: - &airflow-common-depends-on - redis: - condition: service_healthy - postgres: - condition: service_healthy - -services: - postgres: - image: postgres:13 - environment: - POSTGRES_USER: airflow - POSTGRES_PASSWORD: airflow - POSTGRES_DB: airflow - volumes: - - postgres-db-volume:/var/lib/postgresql/data - healthcheck: - test: ["CMD", "pg_isready", "-U", "airflow"] - interval: 5s - retries: 5 - restart: always - - redis: - image: redis:latest - expose: - - 6379 - healthcheck: - test: ["CMD", "redis-cli", "ping"] - interval: 5s - timeout: 30s - retries: 50 - restart: always - - airflow-webserver: - <<: *airflow-common - command: webserver - ports: - - 8080:8080 - healthcheck: - test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] - interval: 10s - timeout: 10s - retries: 5 - restart: always - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully - - airflow-scheduler: - <<: *airflow-common - command: scheduler - healthcheck: - test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] - interval: 10s - timeout: 10s - retries: 5 - restart: always - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully - - airflow-worker: - <<: *airflow-common - command: celery worker - healthcheck: - test: - - "CMD-SHELL" - - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' - interval: 10s - timeout: 10s - retries: 5 - environment: - <<: *airflow-common-env - # Required to handle warm shutdown of the celery workers properly - # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation - DUMB_INIT_SETSID: "0" - restart: always - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully - - airflow-triggerer: - <<: *airflow-common - command: triggerer - healthcheck: - test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] - interval: 10s - timeout: 10s - retries: 5 - restart: always - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully - - airflow-init: - <<: *airflow-common - entrypoint: /bin/bash - # yamllint disable rule:line-length - command: - - -c - - | - function ver() { - printf "%04d%04d%04d%04d" $${1//./ } - } - airflow_version=$$(gosu airflow airflow version) - airflow_version_comparable=$$(ver $${airflow_version}) - min_airflow_version=2.2.0 - min_airflow_version_comparable=$$(ver $${min_airflow_version}) - if (( airflow_version_comparable < min_airflow_version_comparable )); then - echo - echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" - echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" - echo - exit 1 - fi - if [[ -z "${AIRFLOW_UID}" ]]; then - echo - echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" - echo "If you are on Linux, you SHOULD follow the instructions below to set " - echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." - echo "For other operating systems you can get rid of the warning with manually created .env file:" - echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user" - echo - fi - one_meg=1048576 - mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) - cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) - disk_available=$$(df / | tail -1 | awk '{print $$4}') - warning_resources="false" - if (( mem_available < 4000 )) ; then - echo - echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" - echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" - echo - warning_resources="true" - fi - if (( cpus_available < 2 )); then - echo - echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" - echo "At least 2 CPUs recommended. You have $${cpus_available}" - echo - warning_resources="true" - fi - if (( disk_available < one_meg * 10 )); then - echo - echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" - echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" - echo - warning_resources="true" - fi - if [[ $${warning_resources} == "true" ]]; then - echo - echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" - echo "Please follow the instructions to increase amount of resources available:" - echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin" - echo - fi - mkdir -p /sources/logs /sources/dags /sources/plugins - chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} - exec /entrypoint airflow version - # yamllint enable rule:line-length - environment: - <<: *airflow-common-env - _AIRFLOW_DB_UPGRADE: 'true' - _AIRFLOW_WWW_USER_CREATE: 'true' - _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} - _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} - _PIP_ADDITIONAL_REQUIREMENTS: '' - user: "0:0" - volumes: - - .:/sources - - airflow-cli: - <<: *airflow-common - profiles: - - debug - environment: - <<: *airflow-common-env - CONNECTION_CHECK_MAX_COUNT: "0" - # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 - command: - - bash - - -c - - airflow - - # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up - # or by explicitly targeted on the command line e.g. docker-compose up flower. - # See: https://docs.docker.com/compose/profiles/ - flower: - <<: *airflow-common - command: celery flower - profiles: - - flower - ports: - - 5555:5555 - healthcheck: - test: ["CMD", "curl", "--fail", "http://localhost:5555/"] - interval: 10s - timeout: 10s - retries: 5 - restart: always - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully - -volumes: - postgres-db-volume: