본문 바로가기
카테고리 없음

airflow활용 데이터 마트 최신화(2)

by 세용용용용 2023. 10. 6.

1) 주기적으로 최근10개의 CSV가져와 데이터 마트인 RDB를 최신화 시켜주자(airflow 워크플로우 사용)

- 아파치 에어플로우는 워크플로우 자동화 및 스케줄링 도구

 

python3 -m venv airflow-env # 새 가상 환경 생성

source airflow-env/bin/activate # 새 가상 환경 활성화

 

1. airflow를 설치후 실행, 이후 웹ui에 접속해 작업을 정의

(1) pip install apache-airflow >>> airflow 설치

버전 호환 맞춰주기

pip install werkzeug==2.2.2

pip install cachelib==0.9.0

 

(2) echo 'export PATH="$PATH:$HOME/.local/bin"' >> ~/.bashrc : airflow 명령이 설치된 경로가 PATH 환경 변수에 추가,

추가하지 않으면 나중에 airflow 명령을 사용할때 경로를 입력해야하는 불편함 초래...

source ~/.bashrc >> 변경사항 적용

 

(3) airflow db init : airflow 초기화

 

(4) cd airflow >>> vim airflow.cfg 

9898포트로 변경해주자

timezone두 서울로 바꿔주자

false로 수정

rm -rf ./airflow.db

airflow db init

 

(5) airflow users create --username wntpdyd0218 --firstname syoung --lastname ju --role Admin --email sy02229@gmail.com --password 1234
사용자 이름 : wntpdyd0218

비밀번호 : 1234

 

(6) airflow webserver -D >>> 에어플로우 웹서버 백그라운드 실행후

http://13.208.214.61:9898 접속

 

2. DAG 정의 파일을 작성

(1) vi airflow.cfg 

dag디렉토리명 위치 확인후 해당위치에 mkdir dags디렉토리 생성해주기

 

cd dags >>> 해당 위치에서 파일 작성!!

 

donpa_item01.py

from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 5),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'timezone': 'Asia/seoul',
}

dag = DAG(
    'spark_submit_dag1',
    default_args=default_args,
    schedule_interval='*/30 * * * *',  # 5분마다 실행
    catchup=False,  # 과거 작업은 실행하지 않음
)

#  실행
mariadb_submit_task = BashOperator(
    task_id='mariadb_update',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --jars /usr/local/spark/jars/mariadb-java-client-2.7.0.jar /home/ubuntu/dbmart_save/mariadb_save_donpa01.py',
    dag=dag,
)


# Task 간 의존성 설정
# mariadb_submit_task >> mariadb_submin_tast1
~

 

airflow scheduler -D : scheduler 백그라운드 실행하기

airflow 웹사이트에 들어가보면(실행시켜주자)

 

데이터 마트인 mariadb가 최신화 되었는지 확인

성공성공

 

이제 30분마다 최신의 던파 아이템 거래 데이터로 데이터 마트가 최신화 될것이다!!!

최종적으로 bi툴을 선정해 30분마다 거래 데이터를 대시보드로 구현해보자