본문 바로가기
데이터 엔지니어( 실습 정리 )/airflow

(8) - 커스텀 컴포넌트 빌드

by 세용용용용 2024. 8. 28.

 

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

 

  • 8장은 사용자 만의 오퍼레이터를 빌드하고 dag에서 이를 사용하는 방법을 설명
  • 또한, 해당 커스텀 컴포넌트를 파이썬 패키지로 패키징해 여러 환경에 설치하거나 재사용할 떄 편리하게 하는 방법을 살펴봄

 

1. PythonOperator로 작업하기 ( 예제 데이터 : 영화 평점 api )

커스텀 컴포넌트 빌드전 PythonOperator을 사용 방법 먼저 실습

워크플로우 계획 : 영화평점 api >>> 신규 평점 데이터 가져오기 >>> 인기 영화 랭킹 >>> ....이후 추천시스템 앱 (다운스트림) 발전 가능

 

컨테이너 실행 ( /work/docker_compose_dir/docker-compose.yml )

version: '3.7'
services:
  movielens:
    build: /work/docker_other/movielens-api
    image: manning-airflow/movielens-api
    ports:
      - "5000:5000"
    environment:
      API_USER: airflow
      API_PASSWORD: airflow

 

도커 컴포즈 실행 & 종료

## 도커 컴포즈 설치 ##
DOCKER_COMPOSE_VERSION=$(curl -s https://api.github.com/repos/docker/compose/releases/latest | grep 'tag_name' | cut -d\" -f4)
curl -L "https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

# 설치한 도커 컴포즈에 바이너리 실행권한 부여
chmod +x /usr/local/bin/docker-compose


# up : 정의된 모든 서비스를 빌드하고 시작
# -d : 컨테이너 백그라운드로 실행
# --build : 도커 이미지 빌드 부터
docker-compose up -d --build

# docker compose 종료
docker-compose down

 

엔드포인트에서 제공하는 평점 데이터

  • 평점 데이터는 json 포맷으로 제공
  • 실제 평점 데이터는 result 필드 안에 포함
  • offset는 결과 중 몇 번쨰 결과부터 가져오는지 의미
  • limit는 결과 중 한 번에 가져올수 있는 레코드 개수
  • 특정 기간 평점 데이터 가져오려면 start_date, end_date 파라미터 사용
http://192.168.56.10:5000/ratings?start_date=2019-01-01&end_date=2019-01-02

 

 

(1-1) api에서 평점 데이터 가져오기

airflow를 사용한 프로세스 자동화를 위해 평점 데이터 수집 프로그램 구현 ( /work/dag_airflow/01_python.py )

import datetime as dt
import logging
import json
import os

import pandas as pd
import requests

from airflow import DAG
from airflow.operators.python import PythonOperator

MOVIELENS_HOST = os.environ.get("MOVIELENS_HOST", "movielens")
MOVIELENS_SCHEMA = os.environ.get("MOVIELENS_SCHEMA", "http")
MOVIELENS_PORT = os.environ.get("MOVIELENS_PORT", "5000")

MOVIELENS_USER = os.environ["MOVIELENS_USER"]
MOVIELENS_PASSWORD = os.environ["MOVIELENS_PASSWORD"]

# requests session 셋팅하는 함수
def _get_session():
    session = requests.Session()
    session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD)
    
    schema = MOVIELENS_SCHEMA
    host = MOVIELENS_HOST
    port = MOVIELENS_PORT
    
    base_url = f"{schema}://{host}:{port}"
    return session, base_url

# 페이지 처리하는 함수
def _get_with_pagination(session, url, params, batch_size=100):
    offset = 0
    total = None
    while total is None or offset < total:
        response = session.get(
            url,
            params={
                **params,
                **{"offset": offset, "limit": batch_size}
            }
        )
        # 상태 체크 후 결과 json을 파싱
        response.raise_for_status()
        response_json = response.json()
        
        yield from response_json["result"]  # 가져온 레코드를 함수 호출자에게 yield
        
        # 현재 offset과 레코드 총 수 업데이트
        offset += batch_size
        total = response_json["total"]

# 전체 결합 함수
def _get_ratings(start_date, end_date, batch_size=100):
    session, base_url = _get_session()  # api 요청 세션과 기본 URL 가져오기
    
    yield from _get_with_pagination(
        session=session,
        url=base_url + "/ratings",
        params={"start_date": start_date, "end_date": end_date},
        batch_size=batch_size,
    )

# with 문을 사용해 DAG 객체의 설정을 한 번만 지정하고, 블록 내에서 작업을 정의할 수 있습니다
with DAG(
    dag_id="01_python",
    description="Fetches ratings from the Movielens API using the Python Operator.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 3),
    schedule_interval="@daily",
) as dag:

    # 날짜별로 파티션 하기 위한 함수
    def _fetch_ratings(templates_dict, batch_size=1000, **_):
        logger = logging.getLogger(__name__)  # 함수 동작에 대한 피드백 제공
    
        start_date = templates_dict["start_date"]
        end_date = templates_dict["end_date"]
        output_path = templates_dict["output_path"]
    
        logger.info(f"Fetching ratings for {start_date} to {end_date}")
        # _get_ratings 함수 사용해 평점 데이터 가져오기
        ratings = list(
            _get_ratings(
                start_date=start_date,
                end_date=end_date,
                batch_size=batch_size
            )
        )
        logger.info(f"Fetched {len(ratings)} ratings")
        logger.info(f"Writing ratings to {output_path}")
    
        # 출력 디렉토리 미존재시 생성
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)
    
        # 출력값 json 포맷으로 저장
        with open(output_path, "w") as file_:
            json.dump(ratings, file_)
        
    # 최종 pythonoperator을 사용해 평점 데이터 수집하는 task
    fetch_ratings = PythonOperator(
        task_id="fetch_ratings",
        python_callable=_fetch_ratings,
        templates_dict={
            "start_date": "{{ds}}",
            "end_date": "{{next_ds}}",
            "output_path": "/opt/airflow/data/{{ds}}.json",
        },
    )
    
    # 의존성 정의
    fetch_ratings

 

  • yield from >>> 다른 이터러블 객체나 제너레이터의 값을 간단하게 제너레이터에서 반환할 수 있게 합니다.
  • 이제 _get_ratings 함수를 PythonOperator로 호출함으로써 평점 데이터 수집
  • json파일을 날짜별로 파티션( 재수집 용이 )

 

도커 컴포즈 수정

version: '3.7'
######### airflow env #########
x-environment: &airflow_environment
    AIRFLOW__CORE__EXECUTOR: "SequentialExecutor"
    AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: "False"
    AIRFLOW__CORE__LOAD_EXAMPLES: "False"
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: "sqlite:////opt/airflow/airflow.db"
    AIRFLOW__CORE__STORE_DAG_CODE: "True"
    AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "True"
    AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
    AIRFLOW__WEBSERVER__RBAC: "False"
    AIRFLOW_CONN_MOVIELENS: "http://airflow:airflow@movielens"
    MOVIELENS_USER: "airflow"
    MOVIELENS_PASSWORD: "airflow"
######### airflow env #########
services:
  movielens:
    build: /work/docker_other/movielens-api
    image: manning-airflow/movielens-api
    container_name: mavielens_container
    ports:
      - "5000:5000"
    environment:
      API_USER: airflow
      API_PASSWORD: airflow

  airflow:
    build: /work/docker_airflow
    image: airflow/airflow_test:2.9.3
    container_name: airflow_container
    depends_on:
      - movielens
    ports:
      - "9898:8080"
    volumes:
      - /work/dag_airflow/:/opt/airflow/dags/
    environment: *airflow_environment

 

  • 도커 컴포즈 실행
docker-compose up -d --build

 

  • 워크플로우 json 산출물

>>> 날짜 별로 파티션된 영화 평점 데이터

 

영화 평점 데이터 가져오고 랭킹을 csv로 산출 하기 위한 task 추가( rank_movies )

 

랭킹 산출 함수 ( /work/dag_airflow/custom/ranking.py )

import pandas as pd

def rank_movies_by_rating(ratings, min_ratings=2):
    ranking = (
        ratings.groupby("movieId")
        .agg(
            avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"),
            num_ratings=pd.NamedAgg(column="userId", aggfunc="count"),
        )
        .loc[lambda df: df["num_ratings"] > min_ratings]
        .sort_values(["avg_rating", "num_ratings"], ascending=[False, False])
    )
    return ranking

 

rank_movies 태스크 추가!!

from custom.ranking import rank_movies_by_rating

def _rank_movies(templates_dict, min_ratings=2, **_):
    input_path = templates_dict["input_path"]
    output_path = templates_dict["output_path"]
    
    ratings = pd.read_json(input_path)
    ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)  # 랭킹 함수 사용
    
    output_dir = os.path.dirname(output_path)
    os.makedirs(output_dir, exist_ok=True)
    
    ranking.to_csv(output_path, index=True)  # 영화 랭킹 통계 데이터 csv로 저장

# 영화 랭킹 통계 task 정의
rank_movies = PythonOperator(
    task_id="rank_movies",
    python_callable=_rank_movies,
    templates_dict={
        "input_path": "/opt/airflow/data/{{ds}}.json",
        "output_path": "/opt/airflow/data/{{ds}}.csv",
    },
)

# 의존성 정의
fetch_ratings >> rank_movies

 

  • dag 실행 그래프

 

  • 데이터 확인

 

>> 이와 같이 두 task를 하나의 dag로 구성

  • 평점 데이터 가져오기 >>> 영화 랭크 만들기
  • 정각에 스케줄링 되며 일별로 인기 영화 랭킹 제작 가능

 

2. 커스텀 훅 빌드하기

  • 앞선 예제 대부분의 코드는 api 연동과 관련된 코드
  • api 주소, 인증정보, 연동 세션, 페이지 처리 등등 기능...
  • 복잡한 api 연동과 같은 작업은 코드를 캡슐화 하고 재활용 가능한 airflow 훅으로 만들 수 있음!!
  • 모든 api 전용 코드를 한 곳에 보관, 여러 dag에서 해당 훅을 간단하게 사용 하면 유사한 api 연동 워크플로우 에서 api 연동에 대한 노력을 줄일수 있음!!!

ex..) 훅 사용 예시

hook = MovielensHook(conn_id="movielens") # 훅 생성!!
ratings = hook.get_ratings(start_data, end_date) # 생성 된 훅 을 통한 작업 수행
hook.close() # 훅을 닫고 사용 리소스 해제

 

(2-1) 커스텀 훅 설계

airflow에서 모든 훅은 추상 클래스인 BaseHook 클래스의 서브 클래스로 생성!!!

# base 훅 서브 클래스
from airflow.hooks.base_hook import BaseHook

class MovielensHook(BaseHook):
    def __init__(self, conn_id): # conn_id는 훅에게 어떤 커넥션을 사용하는지 전달
        super().__init__() # 클래스 생성자 호출
        self._conn_id = conn_id # 커넥션 id 지정

 

 

사전 준비 airflow 메타스토어에 연결 정보 추가

airflow UI >>> Admin >>> Connections

  • Connection Type( 연결 유형 ) : HTTP
  • Host ( 호스트 ) : movielens
  • 그 외 스키마, 로그인 정보, 포트 정보 추가

>>> 이제 메타스토어 에서 연결 정보를 가져오기 위해 BaseHook 클래스에 get_connections라는 메서드 제공, 커넥션 ID에 대한 연결 세부 정보 가져오는 메서드임!!!!

config = self.get_connection(self.__conn_id)

# 연결 설정 정보 가져오기
schema = config.schema
host = config.host
port = config.port

base_url = f"{schema}://{host}:{port}/"

if config.login:
    session.auth = (config.login, config.password)

>>> config 객체를 이용해 host, post, user, password 가져올수 있음!!

 

 

API 세션 캐싱 추가 ( /work/dag_airflow/custom/hooks.py )

import requests
from airflow.hooks.base_hook import BaseHook

class MovielensHook(BaseHook):
    def __init__(self, conn_id):  # conn_id는 훅에게 어떤 커넥션을 사용하는지 전달
        super().__init__()  # 클래스 생성자 호출
        self._conn_id = conn_id  # 커넥션 id 지정
        
        # get_conn 함수 호출마다 airflow 메타스토어에 작업 요청하는 단점 해결을 위한 변수 캐싱
        self._session = None
        self._base_url = None
        
    def get_conn(self):
        if self._session is None:  # 세션 연결이 없으면 생성
            config = self.get_connection(self._conn_id)
            schema = config.schema
            host = config.host
            port = config.port
            
            self._base_url = f"{schema}://{host}:{port}"
            self._session = requests.Session()
            
            if config.login:
                self._session.auth = (config.login, config.password)
        
        return self._session, self._base_url  # 세션과 베이스 URL 반환
  • get_conn 함수 처음 호출시 self._session 값이 None 이므로, airflow 메타스토어 에서 연결 정보 가져와 base_url 과 session 정보 인스턴스 내부에 저장 후 해당 객체들을 각각 인스턴스 변수 _session, _base_url에 저장 후 나중에 캐싱해 재사용
  • 즉, 두번쨰 get_conn 함수 호출부터 self._session 변수는 None이 아니라 세션 정보를 가지고 있기에 캐싱된 session, url 반환

>>> 이후 api에 대한 인증 커넥션 생성 가능

 

 

영화 평점 데이터 가져오는 함수 추가( get_ratings, _get_with_pagination )

import requests

from airflow.hooks.base_hook import BaseHook


class MovielensHook(BaseHook):
    """
    Hook for the MovieLens API.

    Abstracts details of the Movielens (REST) API and provides several convenience
    methods for fetching data (e.g. ratings, users, movies) from the API. Also
    provides support for automatic retries of failed requests, transparent
    handling of pagination, authentication, etc.

    Parameters
    ----------
    conn_id : str
        ID of the connection to use to connect to the Movielens API. Connection
        is expected to include authentication details (login/password) and the
        host that is serving the API.
    """

    DEFAULT_SCHEMA = "http"
    DEFAULT_PORT = 5000

    def __init__(self, conn_id, retry=3):
        super().__init__()
        self._conn_id = conn_id
        self._retry = retry

        self._session = None
        self._base_url = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get_conn(self):
        """
        Returns the connection used by the hook for querying data.
        Should in principle not be used directly.
        """

        if self._session is None:
            # Fetch config for the given connection (host, login, etc).
            config = self.get_connection(self._conn_id)

            if not config.host:
                raise ValueError(f"No host specified in connection {self._conn_id}")

            schema = config.schema or self.DEFAULT_SCHEMA
            port = config.port or self.DEFAULT_PORT

            self._base_url = f"{schema}://{config.host}:{port}"

            # Build our session instance, which we will use for any
            # requests to the API.
            self._session = requests.Session()

            if config.login:
                self._session.auth = (config.login, config.password)

        return self._session, self._base_url

    def close(self):
        """Closes any active session."""
        if self._session:
            self._session.close()
        self._session = None
        self._base_url = None

    # API methods:

    def get_movies(self):
        """Fetches a list of movies."""
        raise NotImplementedError()

    def get_users(self):
        """Fetches a list of users."""
        raise NotImplementedError()

    def get_ratings(self, start_date=None, end_date=None, batch_size=100):
        """
        Fetches ratings between the given start/end date.

        Parameters
        ----------
        start_date : str
            Start date to start fetching ratings from (inclusive). Expected
            format is YYYY-MM-DD (equal to Airflow's ds formats).
        end_date : str
            End date to fetching ratings up to (exclusive). Expected
            format is YYYY-MM-DD (equal to Airflow's ds formats).
        batch_size : int
            Size of the batches (pages) to fetch from the API. Larger values
            mean less requests, but more data transferred per request.
        """

        yield from self._get_with_pagination(
            endpoint="/ratings",
            params={"start_date": start_date, "end_date": end_date},
            batch_size=batch_size,
        )

    def _get_with_pagination(self, endpoint, params, batch_size=100):
        """
        Fetches records using a get request with given url/params,
        taking pagination into account.
        """

        session, base_url = self.get_conn()
        url = base_url + endpoint

        offset = 0
        total = None
        while total is None or offset < total:
            response = session.get(
                url, params={**params, **{"offset": offset, "limit": batch_size}}
            )
            response.raise_for_status()
            response_json = response.json()

            yield from response_json["result"]

            offset += batch_size
            total = response_json["total"]

movielens api 커넥션 처리 airflow 훅 완성

해당 훅에 추가 메서드 구현함으로 간단하게 기능 추가

훅의 장점은 여러 dag에서 사용하기 쉽게 api 연동에 필요 로직을 단일 클래스로 캡슐화 하여 제공

 

 

(2-2) 커스텀 훅 사용해 dag 빌드

  • 패키지 에서 훅 불러오기
from custom.hooks import MovielensHook

 

  • 훅 사용
hook = MovielensHook(conn_id=conn_id)
ratings = hook.get_ratings(
    start_date=start_date,
    end_date=end_date,
    batch_size=batch_size
)

>>> 영화 평점 데이터 제너레이터 반환

>>> 이를 통해 json 파일로 저장

 

  • dag 빌드 ( /work/dag_airflow/02_python.py )
import datetime as dt
import logging
import json
import os

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

from custom.ranking import rank_movies_by_rating
from custom.hooks import MovielensHook


with DAG(
    dag_id="02_hook",
    description="Fetches ratings from the Movielens API using a custom hook.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 4),
    schedule_interval="@daily",
) as dag:

    def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
        logger = logging.getLogger(__name__)

        start_date = templates_dict["start_date"]
        end_date = templates_dict["end_date"]
        output_path = templates_dict["output_path"]

        logger.info(f"Fetching ratings for {start_date} to {end_date}")
        hook = MovielensHook(conn_id=conn_id)
        ratings = list(
            hook.get_ratings(
                start_date=start_date, end_date=end_date, batch_size=batch_size
            )
        )
        logger.info(f"Fetched {len(ratings)} ratings")

        logger.info(f"Writing ratings to {output_path}")

        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        with open(output_path, "w") as file_:
            json.dump(ratings, fp=file_)

    fetch_ratings = PythonOperator(
        task_id="fetch_ratings",
        python_callable=_fetch_ratings,
        op_kwargs={"conn_id": "movielens"},
        templates_dict={
            "start_date": "{{ds}}",
            "end_date": "{{next_ds}}",
            "output_path": "/opt/airflow/data/{{ds}}.json",
        },
    )

    # 영화 랭킹 데이터 통계 내는 함수
    def _rank_movies(templates_dict, min_ratings=2, **_):
        input_path = templates_dict["input_path"]
        output_path = templates_dict["output_path"]


        ratings = pd.read_json(input_path)
        ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)  # 랭킹 함수 사용

        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        ranking.to_csv(output_path, index=True)  # 영화 랭킹 통계 데이터 csv로 저장

    # _rank_movies 함수 pythonoperator로 래핑하는 task
    rank_movies = PythonOperator(
        task_id="rank_movies",
        python_callable=_rank_movies,
        templates_dict={
            "input_path": "/opt/airflow/data/{{ds}}.json",
            "output_path": "/opt/airflow/data/{{ds}}.csv",
        },
    )

    # 의존성 정의
    fetch_ratings >> rank_movies

 

 

 

3. 커스텀 오퍼레이터 빌드하기

평점 테이터 가져와, json 산출물을 뽑아내는 함수도 커스텀 오퍼레이터로 개발해보자~

airflow 모든 오퍼레이터는 BaseOperator 클래스 서브 클래스로 만들어야됨!!!!

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyCustomOperator(BaseOperator): # BaseOperator 클래스 상속
    @apply_defaults # 기본 dag 인수를 전달 위한 데커레이터
    def __init__(self, conn_id, **kwargs): # 사용할 인자를 생성자 메서드로 지정
   
# BaseOperator 클래스는 제너릭 인수들을 많이 가지고 있음,
# 이런 제너릭 인수를 모두 나열 하지 않도록 __init__에 인수 전달시 **kwargs 구문 사용

 

 

영화 평점 데이터 가져오는 커스텀 오퍼레이터 빌드

import json
import os

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from custom.hooks import MovielensHook

class MovielensFetchRatingsOperator(BaseOperator):

    # Airflow의 템플릿을 사용하여 DAG의 실행 시 동적으로 값을 설정할 수 있는 필드를 지정
    template_fields = ("_start_date", "_end_date", "_output_path")
    
    # 기본 dag 인자를 전달 위한 데커레이터
    @apply_defaults
    
    # 사용할 인자 생성자 매서드로 지정
    # 번외.. self는 파이썬 클래스내에 인스턴스 변수 정의 및 접근하기 위한 특별한 변수!!!
    def __init__(
        self,
        conn_id,
        output_path,
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        batch_size=1000,
        **kwargs,
    ):
    
        # 부모 클래스 메서드 호출
        super(MovielensFetchRatingsOperator, self).__init__(**kwargs)

        # 인스턴스 변수 설정
        # 생성자에서 받은 인수들을 클래스 인스턴스 변수에 할당하는 작업
        self._conn_id = conn_id
        self._output_path = output_path
        self._start_date = start_date
        self._end_date = end_date
        self._batch_size = batch_size
    
    # 영화 평점 가져오기 동작
    def execute(self, context):
        hook = MovielensHook(self._conn_id) # 인스턴스 생성
        
        # try, finally을 사용해
        # 종료시 생성한 훅 인스턴스 리소스 해제
        try:
            self.log.info(f"get data {self._start_date} to {self._end_date}")
            ratings = list(
                hook.get_ratings(
                    start_date=self._start_date,
                    end_date=self._end_date,
                )
            )
            self.log.info(f"finish get data {self._start_date} to {self._end_date}")
        finally:
            hook.close()

        # 데이터 저장할 디렉토리 없으면 생성
        output_dir = os.path.dirname(self._output_path)
        os.makedirs(output_dir, exist_ok=True)
        
        # 데이터 저장
        with open(self._output_path, "w") as file_:
            json.dump(ratings, fp=file_)

 

 

영화 평점 데이터 가져오는 커스텀 오퍼레이터를 사용하는 dag

import datetime as dt

from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator

with DAG(
    dag_id="03_operator",
    description="Fetches ratings from the Movielens API using a custom operator.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 3),
    schedule_interval="@daily",
) as dag:
    MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/data/custom_operator/{{ds}}.json",
    )

 

  • 데이터 처리 확인

 

 

4. 커스텀 센서 빌드하기

  • airflow 센서는 특별한 유형의 오퍼레이터 >> 다운 스트림 task에서 앞 task의 특정 조건 충족 대기 위해 사용!!
  • 커스텀 센서는 커스텀 오퍼레이터와 유사 >> BaseOperator 대신 BaseSensorOperator 클래스를 상속하는 것만 다름!!
from airflow.sensors.base import BaseSensorOperator

class MyCustomSenor(BaseSensorOperator): # BaseSensorOperator 클래스 상속
    def poke(self, context): # poke 메서드 구현
    
# poke는 Boolean 값을 반환 (True/False)
# True 반환시 다운스트림 task Start

>>> BaseSensorOperator는 execute 메서드대신 poke 메서드 구현해야됨 

 

 

영화 평점 데이터 특정 기간의 데이터가 있는지 확인하는 센서 구현

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

from custom.hooks import MovielensHook

class MovielensRatingsSensor(BaseSensorOperator):
    # Airflow의 템플릿을 사용하여 DAG의 실행 시 동적으로 값을 설정할 수 있는 필드를 지정
    template_fields = ("_start_date", "_end_date")

    # 기본dag 인자를 전달 위한 데커레이터
    @apply_defaults

    # 사용할 인자 생성자 메서드로 지정
    def __init__(
        self,
        conn_id,
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        **kwargs
        ):

        # 부모 클래스 초기화 메서드 호출
        super().__init__(**kwargs)

        # 인스턴스 변수 설정
        # 생성자로 부터 받은 인자를 클래스 인스턴스 변수에 할당
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date


    # 생성자 만든 후 poke 메서드 구현
    # start~end 데이터 존재시 True, 미존재 False 반환
    def poke(self, context):
        # 훅 인스턴스 생성
        hook = MovielensHook(self._conn_id)

        # try, except 문을 통해
        # 예외 미발생시 True 반환, 예외 발생시 False 반환 하여 poke 구현
        try:
            next(
                hook.get_ratings(
                    start_date=self._start_date,
                    end_date=self._end_date,
                    batch_size=1
                )
            )
            self.log.info(f"데이터 있음!! {self._start_date} to {self._end_date}")
            return True
        except StopIteration:
            self.log.info(f"데이터 없어!! {self._start_date} to {self._end_date}")
            return False
        finally:
            hook.close() # close를 호출하여 리소스 해제
  • 해당 커스텀 센서를 통해 영화 평점 api 연동시 데이터 존재 유무 체크후 다운스트림 task 실행 가능

 

커스텀 센서 사용한 dag

import datetime as dt
import pandas as pd
import os

from airflow import DAG
from airflow.operators.python import PythonOperator

from custom.operators import MovielensFetchRatingsOperator # 영화 평점 가져오는 커스텀 클래스 import
from custom.sensors import MovielensRatingsSensor # 커스텀 센서 클래스 import
from custom.ranking import rank_movies_by_rating # 랭킹 통계 클래스 import

# dag 정의
with DAG(
    dag_id="04_sensor",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 3),
    schedule_interval="@daily",
) as dag:
    # with 절안에서 task 정의
    # 센서 task
    wait_for_ratings = MovielensRatingsSensor(
        task_id="wait_for_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
    )

    # 영화 평점 데이터 json 저장 task
    fetch_ratings = MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/opt/airflow/data/{{ds}}.json",
    )

    def _rank_movies(templates_dict, min_ratings=2, **_):
        input_path = templates_dict["input_path"]
        output_path = templates_dict["output_path"]

        ratings = pd.read_json(input_path)
        ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)  # 랭킹 함수 사용

        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        ranking.to_csv(output_path, index=True)  # 영화 랭킹 통계 데이터 csv로 저장

    # 영화 랭킹 통계 task 정의
    rank_movies = PythonOperator(
        task_id="rank_movies",
        python_callable=_rank_movies,
        templates_dict={
            "input_path": "/opt/airflow/data/{{ds}}.json",
            "output_path": "/opt/airflow/data/{{ds}}.csv",
        },
    )

    # task 의존성 정의
    wait_for_ratings >> fetch_ratings >> rank_movies

 

>>> 센서 task 로그

 

 

5. 컴포넌트 패키징 하기

  • dag에 커스텀 컴포넌트는 dag 디렉터리 내에 서브 패키지 까지 포함, 이런 방식은 타 프로젝트 사용시 또는 공유에는 이상적인 방법이 아님
  • 컴포넌트 배포는 파이썬 패키지에 코드를 넣는 것이 좋음, 작업이 추가 되지만 airflow 에서 커스텀 컴포넌트 설치를 다른 패키지와 유사하게 할수 있다는 장점이 있다

 

(5-1) 파이썬 패키지 부트스트랩 작업

  • 훅, 오퍼레이터, 센서, 랭킹 을 포함하는 airflow_movielens라는 패키지 생성 gogo
####### 패키지 빌드 #######
1) 디렉터리 생성
mkdir -p airflow-movielens
cd airflow-movielens

2) 소스 코드 보관 패키지 기본 구조 생성, 패키지로 만들그 위한 __init__.py 생성
mkdir -p src/airflow_movielens
touch /airflow_movielns/src/airflow_movielns/__init__.py

3) 훅, 센서, 오퍼레이터, 랭킹 클래스 소스를 복사

패키지 기본 구조

 

  • setup.py 만들어 setuptools가 패키지 어떻게 설치 할건지 지시
#!/usr/bin/python3
import setuptools

# 필요한 파이썬 패키지 리스트
requirements = [
    "apache-airflow==2.9.3",
    "requests==2.31.0",
    "pandas==1.3.5",
    "numpy==1.21.5",
    "scikit-learn==1.2.2"
]
# 각 패키지에 대해 ==, >=, <=, ~= 등의 연산자를 사용하여 원하는 버전을 지정가능
# "apache-airflow==2.5.0",  # 정확히 버전 2.5.0을 사용
# "requests>=2.25.1",       # 버전 2.25.1 이상을 사용
# "pandas~=1.3.0",          # 버전 1.3.x를 사용
# "numpy<=1.21.2",          # 버전 1.21.2 이하를 사용
# "scikit-learn>=0.24,<0.25" # 0.24 이상, 0.25 미만 버전을 사용

setuptools.setup(
    name="airflow_movielens",
    version="0.1.0",
    description="hooks, opertaors, ranking, sensors",
    # 개발 저자 및 세부사항
    author="Anonymous",
    author_email="anonymous@example.com",
    
    # 패키지 종속성 제공
    install_requires=requirements,
    
    # setuptools에 패키지 파이썬 파일 위치 제공
    packages=setuptools.find_packages("src"),
    package_dir={"":"src"},
 
    # 온라인에서 패키지를 찾을수 있는 위치
    url="https://github.com/example-repo/airflow_movielens",
    
    # 개발 코드 라이센스 정보
    license="sysy0218",
)

패키지 구조

 

(5-2) 파이썬 패키지 설치하기 ( pip 명령을 통해 파이썬 환경에 패키지 설치 )

# 구현하고자 하는 setup.py가 존재하는 경로 실행
python3 -m pip install /aiflow-movielens

 

 

8장 요약

  • 커스텀 컴포넌트를 구현해 airflow 내장 기능 확장 가능
  • 커스텀 훅을 통해 airflow가 지원 않는 시스템과 연동 가능
  • 커스텀 센서를 통해 이벤트 대기하는 컴포넌트 구현 가능
  • 커스텀 오퍼레이터, 훅, 센서 같은 코드를 파이썬 라이브러리로 구현