데이터 엔지니어( 실습 정리 )/airflow
(11) - 효율적인 dag 모범 사례
세용용용용
2024. 9. 10. 20:58

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
- 효율적인 dag 작성 사례
1. 깔끔한 dag 작성
(1-1) 스타일 가이드 사용
- 효율적인 코드 관리를 위한 코딩 스타일 가이드를 따르기
- 스타일 가이드 준수 확인 검사기 : pylint, flake ( 둘 다 정적 코드 검사기 )
- 코드 포매터 도구를 통해 코드를 형식화 : YARF, Black ( 코드를 정해진 규칙을 적용해 바꿈 )

(1-2) 중앙에서 자격 증명 관리
- airflow 메타스토어에서 자격 증명을 저장하여 관리

(1-3) 구성 세부 정보 일관성 있는 관리
YAML, 또는 airflow variables 를 통해 메타스토어 구성 정보 저장
1. airflow variables 저장
from airflow.models import Variable
input_path=Variable.get("dag1_input_path") # 전역 변수 호출
output_path=Variable.get("dag1_output_path")
~~~
2. yaml 파일 저장
import yaml
def _fetch_data(config_path, **context):
with open(config_path) as config_file:
config=yaml.load(config_file)
fetch_data=PythonOperator(
op_kwargs={"config_path": "config.yaml"},
~~
)
(1-4) dag 구성 시 연산 부분 배제
필요한 시점에만 계산 수행 하도록 로직 개선 ( 태스크내 연산 수행 으로 개선 )
태스크 내 자격 증명 가져오기
from airflow.hooks.base_hook import BaseHook
def _task1(conn_id, **context):
api_config=BaseHook.get_connection(conn_id)
~~~
(1-5) factory 함수를 사용해 공통 패턴 생성

(1-6) 태스크 그룹을 사용해 관련 태스크 그룹 만들기
- 태스크 세트를 더 작은 그룹으로 효과적으로 그룹화애 dag 구조를 쉽게 관리 및 이해 가능


- 태스크 그룹을 통해 복잡성을 숨길 수 있다!!
(1-7) 대규모 수정을 위한 새로운 dag 생성
- 변경 수행 전 dag의 새 버전 복사본을 생성
2. 재현 가능한 태스크 설계
- 태스크는 항상 멱등성을 가져야됨 : 동일 태스크 여러 번 다시 실행해도 결과는 동일
- 태스크 결과는 결정적 : 주어진 입력에 항상 동일 출력 반환
- 함수형 패러다임 사용해 태스크 설계 : 함수형 프로그래밍의 따라 태스크 설계
3. 효율적인 데이터 처리
(3-1) 데이터 처리량 제한
- 원하는 결과를 얻는 데 필요한 최소한의 데이터로 처리를 제한

(3-2) 증분 적재 및 처리
- 데이터를 시계열 기반 파티션으로 분할하고, 파티션을 각 dag 실행에서 개별적으로 처리

(3-3) 중간 단계 데이터 캐싱
- 태스크에 중간 단계 데이터를 저장해 각 태스크를 다른 태스크와 독립적으로 쉽게 다시 실행 가능
(3-4) 로컬 파일 시스템에 데이터 저장 방지
- 로컬 파일 시스템에 저장 단점은, 다운스트림 태스크가 파일에 접근하지 못할 수 있음
- 공유 저장소를 사용 추천
(3-5) 외부/소스 시스템으로 작업 이전
- airflow는 실제 데이터 처리를 위한 워커를 사용하는 것보다, 오케스트레이션 도구로 사용될 떄 더 효율적
- airflow는 주로 오케스트레이션 도구로 설계 되었고, 이런 방식으로 사용시 최상의 결과를 얻을 수 있다!!
4. 자원 관리
(4-1) Pool을 이용한 동시성 관리
- airflow는 리소스 풀을 사용해 주어진 리소스에 액세스할 수 있는 태스크 수를 제어 가능 ( Admin > Pool )
태스크에 특정 자원 풀 할당
PythonOperator(
task_id="sy_task",
~~~~
pool="my_resource_pool"
)
- 풀에 여유 슬롯 없으면 스케줄러는 슬롯을 사용할 수 있을 떄까지 태스크 연기
(4-2) SLA 및 경고를 사용해 장기 실행 작업 탐지
- dag 또는 task에 SLA 제한 시간을 지정 가능
DAG의 모든 TASK에 SLA 할당
from datetime import timedelta
default_args={
"sla": timedelta(hour=2),
~~
}
with DAG(
dag_id="~~",
~~
default_args=default_args,
_ as dag:
~~
- dag 수준 이외 태스크 오퍼레이터에 sla 인수를 전달해 태스크 수준 SLA를 지정 가능