
Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
1. task 간 의존성
(1-1) 선형 의존성 유형
각 task를 순차적으로 실행하는 ( 데이터 다운 >> 이미지 다운 >> 알림 ) 유형
task간 의존성을 통해 업스트림 의존성 성공이후 다음 task 실행 시작
(1-2) 팬인/팬아웃 의존성
# 병렬 실행 선형 의존성
fetch_weather >> clean_weather
fetch_sales >> clean_sales
# 팬 아웃 의존성 (일 대 다)
from airflow.operators.dummy import DummOperator
start=DummyOperator(task_id="start") # 더미 시작 태스크 생성
start >> [fetch_weather,fetch_sales] # 팬 아웃 의존성 태스크 생성
# 팬 인 의존성 (대 대 일)
[clean_weather,clean_sales] >> join_datasets
# 나머지 선형 태스크 체인
join_datasets >> train_model >> deploy_model
시작 한 후 fetch_weather >> clean_weather, fetch_sales >> clean_sales 해당 태스크 병렬로 실행
join_datasets 이후 태스크 부터는 선형적 진행
2. 브랜치 하기
데이터 관련 시스템 전환으로 다른 형식의 원시 데이터가 제공될 예정일떄
(2-1) 태스크 내에서 브랜치
# 실행 날짜를 통해 처리 코드 분리
def _clean_sales(**context):
if context["execution_date"] < ERP_CHANGE_DATE:
_clean_sales_old(**context) # 이전 데이터 처리 함수
else
_clean_sales_new(**context) # 이후 데이터 처리 함수
~~~
clean_sales_data=PythonOperator(
task_id="clean_sales",
python_callable=_clean_sales,
)
- 이러한 방식은 dag 구조를 수정하지 않고 dag의 유연성을 허용 하는것
- 하지만 분기가 가능한 태스크로 구성된 경우에만 작동
- 완전히 다른 태스크 체인이 필요하면 개별 태스크 세트로 분할 해야됨
(2-2) dag 내부에서 브랜치
개별 태스크 세트 개발후 dag가 이전, 새로운 작업 실행을 선택함
##### 신규 수집,정제 task 추가 #####
## 기존 task
fetch_sales_old=PythonOperator(...)
clean_sales_old=PythonOpertaor(...)
fetch_sales_old >> clean_sales_old
## 새로운 task
fetch_sales_new=PythonOperator(...)
clean_sales_new=PythonOperator(...)
fetch_sales_new >> clean_sales_new
##### BranchPythinOperator 을 통해 브랜치 작업 가능 #####
def _pick_erp_system(**context)
if context["execution_date"] < ERP_CHANGE_DATE:
return "fetch_sales_old"
else:
return "fetch_sales_new"
pick_erp_system=BranchPythonOperator(
task_id='pick_erp_system'
python_callable=_pick_erp_system,
)
# 의존성 설정
pick_erp_system >> [fetch_sales_old, fetch_sales_new]
하지만 내부 브랜치 추가시

- join_datasets 이후 태스크는 실행하지 않음...
- 트리거 규칙이 (all_success 이기에) 모든 상위 태스크가 성공해야 태스크 실행 가능한 조건..
- 즉, join_datasets 트리거 규칙을 변경해 업스트림 task 하나를 건너뛰더라도 계속 진행 되도록 변경
##### 트리거 변경 #####
join_datasets=PythonOperator(
...,
trigger_rule="none_failed" # 모든 상위 항목이 실행 완료 및 실패가 없으면 작업 시작
)
# 브랜치 후에서 다음 태스크 실행을 계속 할수 있다!!!!
이러한 방법은 join_datasets 태스크에 연결된 task가 3개임...
서로 다른 브랜치를 결합해 더미 task를 추가해 브랜치 조건을 명확하게 함
##### 더미 task 추가 #####
join_datasets 태스크에 대한 트리거 규칙 설정할 필요 없음!!!!
from airflow.operators,dummy import DummyOperator
join_branch=DummOperator(
task_id="join_erp_branch",
trigger_rule="none_failed" # 모든 상위 항목이 실행 완료 및 실패가 없으면 작업 시작
)
# 의존성 정의
[clean_sales_old, clean_Sales_new] >> join_branch
join_branch >> join_datasets
해당 방법은 join_datasets 태스크 트리거 규칙을 변경할 필요가 없어 브랜치를 좀 더 독립적 유지 가능
3. 조건부 태스크
- airflow는 특정 조건에 따라 dag에서 특정 태스크를 건너뛸 수 있는 방법 제공
- 특정 데이터 세트를 사용 할떄만 실행
- 최근 실행된 dag 경우만 task 실행 ( 백필시 )
(3-1) task 내에서 조건
가장 최근 실행된 dag에 대해서만 모델을 배포하도록 dag 변경
##### 태스크 내 조건 구현 #####
# 최근 실행 dag 에서만 태스크 실행 하기위해
def _deploy(**context):
if context["execution_date"] == 최근 실행 시간 이면
deploy_model()
deploy_PythonOperator(
task_id="deploy_model",
python_callable=_deploy,
)
해당 방법은 의도한 대로 동작은 하지만, 배포 로직 조건이 혼용...
airflow ui에서 task 결과 추적 시 에도 혼란
(3-2) 조건부 태스크 만들기
- 현재 파이프라인 실행이 가장 최근인지 확인하는 task 추가하고, 해당 task 다운스트림에 모델 배포 task추가해 배포를 조건부 가능
- execution_date가 최근 실행이 아닌 경우 다운스트림 작업을 건너 뛰도록 AirflowSkipException 함수 사용
from airflow.exceptions import AirflowSkipException
def _latest_only(**context):
left_window=context["dag"].following_schedule(context["execution_date"])
right_window=context["dag"].following_schedule(left_window)
now=pendulum.now("UTC")
if not left_window < now <= right_window: 실행 윈도우 경계가 아니면
raise AirflowSkipException("Not the most recent run!") # 다운 스트림 task 건너 뛰기
latest_only=PythonOperator(
task_id="latest_only",
python_callable+_latest_only
)
# 외존성 정의, 모델 배포 task 다운스트림 추가
latest_only >> deploy_model
(3-3) 내장 오퍼레이터 사용
airflow 내장 클레스인 LastOnlyOperator 클래스 사용
from airflow.operators.lastest_only import LastestOnlyOperator
latest_only=LatestOnlyOperator(
task_id="latest_only",
dag=dag
)
join_datasets >> train_model >> deploy_model
latest_only >> deploy_model
물론 더 복잡한 로직을 추가할 경우 PythonOperator 기반 구현이 더 효율적!!!!
4. 트리거 규칙
- airflow 기본 트리거 규칙은 all_success
- task 실행시 모든 의존적 태스크가 성공적으로 완료되야 실행가능
- all_success인 경우 업스트림 태스크 결과가 다운스트림 태스크에도 영향을 미치는 경우 전파라고 하고
- 이 경우 다운스트림 모든 task도 건너 뛸수 있음
(4-1) 기타 트리거 규칙
- all_success (default) : 모든 상위 task가 성공적으로 완료시 트리거 ( 디폴트 트리거 규칙 )
- all_failed : 모든 상위 태스크 실패, 상위 task의 오류로 실패했을 경우 트리거 ( 실패가 예상되는 상황에서 오류처리 트리거 )
- all_done : 결과 상태 관계없이 모든 부모가 실행 완료시 트리거 ( 모든 태스크 완료시 실행할 청소 코드 실행 )
- one_failed : 하나 이상의 상위 태스크가 실패하자마자 트리거 ( 알림, 롤백 같은 오류 처리 코드 빠르게 트리거 )
- one_success : 한 부모가 성공시 트리거 ( 하나의 결과를 사용할 수 있게 되는 즉시, 빠르게 트리거 )
- none_failed : 실패한 상위 태스크가 없지만, 태스크가 성공 또는 건너 뛴 경우 트리거
- none_skipped : 건너뛴 상위 태스크가 없지만 태스크 성공 또는 실패한 경우 트리거 ( 모든 업스트림 task 실행한 경우, 해당 결과를 무시하고 트리거 )
- dummy : 업스트림 task 상태 관계없이 트리거
5. task간 데이터 공유
- airflow는 XCom을 사용해 task간 작은 데이터 공유 가능
- XCom은 task간 메시지를 교환해 특정 상태를 공유
- XCom 값은 admin > XCom 항복에서 확인 가능
XCom_push
##### xcom_push 사용해 XCom 값 게시
def _train_model(**context):
model_id=str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
train_model=PythonOperator(
task_id="train_model",
python_callable=_train_model
)
XCom_pull
# tain_model 태스크에 게시한 model_id와 일치하는 XCom 값을 가져옴
# xcom 값을 가져올떄 dag_id 및 실행 날짜 정의가능
# 디폴트는 현재 dag와 실행 날짜로 설정되어 값을 가져옴
def _deploy_model(**context):
model_id=context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id"
)
print(f"Deploying model {model_id}")
deploy_model=PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model
)
(5-1) 템플릿 에서 XCom 값 사용
def _deploy_model(templates_dict, **context):
model_id=templates_dict["model_id"]
print(f"Deploying model {model_id}")
deploy_model=PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
# 템플릿 사용
templates_dict={
"model_id": "{{task_instance.xcom_pull(
task_ids='train_model', key='model_id')}}"
},
)
(5-2) 리턴 사용 XCom 값 게시
PythonOperator 파이썬 콜러블 인수에서 반환된 값을 XCom 값으로 게시
def _train_model(**context):
model_id=str(uuid.uuid4())
return model_id
PythonOperatordml implict XCom은 return_value 키 기준 등록
(5-3) XCom 사용 고려사항
- task간 상태를 공유하는데 유용하지만, 단점 존재
- 풀링 task는 필요한 값을 사용하기 위해 task간 묵시적 의존성이 존재
- but, 명시적 의존성과 달라 dag에 표시 안되고 task 스케줄시 고려되지 않음
- XCom은 원자성을 무너뜨릴수 있음
- XCom이 저장하는 모든 값은 직렬화 지원해야 하는 기술적 한계 존재 즉, 람다 또는 여러 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형은 XCom에 저장 안됨 또한 저장 크기가 제한됨
(5-4) 커스텀 XCom 백엔드 사용
XCom을 유연하게 활용하기 위해 XCom 백엔드를 지정할수 있는 옵션 추가
커스텀 XCom 백엔드 구조
from typing import Any
from airflow.models.xcom import BaseXCom
class CustionXComBackend(BaseXCom):
@staticmethod
def serialize_value(value: Any):
~~
@staticmethod
def deserialize_value(result) -> Any:
~~~
커스텀 XCom 백엔드는 XCom 값 저장 선택을 다양하게 함
6. taskflow api로 파이썬 태스크 연결
airflow2에서 taskflow api를 통해 파이썬 태스크 및 의존성 정의를 위한 새로운 데커레이터 기반 api 추가 제공
기존 XCom으로 데이터를 제공하는 코드
def _train_model(**context):
model_d=str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
def _deploy_model(**context):
model_id=context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id"
)
print(f"Deploying model {model_id}"
train_model=PythonOperator(
test_id="train_model",
python_callable=_train_model
)
deploy_model=PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model
)
train_model >> deploy_model # 태스크 의존성 정의
해당 방식 단점은
- 함수(_train_model, _deploy_model) 정의후 python Operator을 이용해 airflow task 생성
- 모델 id 공유를 위해 xcom_push, xcom_pull 사용해 모델id값을 전송 및 반환
- xcom데이터 의존성 정의하는 것은 번거로움
taskflow api는 파이썬 함수를 태스크로 쉽게 변환, task간 데이터 공유를 명확하게 함
데커레이터를 사용함
taskflow api를 사용해 훈련, 배포 task 정의
from airflow.decorators import task
with dag(~~) as dag:
~~~
# taskflow api사용해 airflow가 train_model 함수를 래핑하도록 하는 파이썬 태스크 정의 가능
@task
def train_model():
model_id=str(uuid.uuid4())
return model_id
# 단순 파이썬 함수 인수로 전달
@task
def deploy_model(model_id: str):
print(f"Deploying model {model_id}")
# taskflow 태스크 의존성 정의
model_id=train_model()
deploy_model(model_id)
해당 코드로 두 태스크와 task간 의존성이 포함됨 dag 생성
(6-1) taskflow api 장단점
장점
- XCom를 사용하는 태스크 간소화
- XCom 문제인 task 간 의존성을 확인하지 못하는 문제 해결
단점
- PythonOperator을 사용해 구현되는 파이썬 태스크로 제한됨, 즉 다른 airflow 오퍼레이터 와 관련된 task는 일반 api를 사용해 태스크 및 태스크 의존성 정의해야됨
- 두 가지 혼용은 문제는 없지만 코드가 복잡해짐
with DAG(~~) as dag:
start=DummyOperator(task_id="start")
~~~
[clean_sales, clean_weather] >> join_datasets
# taskflow api사용해 airflow가 train_model 함수를 래핑하도록 하는 파이썬 태스크 정의 가능
@task
def train_model():
model_id=str(uuid.uuid4())
return model_id
# 단순 파이썬 함수 인수로 전달
@task
def deploy_model(model_id: str):
print(f"Deploying model {model_id}")
# taskflow 태스크 의존성 정의
model_id=train_model()
deploy_model(model_id)
join_datasets >> model_id # taskflow 스타일과 일반 task사이 의존성으로 두 가지 유형 혼합
- taskflow 유형 태스크 간 전달된 데이터는 XCom을 통해 저장
- 즉, 전달 값 은 XCom 제약 사항 적용됨( 직렬화 가능해야됨 )
- 값의 크기는 airflow 배포시 사용하는 XCom의 백엔드에 의해 제한
직렬화 : 데이터 구조나 객체를 연속적인 바이트(byte) 형태로 변환하는 과정
역직렬화 : 직렬화된 바이트 스트림을 다시 원래의 데이터 구조나 객체로 복원하는 과정
5장 요약
- airflow task 의존성을 통해 선형 태스크 의존성, 팬인/팬아웃 의존성 정의 가능
- BranchPythonOperator을 사용해 dag에 브랜치를 구성해 특정 조건에 따라 여러 실행 경로 선택 가능
- 조건부 task를 통해 특정 조건에 따른 의존성 task 실행가능
- dag 구조에서 브랜치 및 조건을 명시적 코딩시 dag 실행 방식을 해석하는데 도움
- airflow 트리거는 동작을 제어하는 트리거 규칙에 의해 제어, 태스크 다양한 상황에 대응할 수 있도록 구성
- XCom을 사용해 태스크간 상태 공유
- taskflow api는 파이썬 태스크가 많은 dag를 단순화
'데이터 엔지니어( 실습 정리 ) > airflow' 카테고리의 다른 글
| (7) - airflow 외부 시스템 통신 (1) | 2024.08.26 |
|---|---|
| (6) - airflow 워크플로 트리거 (0) | 2024.08.18 |
| (4) - airflow 콘텍스트 사용 (0) | 2024.08.11 |
| (3) - airflow 스케줄 (0) | 2024.08.07 |
| (2) - airflow 간단 실습 (0) | 2024.08.04 |