데이터 엔지니어( 실습 정리 )/airflow

(11) - 효율적인 dag 모범 사례

세용용용용 2024. 9. 10. 20:58

 

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

  • 효율적인 dag 작성 사례

 

1. 깔끔한 dag 작성

 

(1-1) 스타일 가이드 사용

  • 효율적인 코드 관리를 위한 코딩 스타일 가이드를 따르기
  • 스타일 가이드 준수 확인 검사기 : pylint, flake ( 둘 다 정적 코드 검사기 )
  • 코드 포매터 도구를 통해 코드를 형식화 : YARF, Black ( 코드를 정해진 규칙을 적용해 바꿈 )

 

 

(1-2) 중앙에서 자격 증명 관리

  • airflow 메타스토어에서 자격 증명을 저장하여 관리

 

 

(1-3) 구성 세부 정보 일관성 있는 관리

YAML, 또는 airflow variables 를 통해 메타스토어 구성 정보 저장

 

1. airflow variables 저장

from airflow.models import Variable

input_path=Variable.get("dag1_input_path") # 전역 변수 호출
output_path=Variable.get("dag1_output_path")

~~~

 

2. yaml 파일 저장

import yaml

def _fetch_data(config_path, **context):
    with open(config_path) as config_file:
        config=yaml.load(config_file)

fetch_data=PythonOperator(
    op_kwargs={"config_path": "config.yaml"},
    ~~
)

 

 

(1-4) dag 구성 시 연산 부분 배제

필요한 시점에만 계산 수행 하도록 로직 개선 ( 태스크내 연산 수행 으로 개선 )

 

태스크 내 자격 증명 가져오기

from airflow.hooks.base_hook import BaseHook

def _task1(conn_id, **context):
    api_config=BaseHook.get_connection(conn_id)
    ~~~

 

 

(1-5) factory 함수를 사용해 공통 패턴 생성

 

 

(1-6) 태스크 그룹을 사용해 관련 태스크 그룹 만들기

  • 태스크 세트를 더 작은 그룹으로 효과적으로 그룹화애 dag 구조를 쉽게 관리 및 이해 가능

  • 태스크 그룹을 통해 복잡성을 숨길 수 있다!!

 

(1-7) 대규모 수정을 위한 새로운 dag 생성

  • 변경 수행 전 dag의 새 버전 복사본을 생성

 

 

2. 재현 가능한 태스크 설계

  • 태스크는 항상 멱등성을 가져야됨 : 동일 태스크 여러 번 다시 실행해도 결과는 동일
  • 태스크 결과는 결정적 : 주어진 입력에 항상 동일 출력 반환
  • 함수형 패러다임 사용해 태스크 설계 : 함수형 프로그래밍의 따라 태스크 설계

 

3. 효율적인 데이터 처리

(3-1) 데이터 처리량 제한

  • 원하는 결과를 얻는 데 필요한 최소한의 데이터로 처리를 제한

 

 

(3-2) 증분 적재 및 처리

  • 데이터를 시계열 기반 파티션으로 분할하고, 파티션을 각 dag 실행에서 개별적으로 처리

 

 

(3-3) 중간 단계 데이터 캐싱

  • 태스크에 중간 단계 데이터를 저장해 각 태스크를 다른 태스크와 독립적으로 쉽게 다시 실행 가능

 

(3-4) 로컬 파일 시스템에 데이터 저장 방지

  • 로컬 파일 시스템에 저장 단점은, 다운스트림 태스크가 파일에 접근하지 못할 수 있음
  • 공유 저장소를 사용 추천

 

(3-5) 외부/소스 시스템으로 작업 이전

  • airflow는 실제 데이터 처리를 위한 워커를 사용하는 것보다, 오케스트레이션 도구로 사용될 떄 더 효율적
  • airflow는 주로 오케스트레이션 도구로 설계 되었고, 이런 방식으로 사용시 최상의 결과를 얻을 수 있다!!

 

 

4. 자원 관리

(4-1) Pool을 이용한 동시성 관리

  • airflow는 리소스 풀을 사용해 주어진 리소스에 액세스할 수 있는 태스크 수를 제어 가능 ( Admin > Pool )

태스크에 특정 자원 풀 할당

PythonOperator(
    task_id="sy_task",
    ~~~~
    pool="my_resource_pool"
)

 

  • 풀에 여유 슬롯 없으면 스케줄러는 슬롯을 사용할 수 있을 떄까지 태스크 연기

 

(4-2) SLA 및 경고를 사용해 장기 실행 작업 탐지

  • dag 또는 task에 SLA 제한 시간을 지정 가능

 

DAG의 모든 TASK에 SLA 할당

from datetime import timedelta
default_args={
    "sla": timedelta(hour=2),
    ~~
}

with DAG(
    dag_id="~~",
    ~~
    default_args=default_args,
_ as dag:
 ~~

 

  • dag 수준 이외 태스크 오퍼레이터에 sla 인수를 전달해 태스크 수준 SLA를 지정 가능