데이터 엔지니어( 실습 정리 )

hdfs던파 아이템 데이터 데이터 마트에 저장(1)

세용용용용 2023. 10. 6. 22:30

1) 최근 10개의 csv를가져와서 데이터 마트인 RDB에 넣어보자

1) 최근 10개 데이터 가져와 merge시켜 데이터프레임으로 출력시켜보자

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
# SparkSession 생성
spark = SparkSession.builder \
        .master('yarn') \
        .appName('show_df') \
        .getOrCreate()


import subprocess

# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)

# 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# 빈 데이터 프레임 생성
result_df = spark.createDataFrame([], schema)

index_num = 2
hdfs_path_list = []
for i in range(10):
    # 파일 가져오기
    file = sorted_files[index_num]
    # 파일 경로 파싱
    file_path = file.split()[-1]
    # hdfs경로 리스트에 append
    hdfs_path_list.append(file_path)
    # index_num 증감
    index_num += 1


for i in hdfs_path_list:
    df = spark.read.option('header', 'true').csv(f'hdfs://{i}')
    result_df = df.union(result_df)
    
# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
result_df.show()

spark.stop()

스파크 애플이케이션 실행 >>> spark-submit --master yarn donpa_test.py

 

2) 최근 10개의 데이터 프레임을 데이터 마트인 RDB에 넣어보자

마리아DB 설치할 인스턴스 생성해주자

# 시스템 패키지 정보를 업데이트
sudo apt update

# 마리아 db설치
sudo apt install mariadb-server

# MariaDB 서버를 시작합니다
sudo systemctl start mariadb

# 부팅 시 자동으로 시작하도록 MariaDB를 활성화
sudo systemctl enable mariadb

# MariaDB에 root 사용자로 로그인
sudo mysql -u root

# root 사용자의 비밀번호를 설정
ALTER USER 'root'@'localhost' IDENTIFIED BY '<new_password>';

# 권한을 새로 고침
FLUSH PRIVILEGES;

# MariaDB에서 데이터베이스를 생성
CREATE DATABASE donpa_datamart1;

# 해당 데이터베이스 사용
use donpa_datamart1

# 데이터베이스를 삭제
DROP DATABASE donpa_datamart1;

 

마리아DB에서 모든 통신을 허용

sudo vim /etc/mysql/mariadb.conf.d/50-server.cnf

0.0.0.0 으로 변경

마리아DB 서비스를 재시작하여 변경 사항을 적용 >>> sudo systemctl restart mariadb

마리아DB 3306포트에서 실행중인지 확인 >>> sudo ss -tuln | grep 3306

 

nn1에서 접속 테스트 해보기

sudo apt install mariadb-client-core-10.3 >>> 마리아db 클라이언트 설치



마리아db에서
mysql -u root -p >> 접속후
SELECT host, user FROM mysql.user; >> 호스트 접근권한 확인

# 172.31.41.230 접근권한 설정
GRANT ALL PRIVILEGES ON *.* TO 'root'@'172.31.41.230' IDENTIFIED BY '1234' WITH GRANT OPTION

FLUSH PRIVILEGES; >>> 변경사항 적용


nn1에서 접속해보기
mysql -h 172.31.39.99 -u root -p

성공

 

1) mariadb-java-client 설치

mkdir mariadb_jdbc

sudo apt-get install wget
wget https://downloads.mariadb.com/Connectors/java/connector-java-3.1.0/mariadb-java-client-3.1.0.jar

 

스파크 jars에 복사

sudo cp /home/ubuntu/mariadb_jdbc/mariadb-java-client-3.1.0.jar /usr/local/spark/jars/

 

잘 복사 되었는지 확인

ls /usr/local/spark/jars/mariadb-java-client-3.1.0.jar

 

2) 데이터 프레임 마리아db에 저장하는 코드

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder \
    .appName('Write to MaríaDB') \
    .getOrCreate()

# MaríaDB 연결 정보 설정
mariadb_url = "jdbc:mysql://172.31.41.219:3306/donpa_datamart1"
mariadb_properties = {
    "user": "root",
    "password": "1234",
    "driver": "org.mariadb.jdbc.Driver"
}

# 데이터프레임 생성 (이 부분은 여러분의 데이터프레임 생성 코드로 대체하세요)
# 아래는 예시로 빈 데이터프레임을 생성하는 부분입니다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

data = [(None, "Item1", 7, 5.0),
        (None, "Item2", 9, 10.0)]

df = spark.createDataFrame(data, schema)

df.show()
# MaríaDB에 데이터프레임 쓰기
df.write \
    .jdbc(url=mariadb_url, table="your_table_name", mode="overwrite", properties=mariadb_properties)

# 스파크 애플리케이션 종료
spark.stop()

스파크 애플리케이션 실행 >>> spark-submit --master yarn --deploy-mode client --jars /home/ubuntu/mariadb_jdbc/mariadb-java-client-3.1.0.jar mariadb_save_test.py

 

이슈 발생 이슈발생 ㅠㅠㅠ

mariadb-java-client 2.7.0으로 해야 호환이 되는 문제 발견!!!!

다시 깔아주고 해당 버전으로 실행하자

 

 

 

최종적으로 최근 10개 거래데이터 마리아db에 넣는 코드

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

spark = SparkSession.builder \
        .master('yarn') \
        .appName('show_df') \
        .getOrCreate()

# MaríaDB 연결 정보 설정
mariadb_url = "jdbc:mysql://172.31.41.219:3306/donpa_datamart1"
mariadb_properties = {
    "user": "root",
    "password": "1234",
    "driver": "org.mariadb.jdbc.Driver"
}

# Python 스크립트 내에서 외부 명령을 실행하고 그 결과를 가져올 수 있습
import subprocess

# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)

# 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# 빈 데이터 프레임 생성
result_df = spark.createDataFrame([], schema)

index_num = 2
hdfs_path_list = []
for i in range(10):
    # 파일 가져오기
    file = sorted_files[index_num]
    # 파일 경로 파싱
    file_path = file.split()[-1]
    # hdfs경로 리스트에 append
    hdfs_path_list.append(file_path)
    # index_num 증감
    index_num += 1


for i in hdfs_path_list:
    df = spark.read.option('header', 'true').csv(f'hdfs://{i}')
    result_df = df.union(result_df)

# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
result_df.show()

# MaríaDB에 데이터프레임 쓰기
result_df.write \
    .jdbc(url=mariadb_url, table="your_table_name", mode="overwrite", properties=mariadb_properties)

# 스파크 애플리케이션 종료
spark.stop()

spark-submit --master yarn --deploy-mode client --jars /usr/local/spark/jars/mariadb-java-client-2.7.0.jar mariadb_save_test.py >>> 스파크 애플리케이션 실행 해보자

 

마리아db에서 확인

select * from your_table_name;

좋다 좋다 드디어 해결 버전 호환 문제였음 ㅎㅎㅎ

 

이제 해당 코드를 airflow를 사용하여 워크 플로우 설정하자