Skip to content

HCPC Corrections dag added #392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/.name

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/sagerx.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/sqldialects.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 83 additions & 0 deletions airflow/dags/hcpc_corrections/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import pendulum
from airflow_operator import create_dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
from common_dag_tasks import get_ds_folder, get_data_folder
from pathlib import Path
import requests, pandas as pd

dag_id = "cms_hcpcs_2020"
url = "https://www.cms.gov/files/zip/2020-corrections-alpha-numeric-hcpcs-file.zip"
"""change this url to whichever one you want"""
dag = create_dag(
dag_id=dag_id,
schedule="0 4 * * *",
start_date=pendulum.yesterday(),
catchup=False,
concurrency=2,
)

def download_excel():
folder = get_data_folder(dag_id)
folder.mkdir(parents=True, exist_ok=True)
out_path = folder / "HCPC2020_Corrections_Alpha.xlsx"
response = requests.get(url)
response.raise_for_status()

zip_path = folder / "hcpcs.zip"
with open(zip_path, "wb") as f:
f.write(response.content)

import zipfile
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(folder)

"""converts xlsx to csv"""
def convert_to_csv():
folder = get_data_folder(dag_id)
excel_file = next(folder.glob("*.xlsx"))
csv_file = excel_file.with_suffix(".csv")
df = pd.read_excel(excel_file, skiprows=4)
df.to_csv(csv_file, index=False)
print(f"Saved CSV: {csv_file}")

with dag:
download_task = PythonOperator(
task_id="download_excel",
python_callable=download_excel,
)

convert_task = PythonOperator(
task_id="convert_to_csv",
python_callable=convert_to_csv,
)

load_task = PostgresOperator(
task_id="load_hcpcs",
postgres_conn_id="postgres_default",
sql="""
DROP TABLE IF EXISTS sagerx_lake.cms_hcpcs_2020 CASCADE;

CREATE TABLE sagerx_lake.cms_hcpcs_2020 (
code TEXT,
action TEXT,
eff_date TEXT,
short_desc TEXT,
long_desc TEXT,
tos TEXT,
betos TEXT,
cov TEXT,
price TEXT,
xref_code TEXT,
asc_ind TEXT,
asc_date TEXT,
comments TEXT
);

COPY sagerx_lake.cms_hcpcs_2020
FROM '/opt/airflow/data/cms_hcpcs_2020/HCPC2020_Corrections_Alpha.csv'
DELIMITER ',' CSV HEADER;
""",
)

download_task >> convert_task >> load_task
7 changes: 7 additions & 0 deletions airflow/dags/hcpc_corrections/load_sample.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DROP TABLE IF EXISTS public.julius_hcpcs_corrections_2020 CASCADE;

CREATE TABLE public.julius_hcpcs_corrections_2020 (
hcpcs_code TEXT,
description TEXT,
action TEXT
);
10 changes: 5 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,25 @@ services:
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
command: >
bash -c "pip install openpyxl && airflow webserver"
environment:
<<: *airflow-common-env
AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
AWS_DEST_BUCKET: ${DEST_BUCKET}
ports:
- 8001:8080

airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
command: >
bash -c "pip install openpyxl && airflow scheduler"
environment:
<<: *airflow-common-env
AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
AWS_DEST_BUCKET: ${DEST_BUCKET}

networks:
airflow-dbt-network:
driver: bridge
driver: bridge
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
openpyxl
1 change: 1 addition & 0 deletions test_upload.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is a test file from sagerx