(10) - 컨테이너에서 태스크 실행

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
- 10장은 구축, 배포 유지 관리가 편한 airflow dag 생성에 중점을 둠
- 이러한 문제를 고려해 도커, 쿠버네트스 사용한 컨테이너 airflow 태스크 실행
1. 다양한 오퍼레이터 고려사항
- 여러 오퍼레이터 존재시 dag는 관리하기 어려움
- 여러 오퍼레이터 사용시 오퍼레이터마다 종속성을 요구 ( 파이썬은 동일 환경에 동일한 패키지 버전을 설치할 수 없음 )

(1-1) 제너릭 오퍼레이터 지향하기
- airflow 태스크 실행을 위해 하나의 제너릭 오퍼레이터 사용
- 각각에 대한 종속성 패키지 설치후 동시에 다양한 태스크 실행할 수있는 제너릭 오퍼레이를 >>> 컨테이너를 통해 구현
2. 컨테이너
- 애플리케이션의 필요한 종속성을 포함하고, 서로 다른 환경에서 쉽게 배포할수 있는 기술
- 배포의 문제 (운영체제 차이, 종속성, 라이브러리, 하드웨어 등 ) 다양한 요소를 고려해야함
- 이런 복잡성 관리는 가상화를 사용
- vm의 단점은 호스트os위에 가상os를 실행하기에 무거움... ( 즉, 큰 리소스로 인한 오버헤드 발생 )
컨테이너 기반 가상화
- 호스트os의 커널 레벨 기능을 사용해 애플리케이션 가상화
- 각 애플리케이션은 자체 os를 구동할 필요없이 호스트 os에서 간단하게 활용가능

(2-1) 간단한 도커 컨테이너 실행
# debian:buster-slim 이미지에서 Hello, world! 출력
docker run debian:buster-slim echo Hello, world!
- 도커 클라이언트 도커 데몬에 접속
- 도커 데몬은 허브 레지스트리에 debian 이미지 가져옴
- 도커 데몬은 해당 이미지를 사용해 컨테이너 생성
- 컨테이너 내부에서 echo Hello, world 명령 실행
- 도커 데몬은 명령에 대한 출력을 도커 클라이언트로 전송해 터미널에 표시
(2-2) 도커 이미지 생성
dockerfile을 작성해 도커 이미지 빌드
FROM python:3.8-slim # 기반으로 사용할 이미지
# 종속성 파일을 복사후 pip로 설치
COPY requirements.txt /tmp/requirements
RUN pip install -r /tmp/requirements
# 실행할 스크립트를 복사하고 실행파일로 변경
COPY fetch_weather.py /usr/local/bin/fetch-weather
RUN chmod +x /usr/local/bin/fetch-weather
ENTRYPOINT [ "/usr/local/bin/fetch_weather.py" ] # 컨테이너 시작할 떄 실행할 명령을 도커에 알림
CMD [ "--help" ] # 도커 명령에 포함할 기본 인수 알려줌
#### 이후 도커 빌드 ####
docker build --tag manning-airflow/wttr-example .
#### wttr 이미지를 사용해 컨테이너 실행
docker run manning-airflow/wttr-example:latest
(2-3) 볼륨을 사용해 데이터 유지
# 특정 도시에 대한 wttr 컨테이너 실행
docker run wttr-example:latest Amsterdam

컨테이너 실행시 볼륨 마운트
# docker run -v [로컬 경로]:[컨테이너 경로]
docker run -v /work/rowdata_airflow/wttr:/data \
manning-airflow/wttr-example Amsterdam --output_path /data/amsterdam.json
#### 컨데이터 stop ####
docker stop <컨테이너 id>
#### 컨테이너 제거 ####
docker rm <컨테이너 id>
#### 컨테이너 확인 ( 실행, 중지 다 확인 ) ####
docker ps -a

3. 컨테이너와 airflow
- airflow를 통해 태스크를 컨테이너로 실행가능
- 컨테이너 기반 오퍼레이터(DockeOperator, KubernetesPodOperator) 사용해 태스크 정의, 해다 오퍼레이터는 실행되면 컨테이너 실행을 시작후 컨테이너가 정상 완료까지 기다림
- 평점 가져오기(HttpOperator), 영화 랭킹 지정(PythonOpertator), 결과 게시(postgres 기반 오퍼레이터) 각 태스크를 DockerOperator을 사용해 다양한 태스크를 대체하고 적절한 종속석 가진 3개의 다른 도커 컨테이너에서 명령 실행가능!!

왜 굳이 컨테이너 사용???
- 컨테이너 기반 접근 방식은 각 태스트에 대해 이미지 빌드해야됨( 구현이 복잡,,,, 굳이 왜??,,, )
- 간편한 종속성 : 해당 태스크만의 종속성을 설치가능 ( 태스크간 종속성 출동 발생 안함 )
- 다양한 태스크 실행시 동일한 접근 방식 제공
- 향상된 테스트 가능성
4. 도커에서 태스크 실행
(4-1) DockerOperator
DockerOperator을 사용해 도커를 통해 컨테이너에서 태스크 실행
# task 간단 예시
rank_movies = DockerOperator(
task_id="rank_movies",
image="manning-airflow/movieslens-ranking", # DockerOperator에게 사용할 이미지 알려줌
command=[ # 컨테이너 실행할 명령어
"rank_movies.py",
"--input_path",
"/data/ratings/{{ds}}.json",
"--output_path",
"/data/rankings/{{ds}}.csv",
],
volumes=["/tmp/airflow/data:/data"] # 마운트할 볼륨 지정
)
오퍼레이터 동작 방식

- airflow는 워커에 task를 스케줄해 실행지시
- DockerOperator은 적절한 인수를 사용해 워커 시스템에서 docker run 명령 실행
- 도커데몬은 필요한 이미지 가져옴
- 컨테이너를 실행
- 로컬 볼륨에 마운트
- 컨테이너 종료되고 DockerOperator은 airflow 워커 결과를 반환
(4-2) task를 위한 컨테이너 이미지 생성
- DockerOperator을 사용하여 태스크를 실행하려면 태스크에 대한 도커 이미지를 빌드 해야됨
평점 스크립트
#!/usr/bin/env python
from pathlib import Path
import logging
import json
import click
import requests
logging.basicConfig(level=logging.INFO)
@click.command()
@click.option(
"--start_date",
type=click.DateTime(formats=["%Y-%m-%d"]),
required=True,
help="Start date for ratings.",
)
@click.option(
"--end_date",
type=click.DateTime(formats=["%Y-%m-%d"]),
required=True,
help="End date for ratings.",
)
@click.option(
"--output_path",
type=click.Path(dir_okay=False),
required=True,
help="Output file path.",
)
@click.option(
"--host", type=str, default="http://movielens:5000", help="Movielens API URL."
)
@click.option(
"--user",
type=str,
envvar="MOVIELENS_USER",
required=True,
help="Movielens API user.",
)
@click.option(
"--password",
type=str,
envvar="MOVIELENS_PASSWORD",
required=True,
help="Movielens API password.",
)
@click.option(
"--batch_size", type=int, default=100, help="Batch size for retrieving records."
)
def main(start_date, end_date, output_path, host, user, password, batch_size):
"""CLI script for fetching movie ratings from the movielens API."""
# Setup session.
session = requests.Session()
session.auth = (user, password)
# Fetch ratings.
logging.info("Fetching ratings from %s (user: %s)", host, user)
ratings = list(
_get_ratings(
session=session,
host=host,
start_date=start_date,
end_date=end_date,
batch_size=batch_size,
)
)
logging.info("Retrieved %d ratings!", len(ratings))
# Write output.
output_path = Path(output_path)
output_dir = output_path.parent
output_dir.mkdir(parents=True, exist_ok=True)
logging.info("Writing to %s", output_path)
with output_path.open("w") as file_:
json.dump(ratings, file_)
def _get_ratings(session, host, start_date, end_date, batch_size=100):
yield from _get_with_pagination(
session=session,
url=host + "/ratings",
params={
"start_date": start_date.strftime("%Y-%m-%d"),
"end_date": end_date.strftime("%Y-%m-%d"),
},
batch_size=batch_size,
)
def _get_with_pagination(session, url, params, batch_size=100):
"""
Fetches records using a get request with given url/params,
taking pagination into account.
"""
offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()
yield from response_json["result"]
offset += batch_size
total = response_json["total"]
if __name__ == "__main__":
main()
click의 주요 기능
- 간단한 CLI 구현: 복잡한 명령어 라인 인터페이스를 간단하고 직관적으로 작성할 수 있도록 도와줌
- 명령어 및 옵션 정의: 명령어, 인수 및 옵션을 쉽게 정의
- 자동 도움말 및 문서화
- 데코레이터를 사용한 간편한 함수 정의: CLI 명령어를 정의할 때 함수에 데코레이터를 사용하여 간편하게 처리
평점 스크립트 도커파일
FROM python:3.8-slim # 베이스 이미지
RUN python -m pip install click==7.1.1 requests==2.23.0 # 종속성 설치
COPY fetch_ratings.py /usr/local/bin/fetch-ratings
RUN chmod +x /usr/local/bin/fetch-ratings
ENV PATH="/usr/local/bin:${PATH}" # 스크립트 전체경로 지정않고도 실행 가능
>>> ENV PATH : 스크립트 전체경로를 지정하지 않고 fetch-ratings 명령으로 스크립트 실행 가능!!
평점 스크립트 빌드 && run
## docker build
docker build -t manning-airflow/movielens-fetch .
## docker run
docker run --rm manning-airflow/movielens-fetch fetch-ratings --help

영화 랭킹 스크립트
#!/usr/bin/env python
from pathlib import Path
import click
import pandas as pd
# 함수가 CLI 명령어로 실행될 수 있음을 나타냄
@click.command()
# CLI 명령에 사용되는 옵션을 저으이
@click.option(
"--input_path",
# dir_okay : 경로가 디렉토리면 오류
# exists : 경로가 실제 존재하는지 확인
# readable=True : 파일 읽을수 있는지 확인
type=click.Path(dir_okay=False, exists=True, readable=True),
required=True,
)
@click.option(
"--output_path",
type=click.Path(dir_okay=False, writable=True), required=True
)
@click.option("--min_ratings", type=int, default=2)
def main(input_path, output_path, min_ratings):
output_path = Path(output_path)
ratings = pd.read_json(input_path)
ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)
output_path.parent.mkdir(parents=True, exist_ok=True)
ranking.to_csv(output_path, index=True)
def rank_movies_by_rating(ratings, min_ratings=2):
ranking = (
ratings.groupby("movieId")
.agg(
avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"),
num_ratings=pd.NamedAgg(column="userId", aggfunc="nunique"),
)
.loc[lambda df: df["num_ratings"] > min_ratings]
.sort_values(["avg_rating", "num_ratings"], ascending=False)
)
return ranking
if __name__ == "__main__":
main()
도커 파일
FROM python:3.8-slim
RUN pip install click==7.1.1 pandas==0.25.2 numpy==1.19.5
COPY rank_movies.py /usr/local/bin/rank-movies
RUN chmod +x /usr/local/bin/rank-movies
ENV PATH="/usr/local/bin:${PATH}"
(4-3) 도커 task로 dag 구성
- DockerOperator로 기존 task 대체후 DockerOperator이 올바른 인수를 사용해 태스크 실행하도록 구현
- 또한 각 컨테이너는 task 작업 종료지 파일시스템이 존재하지 않기에 태스크 간 데이터 교환방법도 고려
평점, 랭킹 가져오기 컨테이너 실행( /work/dag_airflow/01_docker.py )
import datetime as dt
import os
from airflow import DAG
# 도커 오퍼레이터
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
# dag 정의
with DAG(
dag_id="01_docker",
description="movielens Gapi using docker",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 3),
schedule_interval="@daily",
) as dag:
# 컨테이너 task 정의
# 도커 오퍼레이터 사용
fetch_ratings=DockerOperator(
task_id="fetch_ratings",
image="manning-airflow/movielens-fetch",
command=[
"fetch-ratings", # 컨테이너에서 fetch_ratings 스크립트 실행, 밑에는 필수 인자
"--start_date",
"{{ds}}",
"--end_date",
"{{next_ds}}",
"--output_path",
"/data/ratings/{{ds}}.json",
"--user",
os.environ["MOVIELENS_USER"],
"--password",
os.environ["MOVIELENS_PASSWORD"],
"--host",
os.environ["MOVIELENS_HOST"],
],
network_mode="airflow",
mounts=[
Mount(
source="/work/row_data",
target="/data",
type="bind")
],
)
rank_movies = DockerOperator(
task_id="rank_movies",
image="manning-airflow/movielens-rank",
command=[
"rank-movies",
"--input_path",
"/data/ratings/{{ds}}.json",
"--output_path",
"/data/rankings/{{ds}}.csv",
],
mounts=[
Mount(
source="/work/row_data",
target="/data",
type="bind")
],
)
fetch_ratings >> rank_movies
평점, 랭킹 컨테이너 실행 환경을 위한 도커 컴포즈 ( /work/docker_compo/docker-compose.yml )
version: '3.7'
######### airflow env #########
x-environment: &airflow_environment
AIRFLOW__CORE__EXECUTOR: "SequentialExecutor"
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: "False"
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__CORE__SQL_ALCHEMY_CONN: "sqlite:////opt/airflow/airflow.db"
AIRFLOW__CORE__STORE_DAG_CODE: "True"
AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "True"
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
AIRFLOW__WEBSERVER__RBAC: "False"
AIRFLOW_CONN_MOVIELENS: "http://airflow:airflow@movielens"
MOVIELENS_USER: "airflow"
MOVIELENS_PASSWORD: "airflow"
MOVIELENS_HOST: "http://movielens:5000"
######### airflow env #########
services:
airflow:
build: /work/docker_images/docker_airflow
image: airflow/airflow_test:2.9.3
container_name: airflow_container
depends_on:
- movielens
ports:
- "9898:8080"
volumes:
- /work/dag_airflow/:/opt/airflow/dags/
- /var/run/docker.sock:/var/run/docker.sock
networks:
- airflow
environment: *airflow_environment
movielens:
build: /work/docker_images/movielens-api
image: manning-airflow/movielens-api
container_name: chapter10_docker_movielens_1
networks:
- airflow
ports:
- "5000:5000"
environment:
API_USER: airflow
API_PASSWORD: airflow
movielens-fetch:
build: /work/docker_images/movielens-fetch
image: manning-airflow/movielens-fetch
container_name: chapter10_docker_movielens-fetch_1
restart: "no"
movielens-rank:
build: /work/docker_images/movielens-ranking
image: manning-airflow/movielens-rank
container_name: chapter10_docker_movielens-rank_1
restart: "no"
networks:
airflow:
name: airflow
- /var/run/docker.sock:/var/run/docker.sock는 Docker 컨테이너가 호스트 시스템의 Docker 데몬과 상호작용할 수 있도록 하는 Docker의 소켓 파일을 컨테이너와 공유하는 것입니다. 이 설정을 사용하면 컨테이너 내에서 Docker 명령어를 실행하거나, Docker API를 호출하여 다른 Docker 컨테이너를 관리할 수 있습니다.
오버레이 ( /work/docker_compo/docker-compose.override.yml )
version: '3.7'
services:
movielens-fetch:
container_name: chapter10_docker_movielens-fetch_1
command: ["echo", "Skip running"]
movielens-rank:
container_name: chapter10_docker_movielens-rank_1
command: ["echo", "Skip running"]
실행 명령
docker-compose -f docker-compose.yml -f docker-compose.override.yml up -d



- 컨테이너 기반 task는 dag의 개발을 효과적으로 분리할 수 있다!!!
- 이미지 자체 라이프사이클 내에 개발할 수 있으며 dag와 별로도 이미지 테스트 가능~~
5. 쿠버네티스에서 task 실행
- 도커는 task를 단일 시스템서 실행할수 있는 편리한 접근법 제공
- but.. 여러 시스템 에서 task를 조정하고 분산하는데 도움되지 않음 ㅠㅠ
- 이런 도커 한계로 쿠버네티스(컨테이너 오케스트레이션 시스템) 을 통해 확장 가능
(5-1) 쿠버네티스
쿠버네티스는 컨테이너화된 애플리케이션의 배포, 확장 및 관리에 초점을 맞춘 오픈소스 ( 컨테이너 오케스트레이션 플랫폼 )