데이터 엔지니어( 실습 정리 )

카프카, 스파크 스트리밍 던파 데이터 수집 실습(0)

세용용용용 2023. 10. 6. 22:29

API 데이터를 주기적으로 Kafka 토픽에 전송하려면 Kafka 프로듀서를 작성하고 이를 스케줄러로 실행하여 주기적으로 API에서 데이터를 가져와야 합니다.

 

먼저 python-pip부터 설치 >>> sudo apt install python3-pip

confluent-kafka-python 라이브러리를 설치 >>> pip install confluent-kafka

 

1) Python 스크립트를 작성하여 API 데이터를 Kafka 토픽에 전송


vim donpa_test.py

from confluent_kafka import Producer
import requests
import json
import time
import pandas as pd  # pandas 라이브러리 임포트

# 이전 데이터를 추적하기 위한 변수 초기화
last_data = None

# Kafka 설정
kafka_config = {
    'bootstrap.servers': 'kafka01:9092,kafka02:9092,kafka03:9092',
    'client.id': 'api-producer'
}

# Kafka 프로듀서 생성
producer = Producer(kafka_config)

# API 엔드포인트 및 파라미터 설정
api_url = "https://api.neople.co.kr/df/auction-sold"
params = {
    "itemName": "균열의 단편",
    "wordType": "match",
    "wordShort": "false",
    "limit": 1,
    "apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}

# 주기적으로 API 호출 및 데이터를 Kafka 토픽에 전송
while True:
    try:
        response = requests.get(api_url, params=params)
        data = response.json()
        data = data.get('rows')
        data = [{"soldDate": row["soldDate"], "itemName": row["itemName"], "count": row["count"], "unitPrice": row["unitPrice"]} for row in data]
        data = data[0]
        # 이전 데이터와 중복되지 않는 경우에만 데이터를 전송
        if data != last_data:
            last_data = data
            message = json.dumps(data, ensure_ascii=False).encode('utf-8')

            # Kafka 토픽에 데이터 전송
            producer.produce("donpa1", key="api_data", value=message)
            # 전송한 메시지 확인
            producer.poll(0)

            print("API 데이터를 Kafka 토픽에 전송했습니다.")

    except Exception as e:
        print(f"오류 발생: {str(e)}")

    # 5분(300초) 대기
    time.sleep(10)

 

스크립트 실행 >>> python3 donpa_test.py

 

 

2) 토픽에 들어갔는지 확인(kafka02)

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka:9092,kafka:9092 --topic donpa1 --from-beginning

오우야쓰 성공!!!

 

3) 토픽에 데이터가 들어갈때마다 스파크 스트리밍을 통해 hdfs데이터 저장

카프카 hosts설정해주기

sudo vi /etc/hosts

127.0.0.1 localhost
172.31.41.230 nn1
172.31.43.215 nn2
172.31.44.88 dn1
172.31.33.238 dn2
172.31.37.126 dn3

172.31.46.129 kafka01
172.31.34.181 kafka02
172.31.36.66 kafka03


# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
ff02::3 ip6-allhosts

 

 

1) 스파크 토픽의 데이터를 콘솔에 띄우기

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

sc = SparkSession.builder \
     .master('yarn') \
     .appName('donpa_spark_stream') \
     .getOrCreate()

sc.sparkContext.setLogLevel('INFO')

# read stream
df =  sc.readStream.format('kafka') \
      .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9091,kafka03:9092") \
      .option("subscribe", "donpa1") \
      .option("startingOffsets", "earliest") \
      .load()

# JSON 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# value 컬럼을 JSON 파싱
df = df.selectExpr("CAST(value AS STRING) as json")
df = df.select(from_json(df.json, schema).alias("data"))


# display console
query2 = df.select("data.*").writeStream \
         .outputMode("append") \
         .format('console') \
         .foreachBatch(lambda batch_df, batch_id: batch_df.show() if batch_id != 0 else None) \
         .option('truncate', False) \
         .start()

query2.awaitTermination()

 

스파크 애플리케이션 실행

spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 donpa_spark_stream1.py

3.5.0 >> 나의 스파크 버전 명시

 

콘솔 출력 확인

 

 

2) 스파크 토픽의 데이터를 hdfs에 저장하기

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

sc = SparkSession.builder \
     .master('yarn') \
     .appName('donpa_spark_stream') \
     .getOrCreate()


sc.sparkContext.setLogLevel('INFO')

# read stream
df   = sc.readStream.format('kafka') \
      .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9091,kafka03:9092") \
      .option("subscribe", "donpa1") \
      .option("startingOffsets", "earliest") \
      .load()

# JSON 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# value 컬럼을 JSON 파싱
df = df.selectExpr("CAST(value AS STRING) as json")
df = df.select(from_json(df.json, schema).alias("data"))

# Define a function to process each batch
def process_batch(batch_df, batch_id):
    if batch_id == 0:
        # Skip processing for batch ID 0
        pass
    else:
        # Your processing logic for other batch IDs
        batch_df.write.option("header", "true") \
                .format("csv") \
                .mode("append") \
                .save("/donpa1_spark_stream")

# Write stream - HDFS using foreachBatch
query2 = df.select("data.*").writeStream \
            .foreachBatch(process_batch) \
            .outputMode("append") \
            .option("checkpointLocation", "/check") \
            .start()


query2.awaitTermination()

 

스파크 애플리케이션 실행

spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 spark_test.py

 >> 나의 스파크 버전 명시(2.12:3.5.0)

 

hdfs dfs -ls /donpa1_spark_stream >>> 저장되었는지 확인

 

 

 

3) hdfs최근 저장 파일 콘솔에 출력해보기

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder \
        .master('yarn') \
        .appName('show_df') \
        .getOrCreate()


import subprocess

# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)
# 가장 최신 파일 가져오기 (첫 번째 파일)
latest_file = sorted_files[2]
# 파일 경로 파싱
latest_file_path = latest_file.split()[-1]

# HDFS 경로를 사용하여 Spark DataFrame을 읽습니다.
latest_df = spark.read.option('header', 'true').csv(f'hdfs://{latest_file_path}')

# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
latest_df.show()

spark.stop()

spark-submit --master yarn donpa_test.py >>> 스파크 애플리케이션 실행

 

출력화면

 

4) hdfs에 저장하는 스파크 에플리케이션 백그라운드 실행

spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 /home/ubuntu/donpa_spark_stream/spark_test.py & >>> 스파크 에플리케이션 백그라운드 실행 명령어

 

nohup spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 /home/ubuntu/donpa_spark_stream/spark_test.py > /dev/null 2>&1 & >>> 표준출력 무시, 백그라운드 실행 명령어

 

가장 최근 저장된 csv출력해보기

spark-submit --master yarn /home/ubuntu/spark_save_rdb/donpa_test.py

 

1) 백그라운드 스파크 애플리케이션 종료하는법

ps aux | grep spark_test.py >>> pid확인

 

kill <PID> >>> 종료

kill 23888 >>> 종료 시키기