세용용용용 2024. 8. 4. 15:29

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

1. airflow 실습 ( 예제 데이터 :  로켓 발사 데이터 )

- 워크 플로우 작업에서 airflow를 사용하는 장점은 대규모 작업을 개별 task로 분할하고 DAG로 형성 할수 있다.

 

로겟 발사 pipline

DAG
로켓 데이터 가져와 저장 ( task 1 )
로켓 이미지 가져와 저장 ( task 2 )
시스템 알림 ( task 3 )

 

코드 예제

# 라이브러리 불러오기
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag=DAG( # 객체 인스턴스 생성
    dag_id="download_rocket_launches" # dag명
    start_date=dirflow.utils.dates.daus_ago(14), # dag 실행일
    schedule_interval=None, # dag 스케줄 ( 자동 실행x , 수동으로 실행 가능 )
)

# 각 task 정의
download_launches=BashOperator( # bash 명령 실행 오퍼레이터 정의
    task_id="download_launches", # task 명
    bash_command="curl -o /work/airflow/launches.json -L
         'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", dag=dag, # task가 속할 dag 지정
)

# 로켓 사진 다운로드하는 파이썬 함수
def _get_pictures():
    # 이미지 저장할 경로 존재 확인
    pathlib.Path("/work/airflow/images").mkdir(parents=True, exist_ok=True)
    
    # 수집한 launches.json 파일의 모든 그림 파일 수집
    with open("/work/airflow/launches.json") as f:
        launches=json.load(f)
        image_urls=[launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response=requests.get(image_url) # 가져오기
                image_filename=image_url.split("/")[-1]
                target_file=f"/work/airflow/images/{image_filename}" # 저장 파일 명
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")
get_pictures=PythonOperator( # 파이썬 실행 오퍼레이터 정의
    task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)

notify=BashOperator(
    task_id="notify".
    bash_command='echo "There are now $(ls /work/airflow/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify # 태스크 의존성 정의

 

  • dag는 워크플로우 시작점!! 워크플로우 내 모든 task는 dag개체를 참조함, 모든 오퍼레이터는 변수를 참조해 인스턴스가 어떤 dag에 속한 것인지 airflow에 알려줌!!!
  • 오퍼레이터는 하나의 task 수행하고, 여러 오퍼레이터가 워크플로 또는 dag를 구성
  • 각 오퍼레이터는 독립적으로 구성도 가능하며 또는 순서 정의 가능( 의존성 정의 )
  • python 오퍼레이터에서 python_callable 인수는 실행할 파이썬 함수를 가르킴

 

2. docker 컨테이너 airflow 실행

(2-1) dockerfile

# 기본 이미지 선택
FROM apache/airflow:2.9.3-python3.8

# 환경 변수 설정
ENV AIRFLOW_HOME=/opt/airflow

# Airflow의 실행 사용자 설정
USER root

# 필요한 패키지 설치
RUN apt-get update \
    && apt-get install -y \
        gcc \
        libffi-dev \
        libpq-dev \
        build-essential \
        vi \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# 사용자 권한을 Airflow 사용자로 변경
USER airflow

# 그룹에게도 권한 주기
RUN chmod -R g+rwx /opt/airflow

# Airflow 설치와 관련된 기본 설정
COPY requirements.txt $AIRFLOW_HOME/requirements.txt
RUN pip install --no-cache-dir -r $AIRFLOW_HOME/requirements.txt

USER root

# Airflow 데이터베이스 초기화 및 사용자 생성
RUN airflow db init && \
    airflow users create \
        --username admin \
        --password admin \
        --firstname sy \
        --lastname ju \
        --role Admin \
        --email admin@example.org

# 데이터 저장 디렉토리 생성
RUN mkdir -p /opt/airflow/data

#USER root
#RUN chown -R airflow: /opt/airflow
#USER airflow

# Airflow 시작!!!
CMD ["airflow", "webserver"]

 

 

(2-2) docker (build and run)

# docker build
docker build -t airflow_test:2.9.3 .

# docker run
docker run -d \
-p 9898:8080 \
-v /work/dag_airflow/docker_download_rocket_launches.py:/opt/airflow/dags/download_rocket_launches.py \
-v /work/airflow:/opt/airflow/data \
--name airflow_container \
airflow_test:2.9.3

# 컨테이너 접속
docker exec -it airflow_container /bin/bash

 

 

(2-3) window 포트포워딩

# 버츄얼박스로 띄운 가상환경(192.168.56.10) 포트포워딩 명령어
netsh interface portproxy add v4tov4 listenport=9898 listenaddress=0.0.0.0 connectport=9898 connectaddress=192.168.56.10

# 포워딩 확인 명령어
netsh interface portproxy show all

# 포워딩 삭제 명령어
netsh interface portproxy delete v4tov4 listenport=8888 listenaddress=0.0.0.0

 

 

3. airflow UI 및 dag 실행

스케줄 실행

nohup airflow scheduler > /dev/null 2>&1 &

(3-1) airflow 웹 인터페이스

http://localhosts:9898

 

(3-2) airflow 그래프 뷰

 

  • dag 스크립트 구조를 보여줌
  • dag 파이썬 파일이 dag 디렉터리에 위치시 airflow는 해당 파일을 읽고 ui에 시각화!!!
  • dag실행 하려면 dag이름 옆에있는 토글을 누르고 재생버튼을 눌러 실행

 

 

(3-3) airflow 로그 확인

  • 완료된 notify 태스크를 클릭해 로그 확인 가능
  • There are now 7 images. 를 통해 이미지 7개가 저장된것을 확인

 

3. airflow 스케줄 실행

- airflow에서는 일정 시간에 실행할 수 있도록 하는 cron 기능이 있다!!!

- dag에서 schedule_interval의 인수를 설정하면 됨

 

기존 dag ( 사용자가 직접 트리거 해주며 실행해야됨 )

dag=DAG( # 객체 인스턴스 생성
    dag_id="download_rocket_launches" # dag명
    start_date=dirflow.utils.dates.daus_ago(14), # dag 실행일
    schedule_interval=None, # dag 스케줄 ( 자동 실행x , 수동으로 실행 가능 )
)

 

변경 dag ( 스케줄을 통해 사용자가 직접 실행할 필요없이 원하는 시간 및 간격에 따라 dag 실행 )

dag=DAG( # 객체 인스턴스 생성
    dag_id="download_rocket_launches" # dag명
    start_date=dirflow.utils.dates.daus_ago(14), # dag 실행일
    schedule_interval="@daily", # dag 스케줄 ( 하루에 한번 0 0 * * * 매일 자정 )
)

 

 

4. airflow 실패 task 처리

- 실패한 task는 빨간색

- 의존성에 의해 실행 되지 못한 task는 주황색 표시

- 문제 파악은 실패한 task 로그를 통해 확인

 

만약 실패 했다면 재처리할 task 클릭후 Clear task를 통해 초기화됨, 이후 airflow는 해당 task 재실행

 

get_pictures 태스크 clear시 의존성 있는 task도 같이 재실행됨!!!!

 

재실행중 ... ㅎㅎ

 

but,, 무조건 재처리는 nono,,,

airflow는 특정 조건에서 실패를 허용하기도 함

모든 레벨이서 실패 시 처리 기준을 구성할수 있다

 

이번 장 요약

  • airflow 워크플로우는 dag로 표시
  • 오퍼레이터는 단일 태스크를 나타냄
  • airflow 인터페이스는 dag 구조확인을 위한 그래프 뷰와 시간 경과에 따른 dag 실행 상태를 보기위한 트리 뷰 제공
  • dag안의 task는 재실행 가능