본문 바로가기
데이터 엔지니어( 실습 정리 )/airflow

(6) - airflow 워크플로 트리거

by 세용용용용 2024. 8. 18.

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

1. 센서 사용 폴링조건

예제 예시 : 슈퍼마켓 프로모션 데이터 처리

airflow dag
copy_to_raw_supermarket_1 copy_to_raw_supermarket_2 copy_to_raw_supermarket_3 copy_to_raw_supermarket_4
process_supermarket_1 process_supermarket_2 process_supermarket_3 process_supermarket_4
create_metrics

 

  • copy_to_raw : 데이터를 raw 스토리지에 복사
  • process_supermarket : 앱에서 데이터를 읽을수 있도록 데이터베이스상 모든 row 데이터를 변환후 저장
  • create_metrics : 분석을 위한 다양한 지표 계산 및 집계

 

(1-1) 센서를 통해 조건 확인

airflow 오퍼레이터 특수 타입인 센서

센서는 특정 조건이 true인지 확인후 true면 성공

# FileSensor
# 파일 경로가 생성될 떄 까지 기다림
from airflow.sensors.filesystem import FileSensor

wait_for_supermarket_1=FileSensor(
    task_id="wait_for_supermarket_1",
    file_path="/work/row_data/data.csv"
)

해당 FileSensor은 /work/row_data/data.csv 파일이 존재하는지 확인후 있으면 true 반환
Poking은 센서를 확인하는 상태 주기이며, poke_interval 인수로 설정 가능 ( 디폴트는 60초 )

 

이 처럼 데이터가 제공되면 센서 상태가 true가 되고, 다운 스트림 태스크가 처리됨

 

(1-2) 사용자 지정 조건 폴링 ( 실습 데이터 던파 경매장 데이터 )

PythonSensor을 사용해 사용자 지정 조건 구현 (sensor_test.py)

import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업

from airflow.sensors.python import PythonSensor # 사용자 지정 폴링 조건

# dag 정의
dag = DAG(
    dag_id="sensor_test",
    start_date=dt.datetime.now(), # 현재 시각을 start_date로 설정
    schedule_interval="0 0 * * *", # 매일 자정 실행
)

def _wait_for_donpodata(dir_name, year, month, day):
    donpa_row_path = Path("/opt/airflow/" + dir_name)
    data_files = list(donpa_row_path.glob(f"data-{year}{month:0>2}{day:0>2}-*.json"))
    success_file = donpa_row_path / f"{year}{month:0>2}{day:0>2}_SUCCESS"
    return bool(data_files) and success_file.exists()

wait_for_donpa_test = PythonSensor(
    task_id="wait_for_donpa_test",
    python_callable=_wait_for_donpodata,
    op_kwargs={
        "dir_name": "data",
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
    },
    dag=dag,
)

 

계속 기다리다 data_files, success_file 확인후 dag 종료 되는것을 확인!!!!

 

 

(1-3) 원활하지 않는 흐름의 센서 처리 ( 눈덩이 효과 )

만약 데이터가 더 이상 제공되지 않으면 최대 시간을 초과한 센서는 실패 처리됨

하지만 기본적으로 센서 타임아웃은 7일로 설정

즉, dag가 하루에 한번 실행된다 가정하면 dag실행 은 매일 추가 될것

이런 문제를 해결을 위해 airflow 실행 태스크 수를 제한하는 방법!!!!

# dag 정의시
dag=DAG(
    Dag_id="couponing_app",
    Start_date=datetime(2019,1,1),
    Schedule_interval="0 0 * * *", # 매일 자정 실행
    Concurrency=50, # 동시에 50개의 태스크 실행 허용
)

 

센서 데드록 : 실행중인 task 조건이 true가 될떄 까지 다른 task가 대기하므로 모든 슬롯이 데드록 됨

 

센서 클래스 poke, reschedule 을 통한 해결

  • 기본적으로 poke로 설정되 있어 최대 태스크에 도달하면 새로운 태스크가 차단....
  • reschedule 모드는 포크 동작을 실행할 떄만 슬롯을 차지!! 대기 시간 동안을 슬롯을 차지안함
  • 동시 task수는 airflow 전역 설정 옵션으로 제어 가능!!

 

 

2. 다른 dag를 트리거 하기 ( 예제 데이터 던파 경매장 데이터 )

여러 아이템을 추가해 분석을 할떄 각 아이템 분석이 모두 완료 까지 기다리지 않고, 아이템 마다 통계 지표를 구현

import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업

from airflow.sensors.python import PythonSensor # 사용자 지정 폴링 조건
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # dag 트리거

# dag 정의
dag = DAG(
    dag_id="sensor_test",
    start_date=dt.datetime.now(), # 현재 시각을 start_date로 설정
    schedule_interval="0 0 * * *", # 매일 자정 실행
)

def _wait_for_donpodata(dir_name, year, month, day):
    donpa_row_path = Path("/opt/airflow/data/" + dir_name)
    data_files = list(donpa_row_path.glob(f"data-{year}{month:0>2}{day:0>2}-*.json"))
    success_file = donpa_row_path / f"{year}{month:0>2}{day:0>2}_SUCCESS"
    return bool(data_files) and success_file.exists()

wait_for_donpa_cracks = PythonSensor(
    task_id="wait_for_donpa_cracks",
    python_callable=_wait_for_donpodata,
    op_kwargs={
        "dir_name": "cracks",
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
    },
    mode="reschedule",
    dag=dag,
)

wait_for_donpa_lion = PythonSensor(
    task_id="wait_for_donpa_lion",
    python_callable=_wait_for_donpodata,
    op_kwargs={
        "dir_name": "lion",
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
    },
    mode="reschedule",
    dag=dag,
)

wait_for_donpa_cube = PythonSensor(
    task_id="wait_for_donpa_cube",
    python_callable=_wait_for_donpodata,
    op_kwargs={
        "dir_name": "cube",
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
    },
    mode="reschedule",
    dag=dag,
)

 

mode="reschedule" 옵션 설명

 

  • 리소스 절약 모드: 센서는 지정된 poke_interval 동안 작업을 일시 중지하고, 해당 시간이 지나면 다시 조건을 확인합니다.
  • 자원 사용: 이 모드에서는 센서가 대기 중일 때 워커 슬롯을 해제합니다. 즉, 센서가 대기 중인 동안에도 다른 작업이 실행될 수 있습니다.

 

  • Status : reschedule 상태인것을 확인!!!
  • 여기서 각 아이템 마다 task를 붙이게 되면 복잡한 위크플로우가 만들어지게 되고 더 많은 반복 task 발생..
  • 아이템 별 로 통계 dag를 호출하며 복잡성 해결
import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업

from airflow.sensors.python import PythonSensor # 사용자 지정 폴링 조건
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # dag 트리거

# dag 정의
dag = DAG(
    dag_id="sensor_test",
    start_date=dt.datetime.now(), # 현재 시각을 start_date로 설정
    schedule_interval="0 0 * * *", # 매일 자정 실행
)

def _wait_for_donpodata(dir_name, year, month, day):
    donpa_row_path = Path("/opt/airflow/data/" + dir_name)
    data_files = list(donpa_row_path.glob(f"data-{year}{month:0>2}{day:0>2}-*.json"))
    success_file = donpa_row_path / f"{year}{month:0>2}{day:0>2}_SUCCESS"
    return bool(data_files) and success_file.exists()

wait_for_donpa_cracks = PythonSensor(
    task_id="wait_for_donpa_cracks",
    python_callable=_wait_for_donpodata,
    op_kwargs={
        "dir_name": "cracks",
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
    },
    mode="reschedule",
    dag=dag,
)

# TriggerDagRunOperator를 사용하여 dag1 트리거하는 task
trigger_dag1 = TriggerDagRunOperator(
    task_id="trigger_dag1",
    trigger_dag_id="cracks_metrics",
    # conf 변수로 트리거시 필요한 변수 전달
    conf={
        "dir_name": "cracks",
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
    },
    dag=dag,
)

# 의존성 정의
wait_for_donpa_cracks >> trigger_dag1


# 통계 dag 정의
dag1=DAG(
    dag_id="cracks_metrics",
    start_date=dt.datetime.now(),
    schedule_interval=None,
)

# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(**kwargs):
    conf = kwargs['dag_run'].conf
    dir_name = conf.get("dir_name", "")
    year = conf.get("year", "")
    month = conf.get("month", "")
    day = conf.get("day", "")

    # 전처리 및 통계 계산
    file_path = f"/opt/airflow/data/{dir_name}/data-{year}{month:0>2}{day:0>2}-1.json"
    events = pd.read_json(file_path)
    events = pd.json_normalize(events['rows'])
    events = events[['count', 'price']]

    # 통계
    events['per_price'] = events['price'] / events['count']
    events['result_ava_price'] = events['per_price'].mean()
    events['diff'] = events['per_price'] - events['result_ava_price']

    # 통계 저장
    output_path = f"/opt/airflow/data/{dir_name}/data-{year}{month:0>2}{day:0>2}-1.csv"
    events.to_csv(output_path, index=False)

calculate_stats=PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    provide_context=True,  # context 전달을 활성화
    dag=dag1,
)

 

 

트리거 하여 dag실행

 

통계 데이터 저장 확인

 

manual__ : 수동으로 dag 실행이 시작되었음 을 나타냄

트리거 dag 의 run_id 확인

 

(2-1) 다른 dag 상태 폴링하기

  • TriggerDagRunOperator은 dag간 의존성 문제는 해결하지 못함..
  • 다른 dag가 실행 되기전 여러 트리거 dag가 완료되어야 하면?
  • 즉, dag 1,2,3 이 완료된후 집계 지표를 생성하는 dag4 가 실행되어야하면?
  • airflow는 단일 dag내에 task간 의존성은 관리하지만 dag간 의존성을 관리하는 방법은 제공안함..
  • 이럴떄 dag에서 task 상태를 포크하는 센서인 External TaskSensor을 적용

 

 

  • wait_for_etl_dag1,2,3 task가 마지막 report 태스크를 실행전 세개의 dag가 완료된 상태를 확인하는 프락시 역할을 함
  • ExternalTaskSensor 작동법은 다른 dag의 태스크를 지정해 해당 태스크 상태를 확인하는것!!
dag1 = DAG(dag_id="ingest_supermarket_data", schedule_interval="0 16 * * *",~~~)
dag2 = DAG(schedule_interval="0 16 * * *", ~~~)

# dag1 의존성 정의
DummyOperator(task_id="copy_to_raw",dag=dag1) >> DummyOperator(task_id="process_supermarket", dag=dag1)

# ExternalTaskSensor을 통해 다른 dag간 상태 폴링
wait = ExternalTaskSensor(
    task_id="wait_for_process_supermarket",
    # 상태 폴링할 dag_id 값 (여기서는 dag1의 id)
    external_dag_id="ingest_supermarket_data",
    # 상태 폴링할 dag의 task_id값 (여기서는 dag1의 process_supermarket 태스크)
    external_task_id="process_supermarket",
    dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)

# dag2 의존성 정의
wait >> report


## 상태 체크 해야되는 dag가 여러개이면 (즉, 다대일)
## ExternalTaskSensor을 여러개 만들어 줘야됨
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime

# DAGs 1, 2, 3, 4 정의 (이전과 동일)
dag1 = DAG(dag_id="ingest_supermarket_data1", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag2 = DAG(dag_id="ingest_supermarket_data2", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag3 = DAG(dag_id="ingest_supermarket_data3", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))
dag4 = DAG(dag_id="ingest_supermarket_data4", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))

dag5 = DAG(dag_id="aggregate_supermarket_data", schedule_interval="0 16 * * *", start_date=datetime(2023, 1, 1))

# dag1 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag1) >> DummyOperator(task_id="process_supermarket", dag=dag1)

# dag2 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag2) >> DummyOperator(task_id="process_supermarket", dag=dag2)

# dag3 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag3) >> DummyOperator(task_id="process_supermarket", dag=dag3)

# dag4 의존성 정의
DummyOperator(task_id="copy_to_raw", dag=dag4) >> DummyOperator(task_id="process_supermarket", dag=dag4)

# dag5 의존성 정의 - 여러 ExternalTaskSensor로 각 dag를 감시
wait1 = ExternalTaskSensor(
    task_id="wait_for_dag1_process_supermarket",
    external_dag_id="ingest_supermarket_data1",
    external_task_id="process_supermarket",
    mode="poke",  # 또는 'reschedule' 모드 사용 가능
    dag=dag5,
)

wait2 = ExternalTaskSensor(
    task_id="wait_for_dag2_process_supermarket",
    external_dag_id="ingest_supermarket_data2",
    external_task_id="process_supermarket",
    mode="poke",
    dag=dag5,
)

wait3 = ExternalTaskSensor(
    task_id="wait_for_dag3_process_supermarket",
    external_dag_id="ingest_supermarket_data3",
    external_task_id="process_supermarket",
    mode="poke",
    dag=dag5,
)

wait4 = ExternalTaskSensor(
    task_id="wait_for_dag4_process_supermarket",
    external_dag_id="ingest_supermarket_data4",
    external_task_id="process_supermarket",
    mode="poke",
    dag=dag5,
)

# 모든 ExternalTaskSensor가 완료된 후 실행될 태스크
report = DummyOperator(task_id="report", dag=dag5)

# dag5 의존성 정의 - 모든 wait 태스크가 완료된 후 report 실행
[wait1, wait2, wait3, wait4] >> report

 

즉, ExternalTaskSensor 오퍼레이터의 wait 태스크는 외부 dag(dag1)의 특정 task(process_supermarket)가 완료되었는지 감시하는 역할!!!

 EnternalTaskSensor는 자신과 정확히 동일한 실행 날짜를 가진 task에 대한 성공만 확인!!

그럼 schedule_interval이 다르면.. ExternalTaskSensor은 해당하는 task를 찾을수 없다 ㅠㅠㅠ

스케줄 인터벌이 다른경우 찾을수가 없음....

 

이런경우 ExternalTaskSensor이 스케줄이 다른 task를 검색할 수 있도록 offset 설정 가능

 

(2-2) ExternalTaskSensor의 execution_delta

ExternalTaskSensor 오퍼레이터는 스케줄 인터벌이 완벽히 동일한 task 상태를 폴링하는데 스케줄 인터벌이 다른경우 의도적으로 ExternalTaskSensor 오퍼레이터 execution_delta 인자를 통해 스케줄 인터벌을 맞춰줄수 있다!!

wait = ExternalTaskSensor(
    task_id="wait_for_process_supermarket",
    # 상태 폴링할 dag_id 값 (여기서는 dag1의 id)
    external_dag_id="ingest_supermarket_data",
    # 상태 폴링할 dag의 task_id값 (여기서는 dag1의 process_supermarket 태스크)
    external_task_id="process_supermarket",
    
    # schedule_interval 맞춰주기 위한 execution_delta
    execution_delta=datetime.timedelta(hours=4),
    dag=dag2,
)

 

 

 

3. REST/CLI를 통한 workflow 시작

다른 dag에서 dag를 트리거하는 방법 이외, 외부에서 REST API 및 CLI를 통해 트리거 가능!!

(3-1) CLI를 통한 dag 트리거

airflow dags trigger dag1
>>> 실행 날짜가 현재 날짜 및 시간으로 설정된 dag1을 트리거

# 태스크 콘텍스트 변수를 통해 트리거 실행된 dag의 모든 태스크에 해당 변수 사용 가능
첫번쨰 방법 : airflow dags trigger -c '{supermarket_id: 1}' dag1
두번쨰 방법 : airflow dags trigger --conf '{supermarket_id: 1}' dag1

 

여러 변수를 적용하는 태스크를 복제해 dag를 구성하는 경우 파이프라인에 변수를 동적으로 구현할수 있어 dag 구성이 간단 및 유연해짐!!

 

간단 실습 cli_trg_test.py 던파 데이터 가져오는 dag

import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업

# dag 정의
dag=DAG(
    dag_id="trigger_test",
    start_date=dt.datetime.now(),
    schedule_interval=None, # dag 스케줄 지정안함
)

# task 정의
# 던파 경매 데이터 가져오기
fetch_events=BashOperator(
    task_id="fetch_events", # task name
    bash_command=(
        'curl -o /opt/airflow/data/{{ dag_run.conf["pdate"] }}_events.json -G "https://api.neople.co.kr/df/auction-sold" '
        '--data-urlencode "itemName=균열의 단편" '
        '--data-urlencode "wordType=match" '
        '--data-urlencode "wordShort=false" '
        '--data-urlencode "limit=30" '
        '--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
    ),
    dag=dag, # task가 어떤 dag에 속할것인지 명시
)

 

CLI 명령어

airflow dags trigger -c '{"pdate": "20240820"}' trigger_test (DAG_ID)

run_type값이 manual__ 으로 보아 : airflow 외부에서 트리거 되었나는것을 나타냄

 

 

(3-2) REST API를 사용 dag 트리거

# airflow conf 파일 수정
auth_backends = airflow.api.auth.backend.session, airflow.api.auth.backend.basic_auth

# 웹 서버 재시작
airflow webserver --daemon
curl -H "Authorization: Basic YWRtaW46YWRtaW4=" \
-X POST "http://192.168.56.10:9898/api/v1/dags/trigger_test/dagRuns" \
-H "Content-Type: application/json" \
-d '{
    "conf": {
        "pdate": "20240820"
    }
}'

 

해당 오류.. 으아아아아 권한..?!?!?

{
  "detail": null,
  "status": 403,
  "title": "Forbidden",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.9.3/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}

 

예시 코드

curl \
-u admin:admin \ #airflow 사용자명, 비밀번호
-X POST \
"http://192.168.56.10:9898/api/v1/dags/print_dag_run_conf(dag_id)/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf":{}}' # 동적 변수가 없어도 엔드포인트에 데이터가 필요

-d '{"conf": {"supermarket": 1}}'

 

결론은 외부 시스템 및 CI/CD 에서 airflow dag를 트리거를 쉽게 할수 있다!!...

 

6장 요약

  • 센서 오퍼레이터는 특정 사용자 조건이 참인지 거짓인지 계속 확인하는 유형
  • PythonSensor 오퍼레이터는 다양한 요건에 맞는 센서인 사용자 지정조건 이다
  • TriggerDagRunOpertator 오퍼레이터는 다른 dag를 트리거 할수 있다!!
  • ExternalTaskSensor 오퍼레이터는 다른 dag의 task 상태를 확인 가능!!
  • airflow dag는 REST API, CLI를 사용하여 외부 시스템에서 dag를 트리거 할수 있다