
Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
1. airflow : 워크플로우 자동화 및 스케줄링을 위한 오픈소스
(1-1) airflow 특징
- 파이썬 기반 워크플로우 개발
- cron 기능 제공
- 파이프라인 각 동작을 태스크로 구성
- 각 태스크는 순서를 보장
- 태스크 간 의존성 확인을 위한 그래프 표현( 방향성 그래프 )
- 방향성 비순환 그래프를 가지며, 순환을 허용안함(대그,DAG)
| 방향성 비순환 그래프 (DAG) |
| 날씨 예보 가져오기 (태스크 1) |
| 가져온 데이터 전처리 (태스크 2) |
| 대시보드 데이터 전송 (태스크 3) |
(1-2) 비순환 그래프 실행 알고리즘
- 각 태스크는 open상태로 기다림
- 다음 태스크로 가기전 이전 태스크 완료 확인
- 태스크 완료시 다음 실행 태스크 대기열에 추가
- 대기열 태스크 실행 후 완료시 완료 표시
- 모든 태스크 완료시 까지 1단계 반복
(1-3) 그래프 파이프라인 절차적 파이프라인 비교
| 기존 절차형 파이프 라인.proc |
| 날씨 데이터 가져오기 |
| 우산 판매 데이터 가져오기 |
| 날씨 데이터 전처리 |
| 우산 판매 데이터 전처리 |
| 날씨, 우산 데이터 merge |
| ai 모델 fit |
| 모델 배포 |
| 비순환 그래프 DAG | |
| 날씨 데이터 가져오기 ( task ) | 우산 판매 데이터 가져오기 ( task ) |
| 날씨 데이터 전처리 ( task ) | 우산 판매 데이터 전처리 ( task ) |
| 날씨, 우산 데이터 merge ( task ) | |
| ai 모델 fit ( task ) | |
| 모델 배포 ( task ) | |
기존 절차형 파이프 라인에서는 각 동작을 순차적으로 실행하지만
비순환 그래프 dag에서는 데이터 가져오기, 전치리 task를 독립적으로 실행 후 하나의 task로 합치게됨
즉, dag 실행으로 태스크를 병렬로 실행할 수 있기에 순차적인 파이프 라인 보다 효율적인 컴퓨팅 리소스 활용 가능!!!
전체 작업을 하나의 스크립트로 구성시 파이프라인 중간 과정에서 실패시 전체를 재실행해야함, 반면 그래프 기반 알고리즘은 실패한 task와 이후 task만 재실행 하면 되므로 효율적으로 파이프라인 구축 가능!!!
2. airflow python pipline
(2-1) dag python 개념
- airflow는 파이썬 코드로 유연한 파이프라인 정의
DAG_FILE(python) >>> DAG( 비순환 그래프 파이프라인 )
ex) 간단 예시
# DAG 정의
dag = DAG(
'weekly_task_dag',
default_args=default_args,
description='test dag',
schedule_interval='0 0 * * 1', # 매주 월요일
catchup=False,
)
# 태스크 정의
task1 = DummyOperator(
task_id='task1',
dag=dag,
)
task2 = DummyOperator(
task_id='task2',
dag=dag,
)
task3 = DummyOperator(
task_id='task3',
dag=dag,
)
task4 = DummyOperator(
task_id='task4',
dag=dag,
)
# 태스크 의존성 설정
task1 >> [task2, task3] >> task4
- airflow DAG를 파이썬 코드로 정의시 파이프라인 구축에 많은 유연성을 제공할수 있다!!
- airflow 확장 기능이 지속 발전으로, 여러 시스템 간에도 데이터 프로세스를 결합할 수 있어 복잡한 데이터 파이프라인 구축 가능
(2-2) 파이프라인 스케줄링, 실행
airflow는 cron을 통한 복잡한 스케줄 사용 가능
airflow 구성요소
- airflow 스케줄러 : dag 스케줄이 지난 경우, airflow 워커에 dag 태스크 예약
- airflow 워커 : 예약된 태스크 선택후 시작
- airflow 웹 서버 : dag 파이프 라인을 시각화, 실행 결과 확인 하는 인터페이스 제공
airflow 파이프라인
- dag 워크플로 작성 (python file)
- airflow 스케줄러는 dag 파일 분석후 예약 주기 확인, 각 task 의존성 확인
- 예약 시간 지났으면 dag 실행
- 스케줄러는 각 task 의존성 확인하며, 의존성 태스크가 미완료시 실행 대기열에 추가
- 다시 1단계로 돌아가 새로운 루프를 대기
task가 실행 대기열 추가시 워커가 task를 선택후 실행 ( 실행을 병렬로 수행 )
위의 모든 결과는 airflow 메타스토어로 전달되어 사용자가 airflow 웹 서버를 통해 진행 상황을 추적하고 로그 확인 가능
(2-3) 모니터링 및 실패 처리
airflow 웹 서버를 통한 실행 결과에 대한 요약 및 다양한 모니터링 가능
기본적으로 airflow는 태스크 실패 시 재시도 가능
재시도 실패시 실패로그를 기록후 사용자에게 실패 통보
트리 뷰 통해 실패 task 로그 확인 하며 디버깅 가능
'데이터 엔지니어( 실습 정리 ) > airflow' 카테고리의 다른 글
| (6) - airflow 워크플로 트리거 (0) | 2024.08.18 |
|---|---|
| (5) - airflow 태스크 간 의존성 정의 (0) | 2024.08.18 |
| (4) - airflow 콘텍스트 사용 (0) | 2024.08.11 |
| (3) - airflow 스케줄 (0) | 2024.08.07 |
| (2) - airflow 간단 실습 (0) | 2024.08.04 |