데이터 엔지니어( 실습 정리 )/airflow
(3) - airflow 스케줄
세용용용용
2024. 8. 7. 22:14

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
1. airflow 스케줄링 ( 예제 데이터 : 던파 경매장 데이터 )
책에서는 사용자 이벤트를 api로 가져오지만 해당 코드를 찾을수가 없어 대체 했습니다 ㅠㅠㅠㅠㅠ
- 사용자 이벤트를 매일 추적하고 분석할수 있는 dag를 구현해보자!!
- 이벤트 로그는 데이터가 많기에 오랜 기간 데이터를 저장하지 않음, 하지만 그럼에도 필요한 경우는 아마존 s3 및 타사 클라우드를 스토리지를 통해 낮은 비용으로 데이터 저장 가능
던파 경매장 이벤트 pipline
| dag |
| 던파 경매장 이벤트 data get ( task 1 ) |
| 통계 및 계산 ( task 2 ) |
(1-1) dag 실습 ( 01_unscheduled.py )
import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업
# dag 정의
dag=DAG(
dag_id="01_unscheduled",
start_date=dt.datetime(2024, 8, 6),
schedule_interval=None, # dag 스케줄 지정안함
)
# task 정의
# 던파 경매 데이터 가져오기
fetch_events=BashOperator(
task_id="fetch_events", # task name
bash_command=(
'mkdir -p /opt/airflow/data/data && '
'curl -o /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
'--data-urlencode "itemName=균열의 단편" '
'--data-urlencode "wordType=match" '
'--data-urlencode "wordShort=false" '
'--data-urlencode "limit=30" '
'--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
),
dag=dag, # task가 어떤 dag에 속할것인지 명시
)
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
# 전처리
events=pd.read_json(input_path)
events=pd.json_normalize(events['rows'])
events=events[['count','price']]
# 통계
events['per_price']=events['price']/events['count']
events['result_ava_price'] = events['per_price'].mean()
events['diff']=events['per_price']-events['result_ava_price']
# 경로가 있는지 확인
Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 )
events.to_csv(output_path, index=False)
calculate_stats=PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs={
"input_path": "/opt/airflow/data/events.json",
"output_path": "/opt/airflow/data/csv_dir/stats.csv",
},
dag=dag,
)
# 의존성 정의
fetch_events >> calculate_stats
requirements.txt 추가
apache-airflow==2.9.3
requests==2.31.0
pandas==1.3.5
numpy==1.21.5
도커 build and run
# docker build
docker build -t airflow_test:2.9.3 .
# docker run
docker run -d \
-p 9898:8080 \
-v /work/dag_airflow/01_unscheduled.py:/opt/airflow/dags/01_unscheduled.py \
-v /work/rowdata_airflow:/opt/airflow/data \
--name airflow_container \
airflow_test:2.9.3
# 컨테이너 접속
docker exec -it airflow_container /bin/bash
# 스케줄 실행
nohup airflow scheduler > /dev/null 2>&1 &
실행 테스트

로컬에 데이터 저장 확인

최종 통계 데이터 확인

2. airflow 스케줄링 ( 정기적 실행 )
- dag 정의시 schedule_interval=None 이거나 디폴트로 초기화시 예약실행이 되지않음 즉, ui를 통해 사용자가 수동 트리거를 통해 워크플로우를 동작
- 2장에서 schedule_interval="@daily" 로 dag를 정의하며 매일 자정에 주기적으로 워크플로우를 실행할수 있었다.
- 종료 인자가 없으면 이론상 계속 실행이됨, 만약 기간이 정해져 있는 경우라면 end_date에 인수를 설정해 실행 중지end_date=dt.datetime(year=2019, month=1, day=5)
- 다양한 스케줄을 위해 airflow는 cron을 지원!! ( * * * * * / 분 시 일 월 요일 )
(2-1) airflow cron 예제
0 * * * * >>> 정시에 실행
0 0 * * * >>> 자정에 실행
0 0 * * 0 >>> 일요일 자정에 실행
0 0 1 * * >>> 1월 자정에 실행
45 23 * * SAT >>> 토요일 23시 45분에 실행
좀더 복잡한 cron도 가능
0 0 * * MON, WED, FRI >>> 월,화,금 자정에 실행
0 0 * * MON-FRI >>> 월~금 자정에 실행
0 0,12 * * * >>> 24,12시에 실행
(2-2) airflow 빈도 기반 스케줄 설정 ( timedelta )
- cron 기반 스케줄일 특정 빈도마다 스케줄을 정의하는 것에는 한계가 있음...
- airflow는 빈도기반 스케줄링을 위한 timedelta도 지원 ( schedule_interval=dt.timedelta(days=3) )
3. 데이터 증분 처리하기
airflow 스케줄을 통해 주기적으로 데이터 워크플로으 작업을 하며 산출물을 일자별로 분리하도록 구현
(3-1) 실행 날짜를 사용해 동적 시간 참조
execution_date, next_execution_date >> 스케줄 간격으로 실행, 종료되는 시간을 위한 매개변수
만약, execution_date, next_execution_date 기준으로 동적으로 데이터를 가져오고 싶으면
start_date={{execution_date.strftime('%Y-%m-%d')}}
end_date={{next_execution_date.strftime('%Y-%m-%d')}}
매개변수를 통해 동적으로 실행날짜를 참조하고 strftime 메서드를 통해 문자열 형식으로 반환하여 날짜 지정 가능
(3-2) 데이터 파티셔닝
산출물에도 동적 매개변수를 추가하여 덮어쓰는 방식이 아닌 주기적으로 데이터를 파티셔닝 가능
##### 기존 task #####
fetch_events=BashOperator(
task_id="fetch_events", # task name
bash_command=(
'mkdir -p /opt/airflow/data/data && '
'curl -o /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
'--data-urlencode "itemName=균열의 단편" '
'--data-urlencode "wordType=match" '
'--data-urlencode "wordShort=false" '
'--data-urlencode "limit=30" '
'--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
),
dag=dag, # task가 어떤 dag에 속할것인지 명시
)
>>> 스케줄로 실행시 매번 데이터는 events.json 파일에 덮어쓰기됨
##### 수정 task #####
fetch_events=BashOperator(
task_id="fetch_events", # task name
bash_command=(
'mkdir -p /opt/airflow/data/data && '
'curl -o /opt/airflow/data/{{execution_date.strftime('%Y-%m-%d')}}events.json -G "https://api.neople.co.kr/df/auction-sold" '
'--data-urlencode "itemName=균열의 단편" '
'--data-urlencode "wordType=match" '
'--data-urlencode "wordShort=false" '
'--data-urlencode "limit=30" '
'--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
),
dag=dag, # task가 어떤 dag에 속할것인지 명시
)
>>> 동적으로 매개변수를 추가하며 데이터를 파티셔닝 할수 있다
이를 통해 데이터 세트를 작고 관리하기 쉬운 조각으로 나누는 전략이 가능 ( 파티셔닝 )
이후 받아온 데이터를 처리하는 task로 같이 변경해주면 완료
##### 기존 task #####
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
# 전처리
events=pd.read_json(input_path)
events=pd.json_normalize(events['rows'])
events=events[['count','price']]
# 통계
events['per_price']=events['price']/events['count']
events['result_ava_price'] = events['per_price'].mean()
events['diff']=events['per_price']-events['result_ava_price']
# 경로가 있는지 확인
Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 )
events.to_csv(output_path, index=False)
calculate_stats=PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs={
"input_path": "/opt/airflow/data/events.json",
"output_path": "/opt/airflow/data/csv_dir/stats.csv",
},
dag=dag,
)
##### 변경 task #####
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
# 전처리
events=pd.read_json(input_path)
events=pd.json_normalize(events['rows'])
events=events[['count','price']]
# 통계
events['per_price']=events['price']/events['count']
events['result_ava_price'] = events['per_price'].mean()
events['diff']=events['per_price']-events['result_ava_price']
# 경로가 있는지 확인
Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 )
events.to_csv(output_path, index=False)
calculate_stats=PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs={
"input_path": "/opt/airflow/data/{{execution_date.strftime('%Y-%m-%d')}}events.json",
"output_path": "/opt/airflow/data/csv_dir/{{execution_date.strftime('%Y-%m-%d')}}stats.csv",
},
dag=dag,
)
4. airflow 실행날짜 이해
- 간격 기반 스케줄 : 명시적 스케줄 간격 동안 실행
- 시점 기반 스케줄 : 시작/종료 시점으로 스케줄 간격을 추측
5. 과거 데이터 메꾸기 위한 백필 사용
airflow는 과거의 시작날로 스케줄 정의 가능 즉, dag를 과거 시점부터 실행할수 있고 이런 프로세스를 백필 이라함
dag=DAG(
dag_id="test",
schedule_interval="@daily",
start_date=dt.datetime(year=2019, month=1, day=1),
end_date=dt.datetime(year=2019, month=1, day=5),
catchup=False, # 백필 비활성화 변수
)
dag실행 날짜 2019-1-3 이면 현재 스케줄 간격 부터 작업 시작
만약 catchup=True 이면 dag의 start_date 스케줄 간격 부터 작업 시작 ( 백필 )
- 백필의 장점은 코드 변경 시에서 데이터 재처리 가능, 하지만 여러 상황에 따라 제한이 있다.
6. 태스크 디자인을 위한 사례
airflow task의 가장 중요한 두 속성 >>> 원자성, 멱등성
(6-1) 원자성
- airflow 태스크는 성공적으로 수행되거나 시스템 상태에 영향을 미치지 않고 실패 하도록 정의
- 모든것이 완료되거나, 완료되지 않도록 보장
- 한 task 안에서 두개의 작업은 원자성을 무너뜨림
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
# 전처리
events=pd.read_json(input_path)
events=pd.json_normalize(events['rows'])
events=events[['count','price']]
# 통계
events['per_price']=events['price']/events['count']
events['result_ava_price'] = events['per_price'].mean()
events['diff']=events['per_price']-events['result_ava_price']
# 경로가 있는지 확인
Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 )
events.to_csv(output_path, index=False)
# 파일을 다른 시스템으로 보내는 함수 실행
scp_file(output_path, ip=192.168.56.33, user=root)
calculate_stats=PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs={
"input_path": "/opt/airflow/data/{{execution_date.strftime('%Y-%m-%d')}}events.json",
"output_path": "/opt/airflow/data/csv_dir/{{execution_date.strftime('%Y-%m-%d')}}stats.csv",
},
dag=dag,
)
- 기존 통계 처리후 csv로 저장하는 task에 다른 시스템으로 데이터를 보내는 작업을 추가시
- 시스템 문제로 (네트워크 등등..) 파일이 전송되지 않더라도 로컬에 csv는 저장이 되므로 작업이 성공한 것 처럼 보임,,
- 해당 작업 원자성 유지를 위해 다른 시스템으로 보내는 작업은 별도의 task로 분리 해야됨
def _scp_file(ip, user, **context):
# 파일을 다른 시스템으로 보내는 함수 실행
scp_file(output_path, ip=ip, user=user)
scp_file=PythonOperator(
python_callable=_scp_file,
task_id="scp_file"
op_kwargs={"ip":"192.168.56.33", "user":"root"},
templates_dict={"output_path":"/opt/airflow/data/csv_dir/{{execution_date.strftime('%Y-%m-%d')}}stats.csv"},
dag=dag,
)
- 각 작업을 task로 따로 분리시 서로 영향을 주지 않기에 원자성이 유지됨
(6-2) 멱등성
입력 변경 없이 태스크를 다시 실행해도 전체 결과는 변경되지 않아야됨
# 데이터 추가
'curl -O /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
'--data-urlencode "itemName=균열의 단편" '
'--data-urlencode "wordType=match" '
'--data-urlencode "wordShort=false" '
'--data-urlencode "limit=30" '
'--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
# 데이터 덮어쓰기
'curl -o /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
'--data-urlencode "itemName=균열의 단편" '
'--data-urlencode "wordType=match" '
'--data-urlencode "wordShort=false" '
'--data-urlencode "limit=30" '
'--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
데이터 쓰는 태스크 예시 일부에서
- 첫번쨰는 같은 파일에 추가를 하게되므로 다시 실행할 떄마다 결과가 점점 늘어남 ( 비멱등성 태스크 )
- 두번쨰는 같은 파일에 덮어 쓰게되므로 다시 실행하더라도 동일안 결과가 생성 ( 멱등성 태스크 )
3장 정리
- 스케줄 을 통해 dag실행 가능
- airflow는 cron, timedelta를 통해 다양한 스케줄 구성 가능
- 템플릿을 사용해 변수 동적 할당 가능
- 백필을 통해 과거 시점 작업 실행 가능
- 멱등성은 동일 출력 결과를 생성하게 구현하여 동일한 재실행 보장