데이터 엔지니어( 실습 정리 )/airflow

(4) - airflow 콘텍스트 사용

세용용용용 2024. 8. 11. 18:50

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!

 

 

1. airflow 콘텍스트 ( 예제 데이터 : 위키피디아 페이지 뷰 )

오퍼레이터에 런타임 시 변수를 삽입하여 동적으로 구성 해보자~~~~

 

(1-1) 배쉬 오퍼레이터

위키피디아 페이지 뷰 다운로드 

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

# DAG 정의
dag = DAG(
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

# Task 정의
# 위키피디아 데이터 가져오기 배쉬 오퍼레이터
get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /work/rowdata_airflow/wikipageiews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"  # 런타임 시 삽입될 변수 정의
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)
  • 중괄호는 jinja 텔플릿 문자열 : 런타임 시에 문자열 변수 및 표현식을 대체하는 템플릿 엔진
  • airflow는 날짜 시간에 Pendulum 라이브러리 사용

 

 

(1-2) 태스크 콘텍스트 출력

그럼 템플릿 어떤 인자를 사용할수 있지???????

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag=DAG(
    dag_id="chapter4_print_context",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)

def _print_context(**kwargs):
    print(kwargs)

print_context=PythonOperator(
    task_id="print_context",
    python_callable=_print_context,
    dag=dag,
)
  • task 콘텍스트 에서 사용 가능한 모든 변수 리스트 출력( 딕셔너리 형태로 출력됨 )
  • 모든 변수는 **kwargs에 저장되 print() 함수에 전달

 

 

(1-3) 파이썬 오퍼레이터

from urllib import request
import airflow from airflow
import DAG from airflow.operators.python import PythonOperator

dag=DAG(
    dag_ip="stocksense",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
)

def _get_data(execution_date):
    year, month, day, hour, *_=execution_date.timetuple()
    url=(
        "https://dumps.wilimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/"
        f"pageviews-{year}-{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    output_path="/work/rowdata_airflow/wikipageviews.gz"
    request.urlretirieve(url, output_path)

get_date=PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    dag=dag,
)


####### 모든 테스크 컨텍스트 이용하기 #######
def _get_data(**context):
    year, month, day, hour, *_=execution_date.timetuple()
    url=(
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pagebiews-{year}{month:0>2}{day:>2}-{hous:0>2}.gz"
    )
    output_path="/work/rowdata_airflow/wikipageviews.gz"
    request.urlretirieve(url, output_path)

_get_data=PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    dag=dag,
)


####### 파이썬 오퍼레이터 kwargs 인수 전달 #######
## **_ 인자는 나머지 인자는 무시한다는 의미 ##
def _get_data(year, month, day, hour, output_path, **_):
    url=(
        "https://dumps/wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}"
        f"pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)

get_data=PythonOperator(
   task_id="get_data",
   python-callable=_get_data,
   op_kwargs={
   	"year":"{{ execution_date.year }}",
    "month": "{{ execution_date.month }}",
    "day": "{{ execution_date.day }}",
    "hour": "{{ execution_date.hour }}",
    "output_path": "/work/rowdata_airflow/wikipageviews.gz",
   },
   dag=dag,
)

 

(1-4) 템플릿 인수 검사하기

airflow UI 에서 작업 실행후 Rendered Template 버튼 클릭해 템플릿 인수 값 검사 가능

ui에서는 airflow 다음 작업 스케줄까지 기다려야됨....

 

CLI로 확인하는 방법!!! (메타 스토어에 아무것도 등록되지 않아 간편하게 확인 가능)

airflow tasks render [dag id] [task id] [desired execution date]

 

 

2. 다른 시스템 연결

  1. requirements.txt
apache-airflow==2.9.3
requests==2.31.0
pandas==1.3.5
numpy==1.21.5

 

  1. dag
import airflow.utils.dates
from airflow import DAG
from urllib import request
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# DAG 정의
dag = DAG(
    dag_id="wiki_dag",
    start_date=airflow.utils.dates.days_ago(1), # 하루 전
    #schedule_interval="@hourly", # 매시간 마다
    schedule_interval=timedelta(hours=12),
    #schedule_interval="0 12 * * *",
    template_searchpath="/opt/airflow/data"
)


# Task 정의
# 위키피디아 데이터 가져오는 task ( bash 오퍼레이터 )
get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "pre_hour=$(date -d '{{ execution_date.strftime('%Y-%m-%d %H') }} -2 hours' '+%Y%m%d-%H0000') && "
        "curl -o /opt/airflow/data/pageviews-${pre_hour}.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-${pre_hour}.gz"
    ),
    dag=dag,
)


# 압축 해제하는 task ( bash 오퍼레이터 )
extract_gz=BashOperator(
    task_id="extract_gz",
    bash_command=(
        "pre_hour=$(date -d '{{ execution_date.strftime('%Y-%m-%d %H') }} -2 hours' '+%Y%m%d-%H0000') && "
        "gunzip --force /opt/airflow/data/pageviews-${pre_hour}.gz"
    ),
    dag=dag,
)


# 파일에서 특정 도메인 view 카운트 세고, insert문 파일 만드는 함수
def _fetch_pageviews(pagenames, year, month, day, hour):
    result=dict.fromkeys(pagenames, 0)
    with open(f"/opt/airflow/data/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000","r") as f:
        for line in f:
            domain_code, page_title, view_counts, _ = line.split(" ") # 데이터 구분자 공백임
            if domain_code == "en" and page_title in pagenames:
                result[page_title]+=int(view_counts)

    # 최종 결과값 쓰기
    with open(f"/opt/airflow/data/postgres_query.sql", "w") as f:
        for pagename, pageviewcount in result.items():
            f.write(
                "INSERT INTO pageview_counts VALUES ("
                f"'{pagename}', '{pageviewcount}', '{year}{month:0>2}{day:0>2}{hour:0>2}'"
                ") ON CONFLICT (pagename, pageviewcount, datetime) DO UPDATE SET pageviewcount = EXCLUDED.pageviewcount,datetime = EXCLUDED.datetime;\n"
            )
# 파일 특정 도메인 뷰 카운트 집계헤서 insert 파일 만드는 task ( python 오퍼레이터 )
fetch_pageviews=PythonOperator(
    task_id="fetch_pageviews",
    python_callable=_fetch_pageviews,
    op_kwargs={
        "pagenames": {
            "Google",
            "Amamzon",
            "Apple",
            "Microsoft",
            "Facebook",
        },
        "year":"{{ (execution_date - macros.timedelta(hours=2)).year }}",
        "month": "{{ (execution_date - macros.timedelta(hours=2)).month }}",
        "day": "{{ (execution_date - macros.timedelta(hours=2)).day }}",
        "hour": "{{ (execution_date - macros.timedelta(hours=2)).hour }}"
    },
    dag=dag,
)


def render_template(**kwargs):
    execution_date = kwargs['execution_date']
    adjusted_date = execution_date - timedelta(hours=2)
    rendered_str = adjusted_date.strftime("/opt/airflow/data/postgres_query-%Y%m%d-%H.sql")
    return rendered_str
render_task = PythonOperator(
    task_id='render_template',
    python_callable=render_template,
    provide_context=True,
    dag=dag,
)



# postgres 테이블에 데이터 insert 하는 task ( postgres 오퍼레이터 )
from airflow.providers.postgres.operators.postgres import PostgresOperator
write_to_postgres=PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="10_postgres",
    sql="postgres_query.sql",
    dag=dag,
)

# 의존성 정의
get_data >> extract_gz >> fetch_pageviews >> render_task >> write_to_postgres

 

(2-1) db 연결 설정

airflow ui에서 admin >> connections

  • 조건 삽입후 생성

 

 

(2-2) 최종 dag 실행 and db 저장 확인

 

  • 위와 같이 postgreslOperator 실행시 PostgresOperator은 postgres와 통신하기 위해 훅 이라는 불리는 것을 인스턴스화함
  • 훅은 Postgres에 쿼리를 전송하고 연결에 대한 종료 작업 처리
  • 오퍼레이터는 사용자 요청을 훅으로 전달하는 작업만 담당

 

3. 요약

  • 오퍼레이터 일부 인수 템플릿화 가능
  • 템플릿 작업은 런타임시 실행
  • 템플릿 인수 결과는 airflow tasks render로 확인 가능
  • 오퍼레이터는 훅을 통해 타 시스템과 통신 가능
  • 오퍼레이터는 무엇을 해야하는지, 훅은 작업 방법 결정