카프카, 스파크 스트리밍 던파 데이터 수집 실습(0)
API 데이터를 주기적으로 Kafka 토픽에 전송하려면 Kafka 프로듀서를 작성하고 이를 스케줄러로 실행하여 주기적으로 API에서 데이터를 가져와야 합니다.
먼저 python-pip부터 설치 >>> sudo apt install python3-pip
confluent-kafka-python 라이브러리를 설치 >>> pip install confluent-kafka
1) Python 스크립트를 작성하여 API 데이터를 Kafka 토픽에 전송
vim donpa_test.py
from confluent_kafka import Producer
import requests
import json
import time
import pandas as pd # pandas 라이브러리 임포트
# 이전 데이터를 추적하기 위한 변수 초기화
last_data = None
# Kafka 설정
kafka_config = {
'bootstrap.servers': 'kafka01:9092,kafka02:9092,kafka03:9092',
'client.id': 'api-producer'
}
# Kafka 프로듀서 생성
producer = Producer(kafka_config)
# API 엔드포인트 및 파라미터 설정
api_url = "https://api.neople.co.kr/df/auction-sold"
params = {
"itemName": "균열의 단편",
"wordType": "match",
"wordShort": "false",
"limit": 1,
"apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}
# 주기적으로 API 호출 및 데이터를 Kafka 토픽에 전송
while True:
try:
response = requests.get(api_url, params=params)
data = response.json()
data = data.get('rows')
data = [{"soldDate": row["soldDate"], "itemName": row["itemName"], "count": row["count"], "unitPrice": row["unitPrice"]} for row in data]
data = data[0]
# 이전 데이터와 중복되지 않는 경우에만 데이터를 전송
if data != last_data:
last_data = data
message = json.dumps(data, ensure_ascii=False).encode('utf-8')
# Kafka 토픽에 데이터 전송
producer.produce("donpa1", key="api_data", value=message)
# 전송한 메시지 확인
producer.poll(0)
print("API 데이터를 Kafka 토픽에 전송했습니다.")
except Exception as e:
print(f"오류 발생: {str(e)}")
# 5분(300초) 대기
time.sleep(10)
스크립트 실행 >>> python3 donpa_test.py
2) 토픽에 들어갔는지 확인(kafka02)
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka:9092,kafka:9092 --topic donpa1 --from-beginning


오우야쓰 성공!!!
3) 토픽에 데이터가 들어갈때마다 스파크 스트리밍을 통해 hdfs데이터 저장
카프카 hosts설정해주기
sudo vi /etc/hosts
127.0.0.1 localhost
172.31.41.230 nn1
172.31.43.215 nn2
172.31.44.88 dn1
172.31.33.238 dn2
172.31.37.126 dn3
172.31.46.129 kafka01
172.31.34.181 kafka02
172.31.36.66 kafka03
# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
ff02::3 ip6-allhosts
1) 스파크 토픽의 데이터를 콘솔에 띄우기
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
sc = SparkSession.builder \
.master('yarn') \
.appName('donpa_spark_stream') \
.getOrCreate()
sc.sparkContext.setLogLevel('INFO')
# read stream
df = sc.readStream.format('kafka') \
.option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9091,kafka03:9092") \
.option("subscribe", "donpa1") \
.option("startingOffsets", "earliest") \
.load()
# JSON 스키마 정의
schema = StructType([
StructField("soldDate", TimestampType(), True),
StructField("itemName", StringType(), True),
StructField("count", IntegerType(), True),
StructField("unitPrice", DoubleType(), True)
])
# value 컬럼을 JSON 파싱
df = df.selectExpr("CAST(value AS STRING) as json")
df = df.select(from_json(df.json, schema).alias("data"))
# display console
query2 = df.select("data.*").writeStream \
.outputMode("append") \
.format('console') \
.foreachBatch(lambda batch_df, batch_id: batch_df.show() if batch_id != 0 else None) \
.option('truncate', False) \
.start()
query2.awaitTermination()
스파크 애플리케이션 실행
spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 donpa_spark_stream1.py
3.5.0 >> 나의 스파크 버전 명시
콘솔 출력 확인

2) 스파크 토픽의 데이터를 hdfs에 저장하기
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
sc = SparkSession.builder \
.master('yarn') \
.appName('donpa_spark_stream') \
.getOrCreate()
sc.sparkContext.setLogLevel('INFO')
# read stream
df = sc.readStream.format('kafka') \
.option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9091,kafka03:9092") \
.option("subscribe", "donpa1") \
.option("startingOffsets", "earliest") \
.load()
# JSON 스키마 정의
schema = StructType([
StructField("soldDate", TimestampType(), True),
StructField("itemName", StringType(), True),
StructField("count", IntegerType(), True),
StructField("unitPrice", DoubleType(), True)
])
# value 컬럼을 JSON 파싱
df = df.selectExpr("CAST(value AS STRING) as json")
df = df.select(from_json(df.json, schema).alias("data"))
# Define a function to process each batch
def process_batch(batch_df, batch_id):
if batch_id == 0:
# Skip processing for batch ID 0
pass
else:
# Your processing logic for other batch IDs
batch_df.write.option("header", "true") \
.format("csv") \
.mode("append") \
.save("/donpa1_spark_stream")
# Write stream - HDFS using foreachBatch
query2 = df.select("data.*").writeStream \
.foreachBatch(process_batch) \
.outputMode("append") \
.option("checkpointLocation", "/check") \
.start()
query2.awaitTermination()
스파크 애플리케이션 실행
spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 spark_test.py
>> 나의 스파크 버전 명시(2.12:3.5.0)
hdfs dfs -ls /donpa1_spark_stream >>> 저장되었는지 확인

3) hdfs최근 저장 파일 콘솔에 출력해보기
from pyspark.sql import SparkSession
# SparkSession 생성
spark = SparkSession.builder \
.master('yarn') \
.appName('show_df') \
.getOrCreate()
import subprocess
# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)
# 가장 최신 파일 가져오기 (첫 번째 파일)
latest_file = sorted_files[2]
# 파일 경로 파싱
latest_file_path = latest_file.split()[-1]
# HDFS 경로를 사용하여 Spark DataFrame을 읽습니다.
latest_df = spark.read.option('header', 'true').csv(f'hdfs://{latest_file_path}')
# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
latest_df.show()
spark.stop()
spark-submit --master yarn donpa_test.py >>> 스파크 애플리케이션 실행
출력화면

4) hdfs에 저장하는 스파크 에플리케이션 백그라운드 실행
spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 /home/ubuntu/donpa_spark_stream/spark_test.py & >>> 스파크 에플리케이션 백그라운드 실행 명령어
nohup spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 /home/ubuntu/donpa_spark_stream/spark_test.py > /dev/null 2>&1 & >>> 표준출력 무시, 백그라운드 실행 명령어
가장 최근 저장된 csv출력해보기
spark-submit --master yarn /home/ubuntu/spark_save_rdb/donpa_test.py

1) 백그라운드 스파크 애플리케이션 종료하는법
ps aux | grep spark_test.py >>> pid확인

kill <PID> >>> 종료
kill 23888 >>> 종료 시키기