세용용용용 2024. 8. 7. 22:14

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

1. airflow 스케줄링 ( 예제 데이터 :  던파 경매장 데이터 )

책에서는 사용자 이벤트를 api로 가져오지만 해당 코드를 찾을수가 없어 대체 했습니다 ㅠㅠㅠㅠㅠ

 

- 사용자 이벤트를 매일 추적하고 분석할수 있는 dag를 구현해보자!!

- 이벤트 로그는 데이터가 많기에 오랜 기간 데이터를 저장하지 않음, 하지만 그럼에도 필요한 경우는 아마존 s3 및 타사 클라우드를 스토리지를 통해 낮은 비용으로 데이터 저장 가능

 

 

던파 경매장 이벤트 pipline

dag
던파 경매장 이벤트 data get ( task 1 )
통계 및 계산 ( task 2 )

 

(1-1) dag 실습 ( 01_unscheduled.py )

import datetime as dt # 날짜와 시간 데이터를 처리하기 위한 클래스를 제공
from pathlib import Path # 파일 시스템 경로를 객체 지향적으로 다룰 수 있게 해주는 모듈
import pandas as pd # 판다스
from airflow import DAG
from airflow.operators.bash import BashOperator # Bash 명령을 실행하는 작업
from airflow.operators.python import PythonOperator # Python 함수를 실행하는 작업

# dag 정의
dag=DAG(
    dag_id="01_unscheduled",
    start_date=dt.datetime(2024, 8, 6),
    schedule_interval=None, # dag 스케줄 지정안함
)

# task 정의
# 던파 경매 데이터 가져오기
fetch_events=BashOperator(
    task_id="fetch_events", # task name
    bash_command=(
        'mkdir -p /opt/airflow/data/data && '
        'curl -o /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
        '--data-urlencode "itemName=균열의 단편" '
        '--data-urlencode "wordType=match" '
        '--data-urlencode "wordShort=false" '
        '--data-urlencode "limit=30" '
        '--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
    ),
    dag=dag, # task가 어떤 dag에 속할것인지 명시
)

# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
    # 전처리
    events=pd.read_json(input_path)
    events=pd.json_normalize(events['rows'])
    events=events[['count','price']]
    
    # 통계
    events['per_price']=events['price']/events['count']
    events['result_ava_price'] = events['per_price'].mean()
    events['diff']=events['per_price']-events['result_ava_price']
    
    # 경로가 있는지 확인
    Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 ) 
    events.to_csv(output_path, index=False)

calculate_stats=PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={
        "input_path": "/opt/airflow/data/events.json",
        "output_path": "/opt/airflow/data/csv_dir/stats.csv",
    },
    dag=dag,
)

# 의존성 정의
fetch_events >> calculate_stats

 

requirements.txt 추가

apache-airflow==2.9.3
requests==2.31.0
pandas==1.3.5
numpy==1.21.5

 

도커 build and run

# docker build
docker build -t airflow_test:2.9.3 .

# docker run
docker run -d \
-p 9898:8080 \
-v /work/dag_airflow/01_unscheduled.py:/opt/airflow/dags/01_unscheduled.py \
-v /work/rowdata_airflow:/opt/airflow/data \
--name airflow_container \
airflow_test:2.9.3

# 컨테이너 접속
docker exec -it airflow_container /bin/bash

# 스케줄 실행
nohup airflow scheduler > /dev/null 2>&1 &

 

 

실행 테스트

실행 완료 화면 입니다

로컬에 데이터 저장 확인

 

최종 통계 데이터 확인

 

 

2. airflow 스케줄링 ( 정기적 실행 )

  • dag 정의시 schedule_interval=None 이거나 디폴트로 초기화시 예약실행이 되지않음 즉, ui를 통해 사용자가 수동 트리거를 통해 워크플로우를 동작
  • 2장에서 schedule_interval="@daily" 로 dag를 정의하며 매일 자정에 주기적으로 워크플로우를 실행할수 있었다.
  • 종료 인자가 없으면 이론상 계속 실행이됨, 만약 기간이 정해져 있는 경우라면 end_date에 인수를 설정해 실행 중지end_date=dt.datetime(year=2019, month=1, day=5)
  • 다양한 스케줄을 위해 airflow는 cron을 지원!! ( * * * * * / 분 시 일 월 요일 )

 

(2-1) airflow cron 예제

0 * * * * >>> 정시에 실행
0 0 * * * >>> 자정에 실행
0 0 * * 0 >>> 일요일 자정에 실행
0 0 1 * * >>> 1월 자정에 실행
45 23 * * SAT >>> 토요일 23시 45분에 실행

좀더 복잡한 cron도 가능
0 0 * * MON, WED, FRI >>> 월,화,금 자정에 실행
0 0 * * MON-FRI >>> 월~금 자정에 실행
0 0,12 * * * >>> 24,12시에 실행

 

(2-2) airflow 빈도 기반 스케줄 설정 ( timedelta ) 

  • cron 기반 스케줄일 특정 빈도마다 스케줄을 정의하는 것에는 한계가 있음...
  • airflow는 빈도기반 스케줄링을 위한 timedelta도 지원 ( schedule_interval=dt.timedelta(days=3) )

 

 

3. 데이터 증분 처리하기

airflow 스케줄을 통해 주기적으로 데이터 워크플로으 작업을 하며 산출물을 일자별로 분리하도록 구현

 

(3-1) 실행 날짜를 사용해 동적 시간 참조

execution_date, next_execution_date >> 스케줄 간격으로 실행, 종료되는 시간을 위한 매개변수
만약, execution_date, next_execution_date 기준으로 동적으로 데이터를 가져오고 싶으면
start_date={{execution_date.strftime('%Y-%m-%d')}}
end_date={{next_execution_date.strftime('%Y-%m-%d')}}
매개변수를 통해 동적으로 실행날짜를 참조하고 strftime 메서드를 통해 문자열 형식으로 반환하여 날짜 지정 가능

 

(3-2) 데이터 파티셔닝

산출물에도 동적 매개변수를 추가하여 덮어쓰는 방식이 아닌 주기적으로 데이터를 파티셔닝 가능

##### 기존 task #####
fetch_events=BashOperator(
    task_id="fetch_events", # task name
    bash_command=(
        'mkdir -p /opt/airflow/data/data && '
        'curl -o /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
        '--data-urlencode "itemName=균열의 단편" '
        '--data-urlencode "wordType=match" '
        '--data-urlencode "wordShort=false" '
        '--data-urlencode "limit=30" '
        '--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
    ),
    dag=dag, # task가 어떤 dag에 속할것인지 명시
)
>>> 스케줄로 실행시 매번 데이터는 events.json 파일에 덮어쓰기됨

##### 수정 task #####
fetch_events=BashOperator(
    task_id="fetch_events", # task name
    bash_command=(
        'mkdir -p /opt/airflow/data/data && '
        'curl -o /opt/airflow/data/{{execution_date.strftime('%Y-%m-%d')}}events.json -G "https://api.neople.co.kr/df/auction-sold" '
        '--data-urlencode "itemName=균열의 단편" '
        '--data-urlencode "wordType=match" '
        '--data-urlencode "wordShort=false" '
        '--data-urlencode "limit=30" '
        '--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
    ),
    dag=dag, # task가 어떤 dag에 속할것인지 명시
)
>>> 동적으로 매개변수를 추가하며 데이터를 파티셔닝 할수 있다
이를 통해 데이터 세트를 작고 관리하기 쉬운 조각으로 나누는 전략이 가능 ( 파티셔닝 )


이후 받아온 데이터를 처리하는 task로 같이 변경해주면 완료
##### 기존 task #####
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
    # 전처리
    events=pd.read_json(input_path)
    events=pd.json_normalize(events['rows'])
    events=events[['count','price']]
    
    # 통계
    events['per_price']=events['price']/events['count']
    events['result_ava_price'] = events['per_price'].mean()
    events['diff']=events['per_price']-events['result_ava_price']
    
    # 경로가 있는지 확인
    Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 ) 
    events.to_csv(output_path, index=False)

calculate_stats=PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={
        "input_path": "/opt/airflow/data/events.json",
        "output_path": "/opt/airflow/data/csv_dir/stats.csv",
    },
    dag=dag,
)


##### 변경 task #####
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
    # 전처리
    events=pd.read_json(input_path)
    events=pd.json_normalize(events['rows'])
    events=events[['count','price']]
    
    # 통계
    events['per_price']=events['price']/events['count']
    events['result_ava_price'] = events['per_price'].mean()
    events['diff']=events['per_price']-events['result_ava_price']
    
    # 경로가 있는지 확인
    Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 ) 
    events.to_csv(output_path, index=False)

calculate_stats=PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={
        "input_path": "/opt/airflow/data/{{execution_date.strftime('%Y-%m-%d')}}events.json",
        "output_path": "/opt/airflow/data/csv_dir/{{execution_date.strftime('%Y-%m-%d')}}stats.csv",
    },
    dag=dag,
)

 

 

4. airflow 실행날짜 이해

  • 간격 기반 스케줄 : 명시적 스케줄 간격 동안 실행
  • 시점 기반 스케줄 : 시작/종료 시점으로 스케줄 간격을 추측

 

 

5. 과거 데이터 메꾸기 위한 백필 사용

airflow는 과거의 시작날로 스케줄 정의 가능 즉, dag를 과거 시점부터 실행할수 있고 이런 프로세스를 백필 이라함

dag=DAG(
    dag_id="test",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
    catchup=False, # 백필 비활성화 변수
)

dag실행 날짜 2019-1-3 이면 현재 스케줄 간격 부터 작업 시작
만약 catchup=True 이면 dag의 start_date 스케줄 간격 부터 작업 시작 ( 백필 )

 

  • 백필의 장점은 코드 변경 시에서 데이터 재처리 가능, 하지만 여러 상황에 따라 제한이 있다.

 

 

6. 태스크 디자인을 위한 사례

airflow task의 가장 중요한 두 속성 >>> 원자성, 멱등성

 

(6-1) 원자성

  • airflow 태스크는 성공적으로 수행되거나 시스템 상태에 영향을 미치지 않고 실패 하도록 정의
  • 모든것이 완료되거나, 완료되지 않도록 보장
  • 한 task 안에서 두개의 작업은 원자성을 무너뜨림
# 던파 경매 데이터 전처리 및 통계
def _calculate_stats(input_path, output_path):
    # 전처리
    events=pd.read_json(input_path)
    events=pd.json_normalize(events['rows'])
    events=events[['count','price']]
    
    # 통계
    events['per_price']=events['price']/events['count']
    events['result_ava_price'] = events['per_price'].mean()
    events['diff']=events['per_price']-events['result_ava_price']
    
    # 경로가 있는지 확인
    Path(output_path).parent.mkdir(exist_ok=True) # 부모디렉토리를 반환하고 생성( 존재시 넘김 ) 
    events.to_csv(output_path, index=False)

    # 파일을 다른 시스템으로 보내는 함수 실행
    scp_file(output_path, ip=192.168.56.33, user=root)
    
calculate_stats=PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={
        "input_path": "/opt/airflow/data/{{execution_date.strftime('%Y-%m-%d')}}events.json",
        "output_path": "/opt/airflow/data/csv_dir/{{execution_date.strftime('%Y-%m-%d')}}stats.csv",
    },
    dag=dag,
)
  • 기존 통계 처리후 csv로 저장하는 task에 다른 시스템으로 데이터를 보내는 작업을 추가시
  • 시스템 문제로 (네트워크 등등..) 파일이 전송되지 않더라도 로컬에 csv는 저장이 되므로 작업이 성공한 것 처럼 보임,,
  • 해당 작업 원자성 유지를 위해 다른 시스템으로 보내는 작업은 별도의 task로 분리 해야됨

 

def _scp_file(ip, user, **context):
    # 파일을 다른 시스템으로 보내는 함수 실행
    scp_file(output_path, ip=ip, user=user)

scp_file=PythonOperator(
    python_callable=_scp_file,
    task_id="scp_file"
    op_kwargs={"ip":"192.168.56.33", "user":"root"},
    templates_dict={"output_path":"/opt/airflow/data/csv_dir/{{execution_date.strftime('%Y-%m-%d')}}stats.csv"},
    dag=dag,
)
  • 각 작업을 task로 따로 분리시 서로 영향을 주지 않기에 원자성이 유지됨

 

 

(6-2) 멱등성

입력 변경 없이 태스크를 다시 실행해도 전체 결과는 변경되지 않아야됨

# 데이터 추가
'curl -O /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
        '--data-urlencode "itemName=균열의 단편" '
        '--data-urlencode "wordType=match" '
        '--data-urlencode "wordShort=false" '
        '--data-urlencode "limit=30" '
        '--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'
        
# 데이터 덮어쓰기
'curl -o /opt/airflow/data/events.json -G "https://api.neople.co.kr/df/auction-sold" '
        '--data-urlencode "itemName=균열의 단편" '
        '--data-urlencode "wordType=match" '
        '--data-urlencode "wordShort=false" '
        '--data-urlencode "limit=30" '
        '--data-urlencode "apikey=JUG54ELPbKttanbis2VPFNqC9LJOM7v4"'

 

데이터 쓰는 태스크 예시 일부에서

  • 첫번쨰는 같은 파일에 추가를 하게되므로 다시 실행할 떄마다 결과가 점점 늘어남 ( 비멱등성 태스크 )
  • 두번쨰는 같은 파일에 덮어 쓰게되므로 다시 실행하더라도 동일안 결과가 생성 ( 멱등성 태스크 )

 

3장 정리

  • 스케줄 을 통해 dag실행 가능
  • airflow는 cron, timedelta를 통해 다양한 스케줄 구성 가능
  • 템플릿을 사용해 변수 동적 할당 가능
  • 백필을 통해 과거 시점 작업 실행 가능
  • 멱등성은 동일 출력 결과를 생성하게 구현하여 동일한 재실행 보장