데이터 엔지니어( 실습 정리 )/airflow
(4) - airflow 콘텍스트 사용
세용용용용
2024. 8. 11. 18:50

Apache Airflow 기반의 데이터 파이프라인 공부한 후 정리한 내용 입니다!!!
1. airflow 콘텍스트 ( 예제 데이터 : 위키피디아 페이지 뷰 )
오퍼레이터에 런타임 시 변수를 삽입하여 동적으로 구성 해보자~~~~
(1-1) 배쉬 오퍼레이터
위키피디아 페이지 뷰 다운로드
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
# DAG 정의
dag = DAG(
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
# Task 정의
# 위키피디아 데이터 가져오기 배쉬 오퍼레이터
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /work/rowdata_airflow/wikipageiews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/" # 런타임 시 삽입될 변수 정의
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
- 중괄호는 jinja 텔플릿 문자열 : 런타임 시에 문자열 변수 및 표현식을 대체하는 템플릿 엔진
- airflow는 날짜 시간에 Pendulum 라이브러리 사용
(1-2) 태스크 콘텍스트 출력
그럼 템플릿 어떤 인자를 사용할수 있지???????
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag=DAG(
dag_id="chapter4_print_context",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**kwargs):
print(kwargs)
print_context=PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag,
)
- task 콘텍스트 에서 사용 가능한 모든 변수 리스트 출력( 딕셔너리 형태로 출력됨 )
- 모든 변수는 **kwargs에 저장되 print() 함수에 전달
(1-3) 파이썬 오퍼레이터
from urllib import request
import airflow from airflow
import DAG from airflow.operators.python import PythonOperator
dag=DAG(
dag_ip="stocksense",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)
def _get_data(execution_date):
year, month, day, hour, *_=execution_date.timetuple()
url=(
"https://dumps.wilimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/"
f"pageviews-{year}-{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path="/work/rowdata_airflow/wikipageviews.gz"
request.urlretirieve(url, output_path)
get_date=PythonOperator(
task_id="get_data",
python_callable=_get_data,
dag=dag,
)
####### 모든 테스크 컨텍스트 이용하기 #######
def _get_data(**context):
year, month, day, hour, *_=execution_date.timetuple()
url=(
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pagebiews-{year}{month:0>2}{day:>2}-{hous:0>2}.gz"
)
output_path="/work/rowdata_airflow/wikipageviews.gz"
request.urlretirieve(url, output_path)
_get_data=PythonOperator(
task_id="get_data",
python_callable=_get_data,
dag=dag,
)
####### 파이썬 오퍼레이터 kwargs 인수 전달 #######
## **_ 인자는 나머지 인자는 무시한다는 의미 ##
def _get_data(year, month, day, hour, output_path, **_):
url=(
"https://dumps/wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}"
f"pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data=PythonOperator(
task_id="get_data",
python-callable=_get_data,
op_kwargs={
"year":"{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/work/rowdata_airflow/wikipageviews.gz",
},
dag=dag,
)
(1-4) 템플릿 인수 검사하기
airflow UI 에서 작업 실행후 Rendered Template 버튼 클릭해 템플릿 인수 값 검사 가능
ui에서는 airflow 다음 작업 스케줄까지 기다려야됨....
CLI로 확인하는 방법!!! (메타 스토어에 아무것도 등록되지 않아 간편하게 확인 가능)
airflow tasks render [dag id] [task id] [desired execution date]
2. 다른 시스템 연결
- requirements.txt
apache-airflow==2.9.3
requests==2.31.0
pandas==1.3.5
numpy==1.21.5
- dag
import airflow.utils.dates
from airflow import DAG
from urllib import request
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# DAG 정의
dag = DAG(
dag_id="wiki_dag",
start_date=airflow.utils.dates.days_ago(1), # 하루 전
#schedule_interval="@hourly", # 매시간 마다
schedule_interval=timedelta(hours=12),
#schedule_interval="0 12 * * *",
template_searchpath="/opt/airflow/data"
)
# Task 정의
# 위키피디아 데이터 가져오는 task ( bash 오퍼레이터 )
get_data = BashOperator(
task_id="get_data",
bash_command=(
"pre_hour=$(date -d '{{ execution_date.strftime('%Y-%m-%d %H') }} -2 hours' '+%Y%m%d-%H0000') && "
"curl -o /opt/airflow/data/pageviews-${pre_hour}.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-${pre_hour}.gz"
),
dag=dag,
)
# 압축 해제하는 task ( bash 오퍼레이터 )
extract_gz=BashOperator(
task_id="extract_gz",
bash_command=(
"pre_hour=$(date -d '{{ execution_date.strftime('%Y-%m-%d %H') }} -2 hours' '+%Y%m%d-%H0000') && "
"gunzip --force /opt/airflow/data/pageviews-${pre_hour}.gz"
),
dag=dag,
)
# 파일에서 특정 도메인 view 카운트 세고, insert문 파일 만드는 함수
def _fetch_pageviews(pagenames, year, month, day, hour):
result=dict.fromkeys(pagenames, 0)
with open(f"/opt/airflow/data/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000","r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ") # 데이터 구분자 공백임
if domain_code == "en" and page_title in pagenames:
result[page_title]+=int(view_counts)
# 최종 결과값 쓰기
with open(f"/opt/airflow/data/postgres_query.sql", "w") as f:
for pagename, pageviewcount in result.items():
f.write(
"INSERT INTO pageview_counts VALUES ("
f"'{pagename}', '{pageviewcount}', '{year}{month:0>2}{day:0>2}{hour:0>2}'"
") ON CONFLICT (pagename, pageviewcount, datetime) DO UPDATE SET pageviewcount = EXCLUDED.pageviewcount,datetime = EXCLUDED.datetime;\n"
)
# 파일 특정 도메인 뷰 카운트 집계헤서 insert 파일 만드는 task ( python 오퍼레이터 )
fetch_pageviews=PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={
"pagenames": {
"Google",
"Amamzon",
"Apple",
"Microsoft",
"Facebook",
},
"year":"{{ (execution_date - macros.timedelta(hours=2)).year }}",
"month": "{{ (execution_date - macros.timedelta(hours=2)).month }}",
"day": "{{ (execution_date - macros.timedelta(hours=2)).day }}",
"hour": "{{ (execution_date - macros.timedelta(hours=2)).hour }}"
},
dag=dag,
)
def render_template(**kwargs):
execution_date = kwargs['execution_date']
adjusted_date = execution_date - timedelta(hours=2)
rendered_str = adjusted_date.strftime("/opt/airflow/data/postgres_query-%Y%m%d-%H.sql")
return rendered_str
render_task = PythonOperator(
task_id='render_template',
python_callable=render_template,
provide_context=True,
dag=dag,
)
# postgres 테이블에 데이터 insert 하는 task ( postgres 오퍼레이터 )
from airflow.providers.postgres.operators.postgres import PostgresOperator
write_to_postgres=PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="10_postgres",
sql="postgres_query.sql",
dag=dag,
)
# 의존성 정의
get_data >> extract_gz >> fetch_pageviews >> render_task >> write_to_postgres
(2-1) db 연결 설정
airflow ui에서 admin >> connections

- 조건 삽입후 생성
(2-2) 최종 dag 실행 and db 저장 확인


- 위와 같이 postgreslOperator 실행시 PostgresOperator은 postgres와 통신하기 위해 훅 이라는 불리는 것을 인스턴스화함
- 훅은 Postgres에 쿼리를 전송하고 연결에 대한 종료 작업 처리
- 오퍼레이터는 사용자 요청을 훅으로 전달하는 작업만 담당
3. 요약
- 오퍼레이터 일부 인수 템플릿화 가능
- 템플릿 작업은 런타임시 실행
- 템플릿 인수 결과는 airflow tasks render로 확인 가능
- 오퍼레이터는 훅을 통해 타 시스템과 통신 가능
- 오퍼레이터는 무엇을 해야하는지, 훅은 작업 방법 결정