-
파이프라인을 구축하기 전, 접근 방식에 대한 기술적 계획을 세우는 것이 중요하다.
-
데이터로 무엇을 하려는지에 따라 빈도, 데이터의 크기, 형식, 소스 유형 등을 고려해야한다.
-
시뮬레이션 할 예제 서비스 설명
- 주식 시장 예측을 위해 회사의 페이지 뷰 증가 / 감소를 파악하고 감성 분석을 수행
- 위키피디어에서 매 시간 페이지 뷰 덤프 하나를 다운로드
- 압축 해제 후 페이지 뷰 데이터를 확인
dag = DAG(
dag_id="listing_4_01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/" #이중 중괄호는 런타임 시에 삽입될 변수를 나타낸다.
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz" #모든 파이썬 변수 또는 표현식에 대해 제공할 수 있다.
),
dag=dag,
)
- execution_data는 작업 런타임 시에 사용할 수 있는 변수이다.
- 런타임시에 값이 들어오기 때문에 프로그래밍 할 때는 값을 알 수 없다.
- 이중 중괄호는 Jinja 템플릿 문자열을 나타낸다.
- Airflow는 날짜 시간에 Pendulum 라이브러리를 사용하는데, execution_data는 Pendulum 라이브러리의 datetime 객체이기 때문에 파이썬 datetime의 호환 객체이다.
- pendulum은 파이썬의 datetime과 동일하게 작동하기 때문에 같은 결과를 얻는다.
>>> from datetime import datetime
>>> import pendulum
>>> datetime.now().year
2022
>>> pendulum.now().year
2022
- 따라서 execution_date의 month, day, hour를 얻을 수 있는 것이다.
- execution_date 뿐만 아니라 더 많은 변수가 템플릿화 될 수 있다.
- 태스크 콘텍스트 reference 설명 (version 2.4.1 기준)
키 | 설명 | 예시 |
---|---|---|
data_interval_start | 태스크 스케줄 간격의 시작 날짜 / 시간 (기존의 execution_date) | DateTime(2022, 10, 23, 0, 0, 0, tzinfo=Timezone('UTC')) |
data_interval_end | 태스크 스케줄 간격의 종료 날짜 / 시간 (기존의 next_execution_date) | DateTime(2022, 10, 24, 0, 0, 0, tzinfo=Timezone('UTC')) |
logical_date | 태스크 스케줄 간격의 시작 날짜 / 시간 (기존의 execution_date) | 자동실행이랑 트리거랑 결과 값 다름.. (확인 필요) 자동실행은 data_interval_start와 같고, 트리거는 트리거 작동 시간임 |
ds | %Y-%m-%d 형식의 logical_date | “2018-01-01” |
ds_nodash | %Y%m%d 형식의 logical_date | “20180101” |
ts | %Y-%m-%dT%H:%M:%S%z 형식의 logical_date | ”2018-01-01T00:00:00+00:00” |
ts_nodash_with_tz | %Y%m%dT%H%M%S%z 형식의 logical_date | ”20180101T000000+0000” |
ts_nodash | %Y%m%dT%H%M%S 형식의 logical_date | ”20180101T000000” |
prev_data_interval_start_success | 직전 성공한 태스크 스케줄 간격의 시작 날짜 / 시간 | “2022-10-23T00:00:00+00:00” |
prev_data_interval_end_success | 직전 성공한 태스크 스케줄 간격의 종료 날짜 / 시간 | “2022-10-24T00:00:00+00:00” |
prev_start_date_success | 직전 성공한 태스크 스케줄 간격의 실제 실행 날짜 / 시간 | 2022-10-24T06:09:54.429103+00:00 |
dag | 현재 DAG 개체 | DAG object |
task | 현재 오퍼레이터 | PythonOperator object |
macros | airflow.macros 모듈 | macro module |
task_instance | 현재 TaskInstance 객체 | TaskInstance object |
ti | task_instance와 동일한 현재 TaskInstance 객체 | TaskInstance object |
params | 태스크 콘텍스트에 대한 사용자 제공 변수 | {} |
var | Airflow 전역에 대한 사용자 지정 변수 | https://dydwnsekd.tistory.com/65 |
var.value.{my_var} | bashoperator에서의 value 형태의 사용자 지정 변수 호출 | my_var 변수의 값 |
var.json.{my_var}.{key} | bashoperator에서의 json 형태의 사용자 지정 변수 호출 | my_var 변수의 key 값에 대한 value 값 |
conn.my_conn_id | ||
task_instance_key_str | 현재 TaskInstance의 고유 식별자 ({dag_id}{task_id}{ds_nodash}) | “dag_id__task_id__20190101” “listing_4_03__print_context__20221024” |
conf | Airflow 구성 | airflow.configuration. AirflowConfigParser object |
run_id | DagRun의 run_id (일반적으로 접두사 + datetime으로 구성된 키) | “scheduled__2022-10-22T00:00:00+00:00” “manual__2022-10-24T06:09:53.925423+00:00” |
dag_run | 현재 DagRun 개체 | DagRun object |
test_mode | Airflow가 테스트 모드에서 실행 중인지 여부 | True or False |
- macros 모듈
키 | 설명 | 예시 |
---|---|---|
macros.datetime | datetime.datetime과 같음 | |
macros.timedelta | datetime.timedelta와 같음 | |
macros.dateutil | datetuil과 같음 | |
macros.time | datetime.time과 같음 | |
macros.uuid | uuid와 같음 | |
macros.random | random.random과 같음 | 0~1 사이의 랜덤 float 값 |
macros.ds_add | YYYY-MM-DD 포멧의 날짜에 일수를 더해줌 | ds_add('2015-01-01', 5) |
'2015-01-06' | ||
macros.ds_format | 날짜 형식 변환 | ds_format('2015-01-01', "%Y-%m-%d", "%m-%d-%y") |
'01-01-15' |
- BashOperator는 런타임에 자동으로 템플릿이 지정되는 bash_command 인수에 문자열을 제공한다.
- 그에 반해 PythonOperator는 런타임 콘텍스트로 템플릿화할 수 있는 인수를 사용치 않고 별도로 런타임 콘텍스트를 적용할 수 있는 python_callable 인수를 사용한다.
- python_callable에 오는 것은 문자열이 아니라 함수이기 때문에 함수에서 키워드 인수를 받아서 사용할 수 있다.
- 즉 함수 내의 코드를 자동으로 템플릿화 할 수 없기 때문에 직접 인수로 전달해주어야한다.
def templated_test(d1):
print("{{ ds }}")
print("ds test:", d1)
# 해당 함수를 PythonOperator로 수행해도 첫번째 프린트문은 '{{ ds }}'만 출력됨
def _print_context(****kwargs**):
print(context)
def _print_context(****context**):
# 키워드 인수인 **kwargs 이름을 태스크 콘텍스트를 저장하려는 의도를 표현하기 위해 context로 변경
print(context)
print_context = PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag
)
- 콘텍스트 변수는 모든 콘텍스트 변수의 집합이며 현재 실행되는 태스크의 시작 및 종료 날짜 시간 인쇄와 같이 태스크 실행 간격에 대해 다양한 동작을 제공할 수 있다.
def _print_context(****context**):
start = context["execution_date"] #context로 부터 execution_date 추출
end = context["next_execution_date"]
print(f"Start: {start}, end: {end}")
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
# 출력 예시:
# Start: 2019-07-13T14:00:00+00:00, end: 2019-07-13T15:00:00+00:00
- 명시적으로 context 변수를 키워드 인수로 설정할 수 있다.
- execution_date라는 인수가 필요하다는 것을 선언하였기 때문에 context 인수에서 캡처되지 않는다.
def _get_data(execution_date, ****context**): #execution_date라는 인수가 필요하다는 것을 선언
year, month, day, hour, *_=execution_date.timetuple()
.....
-> context["execution_date"] 처럼 사용하지 않고 바로 execution_date 사용 가능
-
PythonOperator는 콜러블 함수에서 추가 인수를 제공하는 방법도 지원한다.
- 첫 번째 방법 :
op_args
인수 사용-
오퍼레이터를 실행하면 op_args에 제공된 리스트의 각 값이 콜러블 함수에 전달된다.
- _get_data("/tmp/wikipageviews.gz") 처럼 호출한 것과 동일한 결과
def _get_data(output_path, **context): #output_path라는 인수가 필요하다는 것을 선언 (생략) get_data = PythonOperator( task_id="get_data", python_callable=_get_data, **op_args**=["/tmp/wikipageviews.gz"], #op_args를 사용하여 콜러블 함수에 추가 변수 제공 dag=dag, )
-
def _get_data(*output_path, **context): #output_path라는 인수가 필요하다는 것을 선언 (생략) get_data = PythonOperator( task_id="get_data", python_callable=_get_data, **op_args**=["/tmp/wikipageviews.gz","/tmp/wikipageviews1.gz"], #op_args를 사용하여 콜러블 함수에 추가 변수 제공 dag=dag, )
- 두 번째 방법 :
op_kwargs
인수 사용-
op_args와 유사하게 op_kwargs의 모든 값은 콜러블 함수에 전달되지만, 여기서는 키워드 인수로 전달된다.
- _get_data(output_path = "/tmp/wikipageviews.gz") 처럼 호출한 것과 동일한 결과
def _get_data(output_path, **context): #output_path라는 인수가 필요하다는 것을 선언 (생략) get_data = PythonOperator( task_id="get_data", python_callable=_get_data, **op_kwargs**={"output_path" : "/tmp/wikipageviews.gz"}, #op_kwargs에 주어진 명령어가 호출 가능한 키워드 인수로 전달됨 dag=dag, )
-
- 첫 번째 방법 :
- Airflow UI는 템플릿 인수 오류를 디버깅하는데 유용하다.
- Task의 Details의 Rendered Template을 보면 템플릿 인수 값을 검사할 수 있다.
UI 확인의 단점
- 렌더링된 템플릿 보기는 Airflow에서 작업이 스케줄 될때까지 기다린 후 확인 가능.
CLI로 확인하기
- CLI로 확인하면 작업을 실행하지 않고도 UI에 표시된 것과 동일한 결과를 확인 가능.
- 메타스토어에도 아무것도 등록되지 않음
airflow tasks render [dag id] [task id] [desired execution date]
ex. airflow tasks render listing_4_03 print_context 2022-10-25T08:27:33.038119+00:00
시간별 위키피디아 페이지 뷰를 처리하여 사용하는 사례 진행
- 페이지 뷰를 추출하여 Postgres 데이터 베이스에 기록한다.
- 이때, 데이터베이스에 직접 쓰는 작업은 PostgresOperator로 수행한다.
- 메타스토어를 사용 (XCom: 5장에서 다룸)
- airflow는 XCom이라는 기본 메커니즘을 제공하여 Airflow 메타스토어에서 pickle 개체를 저장하고 나중에 읽을 수 있다.
- airflow의 메타스토어(일반적으로 MySQL 또는 Postgres 데이터베이스)는 크기가 한정되어 있고, 피클링 된 객체는 메타스토어의 blob에 저장되기 때문에 문자열 몇 개 같은 작은 데이터 전송 시에 적용하는 것이 좋다.
- 영구적인 위치 (ex. 디스크나 데이터베이스)에 태스크 결과를 기록
- 큰 데이터를 태스크 간 전송하기 위해 적용하는 것이 좋다.
- 향후 더 많은 페이지 처리로 데이터 크기가 커질 수 있을 때도 XCom 대신 디스크에 결과를 저장하는 것이 좋다.
write_to_postgres = PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres", # 연결에 사용할 인증 정보의 식별자
sql="postgres_query.sql", # SQL 쿼리 또는 SQL 쿼리를 포함하는 파일의 경로
dag=dag,
)
- Airflow는
postgres_conn_id
등 자격증명 식별자들을 메타스토어에 저장하고 관리하며 운영자가 필요할때 가져와서 쓸 수 있다.
CLI를 사용하여 자격 증명 등록
airflow connections add \
--conn-type postgres \
--conn-host localhost \
--conn-login postgres \
--conn-password mysecretpassword \
my_postgres # <- 연결 식별자
그러면 연결이 UI에 생성된다.
UI에서 자격 증명 등록
Admin > Connections > +
- PostgresOperator는 Postgres와 통신하기 위해 훅hook이라고 불리는 것을 인스턴스화 한다.
- 인스턴스화된 훅은 연결 생성, Postgres에 쿼리를 전송하고 연결에 대한 종료 작업을 처리한다.
- 여기서 오퍼레이터는 사용자의 요청을 훅으로 전달하는 작업만 담당한다.
- 파이프라인을 구축할 때는 훅은 오퍼레이터 내부에서 동작하기 때문에 신경 쓸 필요는 없다.
- 오퍼레이터의 일부 인수는 템플릿화 할 수 있다.
- 템플릿 작업은 런타임에 실행된다.
- PythonOperator 템플릿은 다른 오퍼레이터와 다르게 작동한다. 변수를 호출 가능하도록 구성해서 전달해야 한다.
- 템플릿 인수의 결과는 airflow tasks render로 확인할 수 있다.
- 오퍼레이터는 훅을 통해 다른 시스템과 통신할 수 있다.
- 오퍼레이터는 무엇을 해야하는지 기술하고, 훅은 작업 방법을 결정한다.