
Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
- 9 장은 airflow 테스트에 대해 설명
1. airflow 테스트
- 개별 태스크는 >> 단위 테스트
- 여러 구성 요소의 동작을 검증 >> 통합 테스트
- 파이썬 에서 사용되는pytest 테스트 프레임 워크 사용
- git 에서 지원하는 CI/CD 시스템인 github Action 사용
(1-1) 모든 DAG에 대한 무결성 테스트
- 무결성 테스트 : 시스템이나 데이터가 원래 의도된 상태와 일치하는지 확인하는 과정
- dag가 정상적 구현이 되었는지 ( dag에 사이클이 포함되있는지, dag의 task id가 고유한지 등.. )
##### 테스트를 위한 pytest 설치 #####
# pytest 설치
pip install pytest
# 사용가능 옵션 확인
pytest --help
- 테스트 영역 구성 법은 프로젝트 최상단 디렉터리에 test/ 디렉터리 생성후 검사 대상을 복사하여 구성
- pytest 사용시 test_ 접두사 필요 ( pytest는 디렉터리 스캔을 통해 test_, _test 파일 검색, test 자동 검색 )
- tests 디렉터리는 모듈 동작 구조가 아니기에 테스트는 의존적이지 않으며 독립적 실행할 수 있어야됨

dag 무결성 테스트 ( /opt/airflow/tests/dags/test_dag_integrity.py )
import glob # 파일 시스템에서 패턴 매칭해 파일 경로를 찾는 데 사용되는 모듈
import importlib.util # 동적으로 모듈을 임포트하는 데 사용
import os # 운영 체제와 상호작용하기 위한 다양한 기능을 제공하는 모듈
import pytest # Python의 단위 테스트 및 기능 테스트
from airflow.models import DAG # DAG를 정의하기 위한 클래스
DAG_PATH = os.path.join(
os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
# 발견된 모든 파이썬 파일에 대해 테스트 실행하는 데커레이터
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
assert dag_objects
for dag in dag_objects:
# Test cycles
dag.test_cycle()
>>> 코드에서 두가지 검사
- assert dag_objects로 /dags 경로 모든 파이썬 파일에 dag 객체가 적어도 하나 이상 포함되있는지 ( 유효성 검사 )
- for dag in dag_objects : dag.test_cycle() 로 dag 객체 순환 주기 존재 여부
직접 검사 방법 추가
# dag 이름이 import, export로 시작하는 것 검사
assert dag.dag_id.startswith(("import", "export"))
DAG 무결성 테스트
# pytest 실행 ( 경로를 옵션으로 주고 다른 위치를 탐색하지 않도록 함 )
pytest /opt/airflow/tests/

(1-2) CI/CD 파이프라인 설정하기
- CI/CD 파이프라인 : 코드 저장소를 통해 코드 변경시 사전 정의된 스크립트를 실행하는 시스템
- 지속적 통합(CI) : 변경 코드가 코딩 표준과 테스트 조건을 준수하는지 확인 및 검증
- CI : Flake8, pylint, black 같은 코드 검사기를 통해 코드 품질을 확인하고 일련의 테스트 실행
- 지속적 배포(CD) : 자동화된 코드를 프로덕션 시스템에 자동으로 배포, 코딩 생산성 극대화
- CI/CD는 매우 다양하며 우린 GitHub Action를 실습해볼거임
- 보통 CI/CD 시스템은 파이프라인이 정의된 YAML 파일로 시작
- 성공 파이프 라인 만 마스터에 병합
GitHub Action 파이프라인 예시

- 깃허브가 push를 수신할 떄마다 전체 CI/CD 파이프라인 실행
(1-3) 단위 테스트 작성 ( 8장의 movielens_operator.py를 가지고 테스트 진행 )
Pytest 구성
- test 스크립트는 test_ 접두어가 있어야됨
- Pytest 목업을 사용해 실제 호출을 발생하지 않고 테스트 실행
- 목업 설치 : pip install pytest-mock
- Pytest 에서 목업 사용시 인수를 테스트 함수에 전달
from airflow.models import Connection
from airflow.operators.bash import BashOperator
from airflowbook.operators.movielens_operator import (
MovielensPopularityOperator,
MovielensHook,
)
def test_movielenspopularityoperator(mocker):
mock_get = mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
task = MovielensPopularityOperator(
task_id="test_id",
conn_id="testconn",
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context=None)
assert len(result) == 5
assert mock_get.call_count == 1
mock_get.assert_called_with("testconn")
def test_example():
task = BashOperator(task_id="test", bash_command="echo 'hello!'", xcom_push=True)
result = task.execute(context={})
assert result == "hello!"
(1-4) 디스크 파일로 테스트
JSON >>> CSV 형식으로 만드는 예제 에서 JsonToCsvOperator을 통해 csv로 저장파이썬 임시 저장소 관련 작업을 위한 tempfile 모듈을 통해 파일저장하지 않고 테스트
import csv
import json
from pathlib import Path
from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator
def test_json_to_csv_operator(tmp_path: Path):
print(tmp_path.as_posix())
input_path = tmp_path / "input.json"
output_path = tmp_path / "output.csv"
# Write input data to tmp path
input_data = [
{"name": "bob", "age": "41", "sex": "M"},
{"name": "alice", "age": "24", "sex": "F"},
{"name": "carol", "age": "60", "sex": "F"},
]
with open(input_path, "w") as f:
f.write(json.dumps(input_data))
# Run task
operator = JsonToCsvOperator(
task_id="test", input_path=input_path, output_path=output_path
)
operator.execute(context={})
# Read result
with open(output_path, "r") as f:
reader = csv.DictReader(f)
result = [dict(row) for row in reader]
# Assert
assert result == input_data
2. dag 밑 태스크 콘텍스트로 작업
import datetime
import pytest
from airflow.models import DAG, BaseOperator
pytest_plugins = ["helpers_namespace"]
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={"owner": "airflow", "start_date": datetime.datetime(2015, 1, 1)},
schedule_interval="@daily",
)
@pytest.helpers.register
def run_airflow_task(task: BaseOperator, dag: DAG):
dag.clear()
task.run(
start_date=dag.default_args["start_date"],
end_date=dag.default_args["start_date"],
ignore_ti_state=True,
)
- dag 객체가 모든 테스트는 test_dag() 함수 실행시 test_dag를 추가해 인스턴스 가능
(2-1) 외부 시스템 작업
api와 postgres 데이터베이스 연결 테스트시, 로컬에는 접근이 안되는 환경에서
pytest-docker-tools를 사용해 테스트 ( 테스트 도커 컨테이너 생성 )
import os
import pytest
from airflow.models import Connection
from pytest_docker_tools import fetch, container
from airflowbook.operators.movielens_operator import (
MovielensHook,
MovielensToPostgresOperator,
PostgresHook,
)
postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
image="{postgres_image.id}",
environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"},
ports={"5432/tcp": None},
volumes={
os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
"bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
}
},
)
def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
mocker.patch.object(
PostgresHook,
"get_connection",
return_value=Connection(
conn_id="postgres",
conn_type="postgres",
host="localhost",
login="testuser",
password="testpass",
port=postgres.ports["5432/tcp"][0],
),
)
task = MovielensToPostgresOperator(
task_id="test",
movielens_conn_id="movielens_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
postgres_conn_id="postgres_id",
insert_query=(
"INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
"VALUES ({0}, '{{ macros.datetime.now() }}')"
),
dag=test_dag,
)
pg_hook = PostgresHook()
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count == 0
pytest.helpers.run_airflow_task(task, test_dag)
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count > 0
3. 개발을 위한 테스트 사용
Whirl을 이용한 프로덕션 환경 에뮬레이션
- 도커 컨테이너에서 운영환경 모든 구성요소를 시뮬레이션
- 도커 컴포즈로 괸리하는것
- 단점은 도커 이미지 제공이 안됨..ㅠㅠ
DTAP 환경 생성
- 격리된 DTAP 환경 설정
- 단점은 구현이 매우 복잡,,,
'데이터 엔지니어( 실습 정리 ) > airflow' 카테고리의 다른 글
| (11) - 효율적인 dag 모범 사례 (0) | 2024.09.10 |
|---|---|
| (10) - 컨테이너에서 태스크 실행 (1) | 2024.09.02 |
| (8) - 커스텀 컴포넌트 빌드 (7) | 2024.08.28 |
| (7) - airflow 외부 시스템 통신 (1) | 2024.08.26 |
| (6) - airflow 워크플로 트리거 (0) | 2024.08.18 |