본문 바로가기
데이터 엔지니어( 실습 정리 )/airflow

(9) - 테스트 하기

by 세용용용용 2024. 8. 28.

 

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

  • 9 장은 airflow 테스트에 대해 설명

 

1. airflow 테스트

  • 개별 태스크는 >> 단위 테스트
  • 여러 구성 요소의 동작을 검증 >> 통합 테스트
  • 파이썬 에서 사용되는pytest 테스트 프레임 워크 사용
  • git 에서 지원하는 CI/CD 시스템인 github Action 사용

 

(1-1) 모든 DAG에 대한 무결성 테스트

  • 무결성 테스트 : 시스템이나 데이터가 원래 의도된 상태와 일치하는지 확인하는 과정
  • dag가 정상적 구현이 되었는지 ( dag에 사이클이 포함되있는지, dag의 task id가 고유한지 등.. )
##### 테스트를 위한 pytest 설치 #####
# pytest 설치
pip install pytest

# 사용가능 옵션 확인
pytest --help

 

  • 테스트 영역 구성 법은 프로젝트 최상단 디렉터리에 test/ 디렉터리 생성후 검사 대상을 복사하여 구성
  • pytest 사용시 test_ 접두사 필요 ( pytest는 디렉터리 스캔을 통해 test_, _test 파일 검색, test 자동 검색 )
  • tests 디렉터리는 모듈 동작 구조가 아니기에 테스트는 의존적이지 않으며 독립적 실행할 수 있어야됨

 

 

dag 무결성 테스트 ( /opt/airflow/tests/dags/test_dag_integrity.py )

import glob # 파일 시스템에서 패턴 매칭해 파일 경로를 찾는 데 사용되는 모듈
import importlib.util # 동적으로 모듈을 임포트하는 데 사용
import os # 운영 체제와 상호작용하기 위한 다양한 기능을 제공하는 모듈

import pytest # Python의 단위 테스트 및 기능 테스트
from airflow.models import DAG # DAG를 정의하기 위한 클래스

DAG_PATH = os.path.join(
    os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)

# 발견된 모든 파이썬 파일에 대해 테스트 실행하는 데커레이터
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)
    
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    assert dag_objects

    for dag in dag_objects:
        # Test cycles
        dag.test_cycle()

>>> 코드에서 두가지 검사

  • assert dag_objects로 /dags 경로 모든 파이썬 파일에 dag 객체가 적어도 하나 이상 포함되있는지 ( 유효성 검사 )
  • for dag in dag_objects : dag.test_cycle() 로 dag 객체 순환 주기 존재 여부

직접 검사 방법 추가

# dag 이름이 import, export로 시작하는 것 검사
assert dag.dag_id.startswith(("import", "export"))

 

DAG 무결성 테스트

# pytest 실행 ( 경로를 옵션으로 주고 다른 위치를 탐색하지 않도록 함 )
pytest /opt/airflow/tests/

테스트 결과 확인

 

 

(1-2) CI/CD 파이프라인 설정하기

  • CI/CD 파이프라인 : 코드 저장소를 통해 코드 변경시 사전 정의된 스크립트를 실행하는 시스템
  • 지속적 통합(CI) : 변경 코드가 코딩 표준과 테스트 조건을 준수하는지 확인 및 검증
  • CI : Flake8, pylint, black 같은 코드 검사기를 통해 코드 품질을 확인하고 일련의 테스트 실행
  • 지속적 배포(CD) : 자동화된 코드를 프로덕션 시스템에 자동으로 배포, 코딩 생산성 극대화
  • CI/CD는 매우 다양하며 우린 GitHub Action를 실습해볼거임
  • 보통 CI/CD 시스템은 파이프라인이 정의된 YAML 파일로 시작
  • 성공 파이프 라인 만 마스터에 병합

GitHub Action 파이프라인 예시

  • 깃허브가 push를 수신할 떄마다 전체 CI/CD 파이프라인 실행

 

(1-3) 단위 테스트 작성 ( 8장의 movielens_operator.py를 가지고 테스트 진행 )

Pytest 구성

  • test 스크립트는 test_ 접두어가 있어야됨
  • Pytest 목업을 사용해 실제 호출을 발생하지 않고 테스트 실행
  • 목업 설치 : pip install pytest-mock
  • Pytest 에서 목업 사용시 인수를 테스트 함수에 전달

 

from airflow.models import Connection
from airflow.operators.bash import BashOperator

from airflowbook.operators.movielens_operator import (
    MovielensPopularityOperator,
    MovielensHook,
)


def test_movielenspopularityoperator(mocker):
    mock_get = mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )
    task = MovielensPopularityOperator(
        task_id="test_id",
        conn_id="testconn",
        start_date="2015-01-01",
        end_date="2015-01-03",
        top_n=5,
    )
    result = task.execute(context=None)
    assert len(result) == 5
    assert mock_get.call_count == 1
    mock_get.assert_called_with("testconn")


def test_example():
    task = BashOperator(task_id="test", bash_command="echo 'hello!'", xcom_push=True)
    result = task.execute(context={})
    assert result == "hello!"

 

 

(1-4) 디스크 파일로 테스트

JSON >>> CSV 형식으로 만드는 예제 에서 JsonToCsvOperator을 통해 csv로 저장파이썬 임시 저장소 관련 작업을 위한 tempfile 모듈을 통해 파일저장하지 않고 테스트

import csv
import json
from pathlib import Path

from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator


def test_json_to_csv_operator(tmp_path: Path):
    print(tmp_path.as_posix())

    input_path = tmp_path / "input.json"
    output_path = tmp_path / "output.csv"

    # Write input data to tmp path
    input_data = [
        {"name": "bob", "age": "41", "sex": "M"},
        {"name": "alice", "age": "24", "sex": "F"},
        {"name": "carol", "age": "60", "sex": "F"},
    ]
    with open(input_path, "w") as f:
        f.write(json.dumps(input_data))

    # Run task
    operator = JsonToCsvOperator(
        task_id="test", input_path=input_path, output_path=output_path
    )
    operator.execute(context={})

    # Read result
    with open(output_path, "r") as f:
        reader = csv.DictReader(f)
        result = [dict(row) for row in reader]

    # Assert
    assert result == input_data

 

 

2. dag 밑 태스크 콘텍스트로 작업

import datetime

import pytest
from airflow.models import DAG, BaseOperator

pytest_plugins = ["helpers_namespace"]


@pytest.fixture
def test_dag():
    return DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2015, 1, 1)},
        schedule_interval="@daily",
    )


@pytest.helpers.register
def run_airflow_task(task: BaseOperator, dag: DAG):
    dag.clear()
    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
        ignore_ti_state=True,
    )

 

  • dag 객체가 모든 테스트는 test_dag() 함수 실행시 test_dag를 추가해 인스턴스 가능

 

(2-1) 외부 시스템 작업

api와 postgres 데이터베이스 연결 테스트시, 로컬에는 접근이 안되는 환경에서

pytest-docker-tools를 사용해 테스트 ( 테스트 도커 컨테이너 생성 )

import os

import pytest
from airflow.models import Connection
from pytest_docker_tools import fetch, container

from airflowbook.operators.movielens_operator import (
    MovielensHook,
    MovielensToPostgresOperator,
    PostgresHook,
)

postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
    image="{postgres_image.id}",
    environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"},
    ports={"5432/tcp": None},
    volumes={
        os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)


def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )
    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            conn_id="postgres",
            conn_type="postgres",
            host="localhost",
            login="testuser",
            password="testpass",
            port=postgres.ports["5432/tcp"][0],
        ),
    )

    task = MovielensToPostgresOperator(
        task_id="test",
        movielens_conn_id="movielens_id",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        postgres_conn_id="postgres_id",
        insert_query=(
            "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
            "VALUES ({0}, '{{ macros.datetime.now() }}')"
        ),
        dag=test_dag,
    )

    pg_hook = PostgresHook()

    row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
    assert row_count == 0

    pytest.helpers.run_airflow_task(task, test_dag)

    row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
    assert row_count > 0

 

 

3. 개발을 위한 테스트 사용

Whirl을 이용한 프로덕션 환경 에뮬레이션

- 도커 컨테이너에서 운영환경 모든 구성요소를 시뮬레이션

- 도커 컴포즈로 괸리하는것

- 단점은 도커 이미지 제공이 안됨..ㅠㅠ

 

DTAP 환경 생성

- 격리된 DTAP 환경 설정

- 단점은 구현이 매우 복잡,,,