
Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
- 8장은 사용자 만의 오퍼레이터를 빌드하고 dag에서 이를 사용하는 방법을 설명
- 또한, 해당 커스텀 컴포넌트를 파이썬 패키지로 패키징해 여러 환경에 설치하거나 재사용할 떄 편리하게 하는 방법을 살펴봄
1. PythonOperator로 작업하기 ( 예제 데이터 : 영화 평점 api )
커스텀 컴포넌트 빌드전 PythonOperator을 사용 방법 먼저 실습
워크플로우 계획 : 영화평점 api >>> 신규 평점 데이터 가져오기 >>> 인기 영화 랭킹 >>> ....이후 추천시스템 앱 (다운스트림) 발전 가능
컨테이너 실행 ( /work/docker_compose_dir/docker-compose.yml )
version: '3.7'
services:
movielens:
build: /work/docker_other/movielens-api
image: manning-airflow/movielens-api
ports:
- "5000:5000"
environment:
API_USER: airflow
API_PASSWORD: airflow
도커 컴포즈 실행 & 종료
## 도커 컴포즈 설치 ##
DOCKER_COMPOSE_VERSION=$(curl -s https://api.github.com/repos/docker/compose/releases/latest | grep 'tag_name' | cut -d\" -f4)
curl -L "https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 설치한 도커 컴포즈에 바이너리 실행권한 부여
chmod +x /usr/local/bin/docker-compose
# up : 정의된 모든 서비스를 빌드하고 시작
# -d : 컨테이너 백그라운드로 실행
# --build : 도커 이미지 빌드 부터
docker-compose up -d --build
# docker compose 종료
docker-compose down
엔드포인트에서 제공하는 평점 데이터

- 평점 데이터는 json 포맷으로 제공
- 실제 평점 데이터는 result 필드 안에 포함
- offset는 결과 중 몇 번쨰 결과부터 가져오는지 의미
- limit는 결과 중 한 번에 가져올수 있는 레코드 개수
- 특정 기간 평점 데이터 가져오려면 start_date, end_date 파라미터 사용
http://192.168.56.10:5000/ratings?start_date=2019-01-01&end_date=2019-01-02
(1-1) api에서 평점 데이터 가져오기
airflow를 사용한 프로세스 자동화를 위해 평점 데이터 수집 프로그램 구현 ( /work/dag_airflow/01_python.py )
import datetime as dt
import logging
import json
import os
import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
MOVIELENS_HOST = os.environ.get("MOVIELENS_HOST", "movielens")
MOVIELENS_SCHEMA = os.environ.get("MOVIELENS_SCHEMA", "http")
MOVIELENS_PORT = os.environ.get("MOVIELENS_PORT", "5000")
MOVIELENS_USER = os.environ["MOVIELENS_USER"]
MOVIELENS_PASSWORD = os.environ["MOVIELENS_PASSWORD"]
# requests session 셋팅하는 함수
def _get_session():
session = requests.Session()
session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD)
schema = MOVIELENS_SCHEMA
host = MOVIELENS_HOST
port = MOVIELENS_PORT
base_url = f"{schema}://{host}:{port}"
return session, base_url
# 페이지 처리하는 함수
def _get_with_pagination(session, url, params, batch_size=100):
offset = 0
total = None
while total is None or offset < total:
response = session.get(
url,
params={
**params,
**{"offset": offset, "limit": batch_size}
}
)
# 상태 체크 후 결과 json을 파싱
response.raise_for_status()
response_json = response.json()
yield from response_json["result"] # 가져온 레코드를 함수 호출자에게 yield
# 현재 offset과 레코드 총 수 업데이트
offset += batch_size
total = response_json["total"]
# 전체 결합 함수
def _get_ratings(start_date, end_date, batch_size=100):
session, base_url = _get_session() # api 요청 세션과 기본 URL 가져오기
yield from _get_with_pagination(
session=session,
url=base_url + "/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)
# with 문을 사용해 DAG 객체의 설정을 한 번만 지정하고, 블록 내에서 작업을 정의할 수 있습니다
with DAG(
dag_id="01_python",
description="Fetches ratings from the Movielens API using the Python Operator.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 3),
schedule_interval="@daily",
) as dag:
# 날짜별로 파티션 하기 위한 함수
def _fetch_ratings(templates_dict, batch_size=1000, **_):
logger = logging.getLogger(__name__) # 함수 동작에 대한 피드백 제공
start_date = templates_dict["start_date"]
end_date = templates_dict["end_date"]
output_path = templates_dict["output_path"]
logger.info(f"Fetching ratings for {start_date} to {end_date}")
# _get_ratings 함수 사용해 평점 데이터 가져오기
ratings = list(
_get_ratings(
start_date=start_date,
end_date=end_date,
batch_size=batch_size
)
)
logger.info(f"Fetched {len(ratings)} ratings")
logger.info(f"Writing ratings to {output_path}")
# 출력 디렉토리 미존재시 생성
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
# 출력값 json 포맷으로 저장
with open(output_path, "w") as file_:
json.dump(ratings, file_)
# 최종 pythonoperator을 사용해 평점 데이터 수집하는 task
fetch_ratings = PythonOperator(
task_id="fetch_ratings",
python_callable=_fetch_ratings,
templates_dict={
"start_date": "{{ds}}",
"end_date": "{{next_ds}}",
"output_path": "/opt/airflow/data/{{ds}}.json",
},
)
# 의존성 정의
fetch_ratings
- yield from >>> 다른 이터러블 객체나 제너레이터의 값을 간단하게 제너레이터에서 반환할 수 있게 합니다.
- 이제 _get_ratings 함수를 PythonOperator로 호출함으로써 평점 데이터 수집
- json파일을 날짜별로 파티션( 재수집 용이 )
도커 컴포즈 수정
version: '3.7'
######### airflow env #########
x-environment: &airflow_environment
AIRFLOW__CORE__EXECUTOR: "SequentialExecutor"
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: "False"
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__CORE__SQL_ALCHEMY_CONN: "sqlite:////opt/airflow/airflow.db"
AIRFLOW__CORE__STORE_DAG_CODE: "True"
AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "True"
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
AIRFLOW__WEBSERVER__RBAC: "False"
AIRFLOW_CONN_MOVIELENS: "http://airflow:airflow@movielens"
MOVIELENS_USER: "airflow"
MOVIELENS_PASSWORD: "airflow"
######### airflow env #########
services:
movielens:
build: /work/docker_other/movielens-api
image: manning-airflow/movielens-api
container_name: mavielens_container
ports:
- "5000:5000"
environment:
API_USER: airflow
API_PASSWORD: airflow
airflow:
build: /work/docker_airflow
image: airflow/airflow_test:2.9.3
container_name: airflow_container
depends_on:
- movielens
ports:
- "9898:8080"
volumes:
- /work/dag_airflow/:/opt/airflow/dags/
environment: *airflow_environment
- 도커 컴포즈 실행
docker-compose up -d --build
- 워크플로우 json 산출물

>>> 날짜 별로 파티션된 영화 평점 데이터
영화 평점 데이터 가져오고 랭킹을 csv로 산출 하기 위한 task 추가( rank_movies )
랭킹 산출 함수 ( /work/dag_airflow/custom/ranking.py )
import pandas as pd
def rank_movies_by_rating(ratings, min_ratings=2):
ranking = (
ratings.groupby("movieId")
.agg(
avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"),
num_ratings=pd.NamedAgg(column="userId", aggfunc="count"),
)
.loc[lambda df: df["num_ratings"] > min_ratings]
.sort_values(["avg_rating", "num_ratings"], ascending=[False, False])
)
return ranking
rank_movies 태스크 추가!!
from custom.ranking import rank_movies_by_rating
def _rank_movies(templates_dict, min_ratings=2, **_):
input_path = templates_dict["input_path"]
output_path = templates_dict["output_path"]
ratings = pd.read_json(input_path)
ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings) # 랭킹 함수 사용
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
ranking.to_csv(output_path, index=True) # 영화 랭킹 통계 데이터 csv로 저장
# 영화 랭킹 통계 task 정의
rank_movies = PythonOperator(
task_id="rank_movies",
python_callable=_rank_movies,
templates_dict={
"input_path": "/opt/airflow/data/{{ds}}.json",
"output_path": "/opt/airflow/data/{{ds}}.csv",
},
)
# 의존성 정의
fetch_ratings >> rank_movies
- dag 실행 그래프

- 데이터 확인

>> 이와 같이 두 task를 하나의 dag로 구성
- 평점 데이터 가져오기 >>> 영화 랭크 만들기
- 정각에 스케줄링 되며 일별로 인기 영화 랭킹 제작 가능
2. 커스텀 훅 빌드하기
- 앞선 예제 대부분의 코드는 api 연동과 관련된 코드
- api 주소, 인증정보, 연동 세션, 페이지 처리 등등 기능...
- 복잡한 api 연동과 같은 작업은 코드를 캡슐화 하고 재활용 가능한 airflow 훅으로 만들 수 있음!!
- 모든 api 전용 코드를 한 곳에 보관, 여러 dag에서 해당 훅을 간단하게 사용 하면 유사한 api 연동 워크플로우 에서 api 연동에 대한 노력을 줄일수 있음!!!
ex..) 훅 사용 예시
hook = MovielensHook(conn_id="movielens") # 훅 생성!!
ratings = hook.get_ratings(start_data, end_date) # 생성 된 훅 을 통한 작업 수행
hook.close() # 훅을 닫고 사용 리소스 해제
(2-1) 커스텀 훅 설계
airflow에서 모든 훅은 추상 클래스인 BaseHook 클래스의 서브 클래스로 생성!!!
# base 훅 서브 클래스
from airflow.hooks.base_hook import BaseHook
class MovielensHook(BaseHook):
def __init__(self, conn_id): # conn_id는 훅에게 어떤 커넥션을 사용하는지 전달
super().__init__() # 클래스 생성자 호출
self._conn_id = conn_id # 커넥션 id 지정
사전 준비 airflow 메타스토어에 연결 정보 추가
airflow UI >>> Admin >>> Connections

- Connection Type( 연결 유형 ) : HTTP
- Host ( 호스트 ) : movielens
- 그 외 스키마, 로그인 정보, 포트 정보 추가
>>> 이제 메타스토어 에서 연결 정보를 가져오기 위해 BaseHook 클래스에 get_connections라는 메서드 제공, 커넥션 ID에 대한 연결 세부 정보 가져오는 메서드임!!!!
config = self.get_connection(self.__conn_id)
# 연결 설정 정보 가져오기
schema = config.schema
host = config.host
port = config.port
base_url = f"{schema}://{host}:{port}/"
if config.login:
session.auth = (config.login, config.password)
>>> config 객체를 이용해 host, post, user, password 가져올수 있음!!
API 세션 캐싱 추가 ( /work/dag_airflow/custom/hooks.py )
import requests
from airflow.hooks.base_hook import BaseHook
class MovielensHook(BaseHook):
def __init__(self, conn_id): # conn_id는 훅에게 어떤 커넥션을 사용하는지 전달
super().__init__() # 클래스 생성자 호출
self._conn_id = conn_id # 커넥션 id 지정
# get_conn 함수 호출마다 airflow 메타스토어에 작업 요청하는 단점 해결을 위한 변수 캐싱
self._session = None
self._base_url = None
def get_conn(self):
if self._session is None: # 세션 연결이 없으면 생성
config = self.get_connection(self._conn_id)
schema = config.schema
host = config.host
port = config.port
self._base_url = f"{schema}://{host}:{port}"
self._session = requests.Session()
if config.login:
self._session.auth = (config.login, config.password)
return self._session, self._base_url # 세션과 베이스 URL 반환
- get_conn 함수 처음 호출시 self._session 값이 None 이므로, airflow 메타스토어 에서 연결 정보 가져와 base_url 과 session 정보 인스턴스 내부에 저장 후 해당 객체들을 각각 인스턴스 변수 _session, _base_url에 저장 후 나중에 캐싱해 재사용
- 즉, 두번쨰 get_conn 함수 호출부터 self._session 변수는 None이 아니라 세션 정보를 가지고 있기에 캐싱된 session, url 반환
>>> 이후 api에 대한 인증 커넥션 생성 가능
영화 평점 데이터 가져오는 함수 추가( get_ratings, _get_with_pagination )
import requests
from airflow.hooks.base_hook import BaseHook
class MovielensHook(BaseHook):
"""
Hook for the MovieLens API.
Abstracts details of the Movielens (REST) API and provides several convenience
methods for fetching data (e.g. ratings, users, movies) from the API. Also
provides support for automatic retries of failed requests, transparent
handling of pagination, authentication, etc.
Parameters
----------
conn_id : str
ID of the connection to use to connect to the Movielens API. Connection
is expected to include authentication details (login/password) and the
host that is serving the API.
"""
DEFAULT_SCHEMA = "http"
DEFAULT_PORT = 5000
def __init__(self, conn_id, retry=3):
super().__init__()
self._conn_id = conn_id
self._retry = retry
self._session = None
self._base_url = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def get_conn(self):
"""
Returns the connection used by the hook for querying data.
Should in principle not be used directly.
"""
if self._session is None:
# Fetch config for the given connection (host, login, etc).
config = self.get_connection(self._conn_id)
if not config.host:
raise ValueError(f"No host specified in connection {self._conn_id}")
schema = config.schema or self.DEFAULT_SCHEMA
port = config.port or self.DEFAULT_PORT
self._base_url = f"{schema}://{config.host}:{port}"
# Build our session instance, which we will use for any
# requests to the API.
self._session = requests.Session()
if config.login:
self._session.auth = (config.login, config.password)
return self._session, self._base_url
def close(self):
"""Closes any active session."""
if self._session:
self._session.close()
self._session = None
self._base_url = None
# API methods:
def get_movies(self):
"""Fetches a list of movies."""
raise NotImplementedError()
def get_users(self):
"""Fetches a list of users."""
raise NotImplementedError()
def get_ratings(self, start_date=None, end_date=None, batch_size=100):
"""
Fetches ratings between the given start/end date.
Parameters
----------
start_date : str
Start date to start fetching ratings from (inclusive). Expected
format is YYYY-MM-DD (equal to Airflow's ds formats).
end_date : str
End date to fetching ratings up to (exclusive). Expected
format is YYYY-MM-DD (equal to Airflow's ds formats).
batch_size : int
Size of the batches (pages) to fetch from the API. Larger values
mean less requests, but more data transferred per request.
"""
yield from self._get_with_pagination(
endpoint="/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)
def _get_with_pagination(self, endpoint, params, batch_size=100):
"""
Fetches records using a get request with given url/params,
taking pagination into account.
"""
session, base_url = self.get_conn()
url = base_url + endpoint
offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()
yield from response_json["result"]
offset += batch_size
total = response_json["total"]
movielens api 커넥션 처리 airflow 훅 완성
해당 훅에 추가 메서드 구현함으로 간단하게 기능 추가
훅의 장점은 여러 dag에서 사용하기 쉽게 api 연동에 필요 로직을 단일 클래스로 캡슐화 하여 제공
(2-2) 커스텀 훅 사용해 dag 빌드
- 패키지 에서 훅 불러오기
from custom.hooks import MovielensHook
- 훅 사용
hook = MovielensHook(conn_id=conn_id)
ratings = hook.get_ratings(
start_date=start_date,
end_date=end_date,
batch_size=batch_size
)
>>> 영화 평점 데이터 제너레이터 반환
>>> 이를 통해 json 파일로 저장
- dag 빌드 ( /work/dag_airflow/02_python.py )
import datetime as dt
import logging
import json
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from custom.ranking import rank_movies_by_rating
from custom.hooks import MovielensHook
with DAG(
dag_id="02_hook",
description="Fetches ratings from the Movielens API using a custom hook.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 4),
schedule_interval="@daily",
) as dag:
def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
logger = logging.getLogger(__name__)
start_date = templates_dict["start_date"]
end_date = templates_dict["end_date"]
output_path = templates_dict["output_path"]
logger.info(f"Fetching ratings for {start_date} to {end_date}")
hook = MovielensHook(conn_id=conn_id)
ratings = list(
hook.get_ratings(
start_date=start_date, end_date=end_date, batch_size=batch_size
)
)
logger.info(f"Fetched {len(ratings)} ratings")
logger.info(f"Writing ratings to {output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "w") as file_:
json.dump(ratings, fp=file_)
fetch_ratings = PythonOperator(
task_id="fetch_ratings",
python_callable=_fetch_ratings,
op_kwargs={"conn_id": "movielens"},
templates_dict={
"start_date": "{{ds}}",
"end_date": "{{next_ds}}",
"output_path": "/opt/airflow/data/{{ds}}.json",
},
)
# 영화 랭킹 데이터 통계 내는 함수
def _rank_movies(templates_dict, min_ratings=2, **_):
input_path = templates_dict["input_path"]
output_path = templates_dict["output_path"]
ratings = pd.read_json(input_path)
ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings) # 랭킹 함수 사용
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
ranking.to_csv(output_path, index=True) # 영화 랭킹 통계 데이터 csv로 저장
# _rank_movies 함수 pythonoperator로 래핑하는 task
rank_movies = PythonOperator(
task_id="rank_movies",
python_callable=_rank_movies,
templates_dict={
"input_path": "/opt/airflow/data/{{ds}}.json",
"output_path": "/opt/airflow/data/{{ds}}.csv",
},
)
# 의존성 정의
fetch_ratings >> rank_movies
3. 커스텀 오퍼레이터 빌드하기
평점 테이터 가져와, json 산출물을 뽑아내는 함수도 커스텀 오퍼레이터로 개발해보자~
airflow 모든 오퍼레이터는 BaseOperator 클래스 서브 클래스로 만들어야됨!!!!
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class MyCustomOperator(BaseOperator): # BaseOperator 클래스 상속
@apply_defaults # 기본 dag 인수를 전달 위한 데커레이터
def __init__(self, conn_id, **kwargs): # 사용할 인자를 생성자 메서드로 지정
# BaseOperator 클래스는 제너릭 인수들을 많이 가지고 있음,
# 이런 제너릭 인수를 모두 나열 하지 않도록 __init__에 인수 전달시 **kwargs 구문 사용
영화 평점 데이터 가져오는 커스텀 오퍼레이터 빌드
import json
import os
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook
class MovielensFetchRatingsOperator(BaseOperator):
# Airflow의 템플릿을 사용하여 DAG의 실행 시 동적으로 값을 설정할 수 있는 필드를 지정
template_fields = ("_start_date", "_end_date", "_output_path")
# 기본 dag 인자를 전달 위한 데커레이터
@apply_defaults
# 사용할 인자 생성자 매서드로 지정
# 번외.. self는 파이썬 클래스내에 인스턴스 변수 정의 및 접근하기 위한 특별한 변수!!!
def __init__(
self,
conn_id,
output_path,
start_date="{{ds}}",
end_date="{{next_ds}}",
batch_size=1000,
**kwargs,
):
# 부모 클래스 메서드 호출
super(MovielensFetchRatingsOperator, self).__init__(**kwargs)
# 인스턴스 변수 설정
# 생성자에서 받은 인수들을 클래스 인스턴스 변수에 할당하는 작업
self._conn_id = conn_id
self._output_path = output_path
self._start_date = start_date
self._end_date = end_date
self._batch_size = batch_size
# 영화 평점 가져오기 동작
def execute(self, context):
hook = MovielensHook(self._conn_id) # 인스턴스 생성
# try, finally을 사용해
# 종료시 생성한 훅 인스턴스 리소스 해제
try:
self.log.info(f"get data {self._start_date} to {self._end_date}")
ratings = list(
hook.get_ratings(
start_date=self._start_date,
end_date=self._end_date,
)
)
self.log.info(f"finish get data {self._start_date} to {self._end_date}")
finally:
hook.close()
# 데이터 저장할 디렉토리 없으면 생성
output_dir = os.path.dirname(self._output_path)
os.makedirs(output_dir, exist_ok=True)
# 데이터 저장
with open(self._output_path, "w") as file_:
json.dump(ratings, fp=file_)
영화 평점 데이터 가져오는 커스텀 오퍼레이터를 사용하는 dag
import datetime as dt
from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator
with DAG(
dag_id="03_operator",
description="Fetches ratings from the Movielens API using a custom operator.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 3),
schedule_interval="@daily",
) as dag:
MovielensFetchRatingsOperator(
task_id="fetch_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
output_path="/data/custom_operator/{{ds}}.json",
)
- 데이터 처리 확인

4. 커스텀 센서 빌드하기
- airflow 센서는 특별한 유형의 오퍼레이터 >> 다운 스트림 task에서 앞 task의 특정 조건 충족 대기 위해 사용!!
- 커스텀 센서는 커스텀 오퍼레이터와 유사 >> BaseOperator 대신 BaseSensorOperator 클래스를 상속하는 것만 다름!!
from airflow.sensors.base import BaseSensorOperator
class MyCustomSenor(BaseSensorOperator): # BaseSensorOperator 클래스 상속
def poke(self, context): # poke 메서드 구현
# poke는 Boolean 값을 반환 (True/False)
# True 반환시 다운스트림 task Start
>>> BaseSensorOperator는 execute 메서드대신 poke 메서드 구현해야됨
영화 평점 데이터 특정 기간의 데이터가 있는지 확인하는 센서 구현
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook
class MovielensRatingsSensor(BaseSensorOperator):
# Airflow의 템플릿을 사용하여 DAG의 실행 시 동적으로 값을 설정할 수 있는 필드를 지정
template_fields = ("_start_date", "_end_date")
# 기본dag 인자를 전달 위한 데커레이터
@apply_defaults
# 사용할 인자 생성자 메서드로 지정
def __init__(
self,
conn_id,
start_date="{{ds}}",
end_date="{{next_ds}}",
**kwargs
):
# 부모 클래스 초기화 메서드 호출
super().__init__(**kwargs)
# 인스턴스 변수 설정
# 생성자로 부터 받은 인자를 클래스 인스턴스 변수에 할당
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
# 생성자 만든 후 poke 메서드 구현
# start~end 데이터 존재시 True, 미존재 False 반환
def poke(self, context):
# 훅 인스턴스 생성
hook = MovielensHook(self._conn_id)
# try, except 문을 통해
# 예외 미발생시 True 반환, 예외 발생시 False 반환 하여 poke 구현
try:
next(
hook.get_ratings(
start_date=self._start_date,
end_date=self._end_date,
batch_size=1
)
)
self.log.info(f"데이터 있음!! {self._start_date} to {self._end_date}")
return True
except StopIteration:
self.log.info(f"데이터 없어!! {self._start_date} to {self._end_date}")
return False
finally:
hook.close() # close를 호출하여 리소스 해제
- 해당 커스텀 센서를 통해 영화 평점 api 연동시 데이터 존재 유무 체크후 다운스트림 task 실행 가능
커스텀 센서 사용한 dag
import datetime as dt
import pandas as pd
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from custom.operators import MovielensFetchRatingsOperator # 영화 평점 가져오는 커스텀 클래스 import
from custom.sensors import MovielensRatingsSensor # 커스텀 센서 클래스 import
from custom.ranking import rank_movies_by_rating # 랭킹 통계 클래스 import
# dag 정의
with DAG(
dag_id="04_sensor",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 3),
schedule_interval="@daily",
) as dag:
# with 절안에서 task 정의
# 센서 task
wait_for_ratings = MovielensRatingsSensor(
task_id="wait_for_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
)
# 영화 평점 데이터 json 저장 task
fetch_ratings = MovielensFetchRatingsOperator(
task_id="fetch_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
output_path="/opt/airflow/data/{{ds}}.json",
)
def _rank_movies(templates_dict, min_ratings=2, **_):
input_path = templates_dict["input_path"]
output_path = templates_dict["output_path"]
ratings = pd.read_json(input_path)
ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings) # 랭킹 함수 사용
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
ranking.to_csv(output_path, index=True) # 영화 랭킹 통계 데이터 csv로 저장
# 영화 랭킹 통계 task 정의
rank_movies = PythonOperator(
task_id="rank_movies",
python_callable=_rank_movies,
templates_dict={
"input_path": "/opt/airflow/data/{{ds}}.json",
"output_path": "/opt/airflow/data/{{ds}}.csv",
},
)
# task 의존성 정의
wait_for_ratings >> fetch_ratings >> rank_movies
>>> 센서 task 로그

5. 컴포넌트 패키징 하기
- dag에 커스텀 컴포넌트는 dag 디렉터리 내에 서브 패키지 까지 포함, 이런 방식은 타 프로젝트 사용시 또는 공유에는 이상적인 방법이 아님
- 컴포넌트 배포는 파이썬 패키지에 코드를 넣는 것이 좋음, 작업이 추가 되지만 airflow 에서 커스텀 컴포넌트 설치를 다른 패키지와 유사하게 할수 있다는 장점이 있다
(5-1) 파이썬 패키지 부트스트랩 작업
- 훅, 오퍼레이터, 센서, 랭킹 을 포함하는 airflow_movielens라는 패키지 생성 gogo
####### 패키지 빌드 #######
1) 디렉터리 생성
mkdir -p airflow-movielens
cd airflow-movielens
2) 소스 코드 보관 패키지 기본 구조 생성, 패키지로 만들그 위한 __init__.py 생성
mkdir -p src/airflow_movielens
touch /airflow_movielns/src/airflow_movielns/__init__.py
3) 훅, 센서, 오퍼레이터, 랭킹 클래스 소스를 복사

- setup.py 만들어 setuptools가 패키지 어떻게 설치 할건지 지시
#!/usr/bin/python3
import setuptools
# 필요한 파이썬 패키지 리스트
requirements = [
"apache-airflow==2.9.3",
"requests==2.31.0",
"pandas==1.3.5",
"numpy==1.21.5",
"scikit-learn==1.2.2"
]
# 각 패키지에 대해 ==, >=, <=, ~= 등의 연산자를 사용하여 원하는 버전을 지정가능
# "apache-airflow==2.5.0", # 정확히 버전 2.5.0을 사용
# "requests>=2.25.1", # 버전 2.25.1 이상을 사용
# "pandas~=1.3.0", # 버전 1.3.x를 사용
# "numpy<=1.21.2", # 버전 1.21.2 이하를 사용
# "scikit-learn>=0.24,<0.25" # 0.24 이상, 0.25 미만 버전을 사용
setuptools.setup(
name="airflow_movielens",
version="0.1.0",
description="hooks, opertaors, ranking, sensors",
# 개발 저자 및 세부사항
author="Anonymous",
author_email="anonymous@example.com",
# 패키지 종속성 제공
install_requires=requirements,
# setuptools에 패키지 파이썬 파일 위치 제공
packages=setuptools.find_packages("src"),
package_dir={"":"src"},
# 온라인에서 패키지를 찾을수 있는 위치
url="https://github.com/example-repo/airflow_movielens",
# 개발 코드 라이센스 정보
license="sysy0218",
)

(5-2) 파이썬 패키지 설치하기 ( pip 명령을 통해 파이썬 환경에 패키지 설치 )
# 구현하고자 하는 setup.py가 존재하는 경로 실행
python3 -m pip install /aiflow-movielens
8장 요약
- 커스텀 컴포넌트를 구현해 airflow 내장 기능 확장 가능
- 커스텀 훅을 통해 airflow가 지원 않는 시스템과 연동 가능
- 커스텀 센서를 통해 이벤트 대기하는 컴포넌트 구현 가능
- 커스텀 오퍼레이터, 훅, 센서 같은 코드를 파이썬 라이브러리로 구현
'데이터 엔지니어( 실습 정리 ) > airflow' 카테고리의 다른 글
| (10) - 컨테이너에서 태스크 실행 (1) | 2024.09.02 |
|---|---|
| (9) - 테스트 하기 (0) | 2024.08.28 |
| (7) - airflow 외부 시스템 통신 (1) | 2024.08.26 |
| (6) - airflow 워크플로 트리거 (0) | 2024.08.18 |
| (5) - airflow 태스크 간 의존성 정의 (0) | 2024.08.18 |