
Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
1. 센서 사용 폴링조건
예제 예시 : 슈퍼마켓 프로모션 데이터 처리
| airflow dag | |||
| copy_to_raw_supermarket_1 | copy_to_raw_supermarket_2 | copy_to_raw_supermarket_3 | copy_to_raw_supermarket_4 |
| process_supermarket_1 | process_supermarket_2 | process_supermarket_3 | process_supermarket_4 |
| create_metrics | |||
- copy_to_raw : 데이터를 raw 스토리지에 복사
- process_supermarket : 앱에서 데이터를 읽을수 있도록 데이터베이스상 모든 row 데이터를 변환후 저장
- create_metrics : 분석을 위한 다양한 지표 계산 및 집계
(1-1) 센서를 통해 조건 확인
airflow 오퍼레이터 특수 타입인 센서
센서는 특정 조건이 true인지 확인후 true면 성공
# FileSensor
# 파일 경로가 생성될 떄 까지 기다림
from airflow.sensors.filesystem import FileSensor
wait_for_supermarket_1=FileSensor(
task_id="wait_for_supermarket_1",
file_path="/work/row_data/data.csv"
)
해당 FileSensor은 /work/row_data/data.csv 파일이 존재하는지 확인후 있으면 true 반환
Poking은 센서를 확인하는 상태 주기이며, poke_interval 인수로 설정 가능 ( 디폴트는 60초 )
이 처럼 데이터가 제공되면 센서 상태가 true가 되고, 다운 스트림 태스크가 처리됨
(1-2) 사용자 지정 조건 폴링 ( 실습 데이터 던파 경매장 데이터 )
PythonSensor을 사용해 사용자 지정 조건 구현 (sensor_test.py)
import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업
from airflow.sensors.python import PythonSensor # 사용자 지정 폴링 조건
# dag 정의
dag = DAG(
dag_id="sensor_test",
start_date=dt.datetime.now(), # 현재 시각을 start_date로 설정
schedule_interval="0 0 * * *", # 매일 자정 실행
)
def _wait_for_donpodata(dir_name, year, month, day):
donpa_row_path = Path("/opt/airflow/" + dir_name)
data_files = list(donpa_row_path.glob(f"data-{year}{month:0>2}{day:0>2}-*.json"))
success_file = donpa_row_path / f"{year}{month:0>2}{day:0>2}_SUCCESS"
return bool(data_files) and success_file.exists()
wait_for_donpa_test = PythonSensor(
task_id="wait_for_donpa_test",
python_callable=_wait_for_donpodata,
op_kwargs={
"dir_name": "data",
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
},
dag=dag,
)
계속 기다리다 data_files, success_file 확인후 dag 종료 되는것을 확인!!!!

(1-3) 원활하지 않는 흐름의 센서 처리 ( 눈덩이 효과 )
만약 데이터가 더 이상 제공되지 않으면 최대 시간을 초과한 센서는 실패 처리됨
하지만 기본적으로 센서 타임아웃은 7일로 설정
즉, dag가 하루에 한번 실행된다 가정하면 dag실행 은 매일 추가 될것
이런 문제를 해결을 위해 airflow 실행 태스크 수를 제한하는 방법!!!!
# dag 정의시
dag=DAG(
Dag_id="couponing_app",
Start_date=datetime(2019,1,1),
Schedule_interval="0 0 * * *", # 매일 자정 실행
Concurrency=50, # 동시에 50개의 태스크 실행 허용
)
센서 데드록 : 실행중인 task 조건이 true가 될떄 까지 다른 task가 대기하므로 모든 슬롯이 데드록 됨
센서 클래스 poke, reschedule 을 통한 해결
- 기본적으로 poke로 설정되 있어 최대 태스크에 도달하면 새로운 태스크가 차단....
- reschedule 모드는 포크 동작을 실행할 떄만 슬롯을 차지!! 대기 시간 동안을 슬롯을 차지안함
- 동시 task수는 airflow 전역 설정 옵션으로 제어 가능!!
2. 다른 dag를 트리거 하기 ( 예제 데이터 던파 경매장 데이터 )
여러 아이템을 추가해 분석을 할떄 각 아이템 분석이 모두 완료 까지 기다리지 않고, 아이템 마다 통계 지표를 구현
import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업
from airflow.sensors.python import PythonSensor # 사용자 지정 폴링 조건
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # dag 트리거
# dag 정의
dag = DAG(
dag_id="sensor_test",
start_date=dt.datetime.now(), # 현재 시각을 start_date로 설정
schedule_interval="0 0 * * *", # 매일 자정 실행
)
def _wait_for_donpodata(dir_name, year, month, day):
donpa_row_path = Path("/opt/airflow/data/" + dir_name)
data_files = list(donpa_row_path.glob(f"data-{year}{month:0>2}{day:0>2}-*.json"))
success_file = donpa_row_path / f"{year}{month:0>2}{day:0>2}_SUCCESS"
return bool(data_files) and success_file.exists()
wait_for_donpa_cracks = PythonSensor(
task_id="wait_for_donpa_cracks",
python_callable=_wait_for_donpodata,
op_kwargs={
"dir_name": "cracks",
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
},
mode="reschedule",
dag=dag,
)
wait_for_donpa_lion = PythonSensor(
task_id="wait_for_donpa_lion",
python_callable=_wait_for_donpodata,
op_kwargs={
"dir_name": "lion",
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
},
mode="reschedule",
dag=dag,
)
wait_for_donpa_cube = PythonSensor(
task_id="wait_for_donpa_cube",
python_callable=_wait_for_donpodata,
op_kwargs={
"dir_name": "cube",
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
},
mode="reschedule",
dag=dag,
)
mode="reschedule" 옵션 설명
- 리소스 절약 모드: 센서는 지정된 poke_interval 동안 작업을 일시 중지하고, 해당 시간이 지나면 다시 조건을 확인합니다.
- 자원 사용: 이 모드에서는 센서가 대기 중일 때 워커 슬롯을 해제합니다. 즉, 센서가 대기 중인 동안에도 다른 작업이 실행될 수 있습니다.

- Status : reschedule 상태인것을 확인!!!
- 여기서 각 아이템 마다 task를 붙이게 되면 복잡한 위크플로우가 만들어지게 되고 더 많은 반복 task 발생..
- 아이템 별 로 통계 dag를 호출하며 복잡성 해결
import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업
from airflow.sensors.python import PythonSensor # 사용자 지정 폴링 조건
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # dag 트리거
# dag 정의
dag = DAG(
dag_id="sensor_test",
start_date=dt.datetime.now(), # 현재 시각을 start_date로 설정
schedule_interval="0 0 * * *", # 매일 자정 실행
)
def _wait_for_donpodata(dir_name, year, month, day):
donpa_row_path = Path("/opt/airflow/data/" + dir_name)
data_files = list(donpa_row_path.glob(f"data-{year}{month:0>2}{day:0>2}-*.json"))
success_file = donpa_row_path / f"{year}{month:0>2}{day:0>2}_SUCCESS"
return bool(data_files) and success_file.exists()
wait_for_donpa_cracks = PythonSensor(
task_id="wait_for_donpa_cracks",
python_callable=_wait_for_donpodata,
op_kwargs={
"dir_name": "cracks",
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
},
mode="reschedule",
dag=dag,
)
# TriggerDagRunOperator를 사용하여 dag1 트리거하는 task
trigger_dag1 = TriggerDagRunOperator(
task_id="trigger_dag1",
trigger_dag_id="cracks_metrics",
# conf 변수로 트리거시 필요한 변수 전달
conf={
"dir_name": "cracks",
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
},
dag=dag,
)
# 의존성 정의
wait_for_donpa_cracks >> trigger_dag1
# 통계 dag 정의
dag1=DAG(
dag_id="cracks_metrics",
start_date=dt.datetime.now(),
schedule_interval=None,
)
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(**kwargs):
conf = kwargs['dag_run'].conf
dir_name = conf.get("dir_name", "")
year = conf.get("year", "")
month = conf.get("month", "")
day = conf.get("day", "")
# 전처리 및 통계 계산
file_path = f"/opt/airflow/data/{dir_name}/data-{year}{month:0>2}{day:0>2}-1.json"
events = pd.read_json(file_path)
events = pd.json_normalize(events['rows'])
events = events[['count', 'price']]
# 통계
events['per_price'] = events['price'] / events['count']
events['result_ava_price'] = events['per_price'].mean()
events['diff'] = events['per_price'] - events['result_ava_price']
# 통계 저장
output_path = f"/opt/airflow/data/{dir_name}/data-{year}{month:0>2}{day:0>2}-1.csv"
events.to_csv(output_path, index=False)
calculate_stats=PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
provide_context=True, # context 전달을 활성화
dag=dag1,
)
트리거 하여 dag실행

통계 데이터 저장 확인

manual__ : 수동으로 dag 실행이 시작되었음 을 나타냄

(2-1) 다른 dag 상태 폴링하기
- TriggerDagRunOperator은 dag간 의존성 문제는 해결하지 못함..
- 다른 dag가 실행 되기전 여러 트리거 dag가 완료되어야 하면?
- 즉, dag 1,2,3 이 완료된후 집계 지표를 생성하는 dag4 가 실행되어야하면?
- airflow는 단일 dag내에 task간 의존성은 관리하지만 dag간 의존성을 관리하는 방법은 제공안함..
- 이럴떄 dag에서 task 상태를 포크하는 센서인 External TaskSensor을 적용

- wait_for_etl_dag1,2,3 task가 마지막 report 태스크를 실행전 세개의 dag가 완료된 상태를 확인하는 프락시 역할을 함
- ExternalTaskSensor 작동법은 다른 dag의 태스크를 지정해 해당 태스크 상태를 확인하는것!!
dag1 = DAG(dag_id="ingest_supermarket_data", schedule_interval="0 16 * * *",~~~)
dag2 = DAG(schedule_interval="0 16 * * *", ~~~)
# dag1 의존성 정의
DummyOperator(task_id="copy_to_raw",dag=dag1) >> DummyOperator(task_id="process_supermarket", dag=dag1)
# ExternalTaskSensor을 통해 다른 dag간 상태 폴링
wait = ExternalTaskSensor(
task_id="wait_for_process_supermarket",
# 상태 폴링할 dag_id 값 (여기서는 dag1의 id)
external_dag_id="ingest_supermarket_data",
# 상태 폴링할 dag의 task_id값 (여기서는 dag1의 process_supermarket 태스크)
external_task_id="process_supermarket",
dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
# dag2 의존성 정의
wait >> report
## 상태 체크 해야되는 dag가 여러개이면 (즉, 다대일)
## ExternalTaskSensor을 여러개 만들어 줘야됨
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime
# DAGs 1, 2, 3, 4 정의 (이전과 동일)
dag1 = DAG(dag_id="ingest_supermarket_data1", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag2 = DAG(dag_id="ingest_supermarket_data2", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag3 = DAG(dag_id="ingest_supermarket_data3", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag4 = DAG(dag_id="ingest_supermarket_data4", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag5 = DAG(dag_id="aggregate_supermarket_data", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
# dag1 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag1) >> DummyOperator(task_id="process_supermarket", dag=dag1)
# dag2 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag2) >> DummyOperator(task_id="process_supermarket", dag=dag2)
# dag3 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag3) >> DummyOperator(task_id="process_supermarket", dag=dag3)
# dag4 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag4) >> DummyOperator(task_id="process_supermarket", dag=dag4)
# dag5 의존성 정의 - 여러 ExternalTaskSensor로 각 dag를 감시
wait1 = ExternalTaskSensor(
task_id="wait_for_dag1_process_supermarket",
external_dag_id="ingest_supermarket_data1",
external_task_id="process_supermarket",
mode="poke", # 또는 'reschedule' 모드 사용 가능
dag=dag5,
)
wait2 = ExternalTaskSensor(
task_id="wait_for_dag2_process_supermarket",
external_dag_id="ingest_supermarket_data2",
external_task_id="process_supermarket",
mode="poke",
dag=dag5,
)
wait3 = ExternalTaskSensor(
task_id="wait_for_dag3_process_supermarket",
external_dag_id="ingest_supermarket_data3",
external_task_id="process_supermarket",
mode="poke",
dag=dag5,
)
wait4 = ExternalTaskSensor(
task_id="wait_for_dag4_process_supermarket",
external_dag_id="ingest_supermarket_data4",
external_task_id="process_supermarket",
mode="poke",
dag=dag5,
)
# 모든 ExternalTaskSensor가 완료된 후 실행될 태스크
report = DummyOperator(task_id="report", dag=dag5)
# dag5 의존성 정의 - 모든 wait 태스크가 완료된 후 report 실행
[wait1, wait2, wait3, wait4] >> report
즉, ExternalTaskSensor 오퍼레이터의 wait 태스크는 외부 dag(dag1)의 특정 task(process_supermarket)가 완료되었는지 감시하는 역할!!!
EnternalTaskSensor는 자신과 정확히 동일한 실행 날짜를 가진 task에 대한 성공만 확인!!
그럼 schedule_interval이 다르면.. ExternalTaskSensor은 해당하는 task를 찾을수 없다 ㅠㅠㅠ

이런경우 ExternalTaskSensor이 스케줄이 다른 task를 검색할 수 있도록 offset 설정 가능
(2-2) ExternalTaskSensor의 execution_delta
ExternalTaskSensor 오퍼레이터는 스케줄 인터벌이 완벽히 동일한 task 상태를 폴링하는데 스케줄 인터벌이 다른경우 의도적으로 ExternalTaskSensor 오퍼레이터 execution_delta 인자를 통해 스케줄 인터벌을 맞춰줄수 있다!!
wait = ExternalTaskSensor(
task_id="wait_for_process_supermarket",
# 상태 폴링할 dag_id 값 (여기서는 dag1의 id)
external_dag_id="ingest_supermarket_data",
# 상태 폴링할 dag의 task_id값 (여기서는 dag1의 process_supermarket 태스크)
external_task_id="process_supermarket",
# schedule_interval 맞춰주기 위한 execution_delta
execution_delta=datetime.timedelta(hours=4),
dag=dag2,
)

3. REST/CLI를 통한 workflow 시작
다른 dag에서 dag를 트리거하는 방법 이외, 외부에서 REST API 및 CLI를 통해 트리거 가능!!
(3-1) CLI를 통한 dag 트리거
airflow dags trigger dag1
>>> 실행 날짜가 현재 날짜 및 시간으로 설정된 dag1을 트리거
# 태스크 콘텍스트 변수를 통해 트리거 실행된 dag의 모든 태스크에 해당 변수 사용 가능
첫번쨰 방법 : airflow dags trigger -c '{supermarket_id: 1}' dag1
두번쨰 방법 : airflow dags trigger --conf '{supermarket_id: 1}' dag1
여러 변수를 적용하는 태스크를 복제해 dag를 구성하는 경우 파이프라인에 변수를 동적으로 구현할수 있어 dag 구성이 간단 및 유연해짐!!
간단 실습 cli_trg_test.py 던파 데이터 가져오는 dag
import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업
# dag 정의
dag=DAG(
dag_id="trigger_test",
start_date=dt.datetime.now(),
schedule_interval=None, # dag 스케줄 지정안함
)
# task 정의
# 던파 경매 데이터 가져오기
fetch_events=BashOperator(
task_id="fetch_events", # task name
bash_command=(
'curl -o /opt/airflow/data/{{ dag_run.conf["pdate"] }}_events.json -G "https://api.neople.co.kr/df/auction-sold" '
'--data-urlencode "itemName=균열의 단편" '
'--data-urlencode "wordType=match" '
'--data-urlencode "wordShort=false" '
'--data-urlencode "limit=30" '
'--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
),
dag=dag, # task가 어떤 dag에 속할것인지 명시
)
CLI 명령어
airflow dags trigger -c '{"pdate": "20240820"}' trigger_test (DAG_ID)

run_type값이 manual__ 으로 보아 : airflow 외부에서 트리거 되었나는것을 나타냄
(3-2) REST API를 사용 dag 트리거
# airflow conf 파일 수정
auth_backends = airflow.api.auth.backend.session, airflow.api.auth.backend.basic_auth
# 웹 서버 재시작
airflow webserver --daemon
curl -H "Authorization: Basic YWRtaW46YWRtaW4=" \
-X POST "http://192.168.56.10:9898/api/v1/dags/trigger_test/dagRuns" \
-H "Content-Type: application/json" \
-d '{
"conf": {
"pdate": "20240820"
}
}'
해당 오류.. 으아아아아 권한..?!?!?

{
"detail": null,
"status": 403,
"title": "Forbidden",
"type": "https://airflow.apache.org/docs/apache-airflow/2.9.3/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
예시 코드
curl \
-u admin:admin \ #airflow 사용자명, 비밀번호
-X POST \
"http://192.168.56.10:9898/api/v1/dags/print_dag_run_conf(dag_id)/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf":{}}' # 동적 변수가 없어도 엔드포인트에 데이터가 필요
-d '{"conf": {"supermarket": 1}}'
결론은 외부 시스템 및 CI/CD 에서 airflow dag를 트리거를 쉽게 할수 있다!!...
6장 요약
- 센서 오퍼레이터는 특정 사용자 조건이 참인지 거짓인지 계속 확인하는 유형
- PythonSensor 오퍼레이터는 다양한 요건에 맞는 센서인 사용자 지정조건 이다
- TriggerDagRunOpertator 오퍼레이터는 다른 dag를 트리거 할수 있다!!
- ExternalTaskSensor 오퍼레이터는 다른 dag의 task 상태를 확인 가능!!
- airflow dag는 REST API, CLI를 사용하여 외부 시스템에서 dag를 트리거 할수 있다
'데이터 엔지니어( 실습 정리 ) > airflow' 카테고리의 다른 글
| (8) - 커스텀 컴포넌트 빌드 (7) | 2024.08.28 |
|---|---|
| (7) - airflow 외부 시스템 통신 (1) | 2024.08.26 |
| (5) - airflow 태스크 간 의존성 정의 (0) | 2024.08.18 |
| (4) - airflow 콘텍스트 사용 (0) | 2024.08.11 |
| (3) - airflow 스케줄 (0) | 2024.08.07 |