데이터 엔지니어( 실습 정리 )/airflow

(10) - 컨테이너에서 태스크 실행

세용용용용 2024. 9. 2. 21:39

 

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

  • 10장은 구축, 배포 유지 관리가 편한 airflow dag 생성에 중점을 둠
  • 이러한 문제를 고려해 도커, 쿠버네트스 사용한 컨테이너 airflow 태스크 실행

 

1. 다양한 오퍼레이터 고려사항

  • 여러 오퍼레이터 존재시 dag는 관리하기 어려움
  • 여러 오퍼레이터 사용시 오퍼레이터마다 종속성을 요구 ( 파이썬은 동일 환경에 동일한 패키지 버전을 설치할 수 없음 )

 

(1-1) 제너릭 오퍼레이터 지향하기

  • airflow 태스크 실행을 위해 하나의 제너릭 오퍼레이터 사용
  • 각각에 대한 종속성 패키지 설치후 동시에 다양한 태스크 실행할 수있는 제너릭 오퍼레이를 >>> 컨테이너를 통해 구현

 

2. 컨테이너

  • 애플리케이션의 필요한 종속성을 포함하고, 서로 다른 환경에서 쉽게 배포할수 있는 기술
  • 배포의 문제 (운영체제 차이, 종속성, 라이브러리, 하드웨어 등 ) 다양한 요소를 고려해야함
  • 이런 복잡성 관리는 가상화를 사용
  • vm의 단점은 호스트os위에 가상os를 실행하기에 무거움... ( 즉, 큰 리소스로 인한 오버헤드 발생 )

 

컨테이너 기반 가상화

  • 호스트os의 커널 레벨 기능을 사용해 애플리케이션 가상화
  • 각 애플리케이션은 자체 os를 구동할 필요없이 호스트 os에서 간단하게 활용가능

 

(2-1) 간단한 도커 컨테이너 실행

# debian:buster-slim 이미지에서 Hello, world! 출력
docker run debian:buster-slim echo Hello, world!
  • 도커 클라이언트 도커 데몬에 접속
  • 도커 데몬은 허브 레지스트리에 debian 이미지 가져옴
  • 도커 데몬은 해당 이미지를 사용해 컨테이너 생성
  • 컨테이너 내부에서 echo Hello, world 명령 실행
  • 도커 데몬은 명령에 대한 출력을 도커 클라이언트로 전송해 터미널에 표시

 

(2-2) 도커 이미지 생성

 

dockerfile을 작성해 도커 이미지 빌드

FROM python:3.8-slim # 기반으로 사용할 이미지

# 종속성 파일을 복사후 pip로 설치
COPY requirements.txt /tmp/requirements
RUN pip install -r /tmp/requirements

# 실행할 스크립트를 복사하고 실행파일로 변경
COPY fetch_weather.py /usr/local/bin/fetch-weather
RUN chmod +x /usr/local/bin/fetch-weather

ENTRYPOINT [ "/usr/local/bin/fetch_weather.py" ] # 컨테이너 시작할 떄 실행할 명령을 도커에 알림
CMD [ "--help" ] # 도커 명령에 포함할 기본 인수 알려줌


#### 이후 도커 빌드 ####
docker build --tag manning-airflow/wttr-example .
#### wttr 이미지를 사용해 컨테이너 실행
docker run manning-airflow/wttr-example:latest

 

(2-3) 볼륨을 사용해 데이터 유지

# 특정 도시에 대한 wttr 컨테이너 실행
docker run wttr-example:latest Amsterdam

컨테이너 실행

 

컨테이너 실행시 볼륨 마운트

# docker run -v [로컬 경로]:[컨테이너 경로]
docker run -v /work/rowdata_airflow/wttr:/data \
manning-airflow/wttr-example Amsterdam --output_path /data/amsterdam.json

#### 컨데이터 stop ####
docker stop <컨테이너 id>

#### 컨테이너 제거 ####
docker rm <컨테이너 id>

#### 컨테이너 확인 ( 실행, 중지 다 확인 ) ####
docker ps -a

json 마운트 확인

 

 

3. 컨테이너와 airflow

  • airflow를 통해 태스크를 컨테이너로 실행가능
  • 컨테이너 기반 오퍼레이터(DockeOperator, KubernetesPodOperator) 사용해 태스크 정의, 해다 오퍼레이터는 실행되면 컨테이너 실행을 시작후 컨테이너가 정상 완료까지 기다림
  • 평점 가져오기(HttpOperator), 영화 랭킹 지정(PythonOpertator), 결과 게시(postgres 기반 오퍼레이터) 각 태스크를 DockerOperator을 사용해 다양한 태스크를 대체하고 적절한 종속석 가진 3개의 다른 도커 컨테이너에서 명령 실행가능!!

 

왜 굳이 컨테이너 사용???

  • 컨테이너 기반 접근 방식은 각 태스트에 대해 이미지 빌드해야됨( 구현이 복잡,,,, 굳이 왜??,,, )
  • 간편한 종속성 : 해당 태스크만의 종속성을 설치가능 ( 태스크간 종속성 출동 발생 안함 )
  • 다양한 태스크 실행시 동일한 접근 방식 제공
  • 향상된 테스트 가능성

 

4. 도커에서 태스크 실행

(4-1) DockerOperator

DockerOperator을 사용해 도커를 통해 컨테이너에서 태스크 실행

# task 간단 예시
rank_movies = DockerOperator(
    task_id="rank_movies",
    image="manning-airflow/movieslens-ranking", # DockerOperator에게 사용할 이미지 알려줌
    command=[ # 컨테이너 실행할 명령어
        "rank_movies.py",
        "--input_path",
        "/data/ratings/{{ds}}.json",
        "--output_path",
        "/data/rankings/{{ds}}.csv",
    ],
    volumes=["/tmp/airflow/data:/data"] # 마운트할 볼륨 지정
)

 

오퍼레이터 동작 방식

  • airflow는 워커에 task를 스케줄해 실행지시
  • DockerOperator은 적절한 인수를 사용해 워커 시스템에서 docker run 명령 실행
  • 도커데몬은 필요한 이미지 가져옴
  • 컨테이너를 실행
  • 로컬 볼륨에 마운트
  • 컨테이너 종료되고 DockerOperator은 airflow 워커 결과를 반환

 

(4-2) task를 위한 컨테이너 이미지 생성

  • DockerOperator을 사용하여 태스크를 실행하려면 태스크에 대한 도커 이미지를 빌드 해야됨

 

평점 스크립트

더보기

#!/usr/bin/env python

from pathlib import Path
import logging
import json
import click
import requests
logging.basicConfig(level=logging.INFO)

@click.command()
@click.option(
    "--start_date",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    required=True,
    help="Start date for ratings.",
)
@click.option(
    "--end_date",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    required=True,
    help="End date for ratings.",
)
@click.option(
    "--output_path",
    type=click.Path(dir_okay=False),
    required=True,
    help="Output file path.",
)
@click.option(
    "--host", type=str, default="http://movielens:5000", help="Movielens API URL."
)
@click.option(
    "--user",
    type=str,
    envvar="MOVIELENS_USER",
    required=True,
    help="Movielens API user.",
)
@click.option(
    "--password",
    type=str,
    envvar="MOVIELENS_PASSWORD",
    required=True,
    help="Movielens API password.",
)
@click.option(
    "--batch_size", type=int, default=100, help="Batch size for retrieving records."
)

def main(start_date, end_date, output_path, host, user, password, batch_size):
    """CLI script for fetching movie ratings from the movielens API."""

    # Setup session.
    session = requests.Session()
    session.auth = (user, password)

    # Fetch ratings.
    logging.info("Fetching ratings from %s (user: %s)", host, user)

    ratings = list(
        _get_ratings(
            session=session,
            host=host,
            start_date=start_date,
            end_date=end_date,
            batch_size=batch_size,
        )
    )
    logging.info("Retrieved %d ratings!", len(ratings))

    # Write output.
    output_path = Path(output_path)

    output_dir = output_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    logging.info("Writing to %s", output_path)
    with output_path.open("w") as file_:
        json.dump(ratings, file_)


def _get_ratings(session, host, start_date, end_date, batch_size=100):
    yield from _get_with_pagination(
        session=session,
        url=host + "/ratings",
        params={
            "start_date": start_date.strftime("%Y-%m-%d"),
            "end_date": end_date.strftime("%Y-%m-%d"),
        },
        batch_size=batch_size,
    )

def _get_with_pagination(session, url, params, batch_size=100):
    """
    Fetches records using a get request with given url/params,
    taking pagination into account.
    """

    offset = 0
    total = None
    while total is None or offset < total:
        response = session.get(
            url, params={**params, **{"offset": offset, "limit": batch_size}}
        )
        response.raise_for_status()
        response_json = response.json()

        yield from response_json["result"]

        offset += batch_size
        total = response_json["total"]

if __name__ == "__main__":
    main()

click의 주요 기능

  • 간단한 CLI 구현: 복잡한 명령어 라인 인터페이스를 간단하고 직관적으로 작성할 수 있도록 도와줌
  • 명령어 및 옵션 정의: 명령어, 인수 및 옵션을 쉽게 정의
  • 자동 도움말 및 문서화
  • 데코레이터를 사용한 간편한 함수 정의: CLI 명령어를 정의할 때 함수에 데코레이터를 사용하여 간편하게 처리

 

평점 스크립트 도커파일

FROM python:3.8-slim # 베이스 이미지

RUN python -m pip install click==7.1.1 requests==2.23.0 # 종속성 설치
COPY fetch_ratings.py /usr/local/bin/fetch-ratings
RUN chmod +x /usr/local/bin/fetch-ratings

ENV PATH="/usr/local/bin:${PATH}" # 스크립트 전체경로 지정않고도 실행 가능

>>> ENV PATH : 스크립트 전체경로를 지정하지 않고 fetch-ratings 명령으로 스크립트 실행 가능!!

 

 

평점 스크립트 빌드 && run

## docker build
docker build -t manning-airflow/movielens-fetch .

## docker run
docker run --rm manning-airflow/movielens-fetch fetch-ratings --help

 

 

영화 랭킹 스크립트

더보기

#!/usr/bin/env python

from pathlib import Path
import click
import pandas as pd

# 함수가 CLI 명령어로 실행될 수 있음을 나타냄
@click.command()

# CLI 명령에 사용되는 옵션을 저으이
@click.option(
    "--input_path",
    # dir_okay : 경로가 디렉토리면 오류
    # exists : 경로가 실제 존재하는지 확인
    # readable=True : 파일 읽을수 있는지 확인
    type=click.Path(dir_okay=False, exists=True, readable=True),
    required=True,
)
@click.option(
    "--output_path",
    type=click.Path(dir_okay=False, writable=True), required=True
)
@click.option("--min_ratings", type=int, default=2)
def main(input_path, output_path, min_ratings):
    output_path = Path(output_path)

    ratings = pd.read_json(input_path)
    ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    ranking.to_csv(output_path, index=True)


def rank_movies_by_rating(ratings, min_ratings=2):
    ranking = (
        ratings.groupby("movieId")
        .agg(
            avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"),
            num_ratings=pd.NamedAgg(column="userId", aggfunc="nunique"),
        )
        .loc[lambda df: df["num_ratings"] > min_ratings]
        .sort_values(["avg_rating", "num_ratings"], ascending=False)
    )
    return ranking


if __name__ == "__main__":
    main()

도커 파일

FROM python:3.8-slim

RUN pip install click==7.1.1 pandas==0.25.2 numpy==1.19.5
COPY rank_movies.py /usr/local/bin/rank-movies
RUN chmod +x /usr/local/bin/rank-movies
ENV PATH="/usr/local/bin:${PATH}"

 

 

(4-3) 도커 task로 dag 구성

  • DockerOperator로 기존 task 대체후 DockerOperator이 올바른 인수를 사용해 태스크 실행하도록 구현
  • 또한 각 컨테이너는 task 작업 종료지 파일시스템이 존재하지 않기에 태스크 간 데이터 교환방법도 고려

 

평점, 랭킹 가져오기 컨테이너 실행( /work/dag_airflow/01_docker.py )

import datetime as dt
import os
from airflow import DAG

# 도커 오퍼레이터
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

# dag 정의
with DAG(
    dag_id="01_docker",
    description="movielens Gapi using docker",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 3),
    schedule_interval="@daily",
) as dag:
    # 컨테이너 task 정의
    # 도커 오퍼레이터 사용
    fetch_ratings=DockerOperator(
    task_id="fetch_ratings",
    image="manning-airflow/movielens-fetch",
    command=[
        "fetch-ratings", # 컨테이너에서 fetch_ratings 스크립트 실행, 밑에는 필수 인자
        "--start_date",
        "{{ds}}",
        "--end_date",
        "{{next_ds}}",
        "--output_path",
        "/data/ratings/{{ds}}.json",
        "--user",
        os.environ["MOVIELENS_USER"],
        "--password",
        os.environ["MOVIELENS_PASSWORD"],
        "--host",
        os.environ["MOVIELENS_HOST"],
        ],
        network_mode="airflow",
        mounts=[
            Mount(
                source="/work/row_data",
                target="/data",
                type="bind")
        ],
    )

    rank_movies = DockerOperator(
        task_id="rank_movies",
        image="manning-airflow/movielens-rank",
        command=[
            "rank-movies",
            "--input_path",
            "/data/ratings/{{ds}}.json",
            "--output_path",
            "/data/rankings/{{ds}}.csv",
        ],
        mounts=[
            Mount(
                source="/work/row_data",
                target="/data",
                type="bind")
        ],
    )

    fetch_ratings >> rank_movies

 

 

평점, 랭킹 컨테이너 실행 환경을 위한 도커 컴포즈 ( /work/docker_compo/docker-compose.yml )

version: '3.7'
######### airflow env #########
x-environment: &airflow_environment
    AIRFLOW__CORE__EXECUTOR: "SequentialExecutor"
    AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: "False"
    AIRFLOW__CORE__LOAD_EXAMPLES: "False"
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: "sqlite:////opt/airflow/airflow.db"
    AIRFLOW__CORE__STORE_DAG_CODE: "True"
    AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "True"
    AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
    AIRFLOW__WEBSERVER__RBAC: "False"
    AIRFLOW_CONN_MOVIELENS: "http://airflow:airflow@movielens"
    MOVIELENS_USER: "airflow"
    MOVIELENS_PASSWORD: "airflow"
    MOVIELENS_HOST: "http://movielens:5000"
######### airflow env #########
services:
  airflow:
    build: /work/docker_images/docker_airflow
    image: airflow/airflow_test:2.9.3
    container_name: airflow_container
    depends_on:
      - movielens
    ports:
      - "9898:8080"
    volumes:
      - /work/dag_airflow/:/opt/airflow/dags/
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - airflow
    environment: *airflow_environment
  movielens:
    build: /work/docker_images/movielens-api
    image: manning-airflow/movielens-api
    container_name: chapter10_docker_movielens_1
    networks:
      - airflow
    ports:
      - "5000:5000"
    environment:
      API_USER: airflow
      API_PASSWORD: airflow

  movielens-fetch:
    build: /work/docker_images/movielens-fetch
    image: manning-airflow/movielens-fetch
    container_name: chapter10_docker_movielens-fetch_1
    restart: "no"
  movielens-rank:
    build: /work/docker_images/movielens-ranking
    image: manning-airflow/movielens-rank
    container_name: chapter10_docker_movielens-rank_1
    restart: "no"

networks:
  airflow:
    name: airflow
  • /var/run/docker.sock:/var/run/docker.sock는 Docker 컨테이너가 호스트 시스템의 Docker 데몬과 상호작용할 수 있도록 하는 Docker의 소켓 파일을 컨테이너와 공유하는 것입니다. 이 설정을 사용하면 컨테이너 내에서 Docker 명령어를 실행하거나, Docker API를 호출하여 다른 Docker 컨테이너를 관리할 수 있습니다.

 

오버레이 ( /work/docker_compo/docker-compose.override.yml )

version: '3.7'
services:
  movielens-fetch:
    container_name: chapter10_docker_movielens-fetch_1
    command: ["echo", "Skip running"]
  movielens-rank:
    container_name: chapter10_docker_movielens-rank_1
    command: ["echo", "Skip running"]

 

실행 명령

docker-compose -f docker-compose.yml -f docker-compose.override.yml up -d

 

ui 에서 dag 실행결과
산출물 저장 확인
도커 이미지 작업을 위한 워크플로

 

  • 컨테이너 기반 task는 dag의 개발을 효과적으로 분리할 수 있다!!!
  • 이미지 자체 라이프사이클 내에 개발할 수 있으며 dag와 별로도 이미지 테스트 가능~~

 

5. 쿠버네티스에서 task 실행

  • 도커는 task를 단일 시스템서 실행할수 있는 편리한 접근법 제공
  • but.. 여러 시스템 에서 task를 조정하고 분산하는데 도움되지 않음 ㅠㅠ
  • 이런 도커 한계로 쿠버네티스(컨테이너 오케스트레이션 시스템) 을 통해 확장 가능

 

(5-1) 쿠버네티스

쿠버네티스는 컨테이너화된 애플리케이션의 배포, 확장 및 관리에 초점을 맞춘 오픈소스 ( 컨테이너 오케스트레이션 플랫폼 )