이제 아바타 데이터를 수집해보자( 레어, 상급 이 두종류 데이터만 수집할거임 )
레어 아바타 수집
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests
# 스파크 세션 생성
spark = SparkSession.builder \
.master("yarn") \
.appName("aabataProcessing") \
.getOrCreate()
# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"
# 파라미터 설정
params = {
"limit": 50,
"q": 'avatarSet:true,avatarRarity:레어',
"apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}
# GET 요청 보내기
response = requests.get(api_url, params=params)
# 응답 데이터 확인
if response.status_code == 200:
data = response.json()
# 데이터 처리 및 출력
#print(data)
else:
print("API 요청 실패:", response.status_code)
test1 = data.get("rows")
# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []
for item in test1:
title = item['title']
jobname = item['jobName']
price = item['price']
ava_rit = item['avatarRarity']
emblem = item['emblem']['name']
soldDate = item['soldDate']
extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
'emblem': emblem, 'soldDate': soldDate})
# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)
# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')
# 중복 레코드 제거
df = df.dropDuplicates()
# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")
# csv파일로 저장
csv_path = "donpa_aabata_rare.csv"
df.write.csv(csv_path, header=True, mode="overwrite")
# 스파크 세션 종료
spark.stop()
print("데이터를 csv파일로 저장했습니다")
spark-submit --master yarn --deploy-mode client aabata_first_rare_csv.py
상급 아바타 수집
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import to_timestamp
import requests
# 스파크 세션 생성
spark = SparkSession.builder \
.master("yarn") \
.appName("aabataProcessing") \
.getOrCreate()
# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"
# 파라미터 설정
params = {
"limit": 50,
"q": 'avatarSet:true,avatarRarity:레어',
"apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}
# GET 요청 보내기
response = requests.get(api_url, params=params)
# 응답 데이터 확인
if response.status_code == 200:
data = response.json()
# 데이터 처리 및 출력
#print(data)
else:
print("API 요청 실패:", response.status_code)
test1 = data.get("rows")
# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []
for item in test1:
title = item['title']
jobname = item['jobName']
price = item['price']
ava_rit = item['avatarRarity']
emblem = item['emblem']['name']
soldDate = item['soldDate']
extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
'emblem': emblem, 'soldDate': soldDate})
# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)
# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')
# 중복 레코드 제거
df = df.dropDuplicates()
# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")
# csv파일로 저장
csv_path = "donpa_aabata_sang.csv"
df.write.csv(csv_path, header=True, mode="overwrite")
# 스파크 세션 종료
spark.stop()
print("데이터를 csv파일로 저장했습니다")
spark-submit --master yarn --deploy-mode client aabata_first_sang_csv.py
저장된것을 확인

이제 수집해서 기존 hdsf데이터와 합치는 코드
레어아바타 합치는 코드
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests
import os
# 스파크 세션 생성
spark = SparkSession.builder \
.master("yarn") \
.appName("aabataProcessing") \
.getOrCreate()
# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"
# 파라미터 설정
params = {
"limit": 50,
"q": 'avatarSet:true,avatarRarity:레어',
"apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}
# GET 요청 보내기
response = requests.get(api_url, params=params)
# 응답 데이터 확인
if response.status_code == 200:
data = response.json()
# 데이터 처리 및 출력
#print(data)
else:
print("API 요청 실패:", response.status_code)
test1 = data.get("rows")
# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []
for item in test1:
title = item['title']
jobname = item['jobName']
price = item['price']
ava_rit = item['avatarRarity']
emblem = item['emblem']['name']
soldDate = item['soldDate']
extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
'emblem': emblem, 'soldDate': soldDate})
# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)
# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')
# 중복 레코드 제거
df = df.dropDuplicates()
# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")
try:
previous_df = spark.read.option("header", "true").csv("hdfs:///user/ubuntu/donpa_aabata_rare.csv/*.csv")
except:
previous_df = None
if previous_df is not None:
merged_df = previous_df.union(df)
# 중복 레코드 제거
merged_df = merged_df.dropDuplicates()
# soldDate 컬럼을 기준으로 정렬
merged_df = merged_df.orderBy("soldDate")
# csv파일로 저장
merged_df.write.csv("new_donpa_aabata_rare.csv", header=True, mode="overwrite")
# 기존 디렉토리 삭제
os.system("hdfs dfs -rm -r donpa_aabata_rare.csv")
# 새로운 디렉터리 이름 변경
os.system("hdfs dfs -mv new_donpa_aabata_rare.csv donpa_aabata_rare.csv")
else:
# 바로저장
df.write.csv("donpa_aabata_rare.csv", header=True, mode="overwrite")
print("성공")
spark-submit --master yarn --deploy-mode client donpa_aabata_rare_save_csv.py
상급아바타 합치는 코드
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests
import os
# 스파크 세션 생성
spark = SparkSession.builder \
.master("yarn") \
.appName("aabataProcessing") \
.getOrCreate()
# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"
# 파라미터 설정
params = {
"limit": 50,
"q": 'avatarSet:true,avatarRarity:상급',
"apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}
# GET 요청 보내기
response = requests.get(api_url, params=params)
# 응답 데이터 확인
if response.status_code == 200:
data = response.json()
# 데이터 처리 및 출력
#print(data)
else:
print("API 요청 실패:", response.status_code)
test1 = data.get("rows")
# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []
for item in test1:
title = item['title']
jobname = item['jobName']
price = item['price']
ava_rit = item['avatarRarity']
emblem = item['emblem']['name']
soldDate = item['soldDate']
extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
'emblem': emblem, 'soldDate': soldDate})
# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)
# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')
# 중복 레코드 제거
df = df.dropDuplicates()
# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")
try:
previous_df = spark.read.option("header", "true").csv("hdfs:///user/ubuntu/donpa_aabata_sang.csv/*.csv")
except:
previous_df = None
if previous_df is not None:
merged_df = previous_df.union(df)
# 중복 레코드 제거
merged_df = merged_df.dropDuplicates()
# soldDate 컬럼을 기준으로 정렬
merged_df = merged_df.orderBy("soldDate")
# csv파일로 저장
merged_df.write.csv("new_donpa_aabata_sang.csv", header=True, mode="overwrite")
# 기존 디렉토리 삭제
os.system("hdfs dfs -rm -r donpa_aabata_sang.csv")
# 새로운 디렉터리 이름 변경
os.system("hdfs dfs -mv new_donpa_aabata_sang.csv donpa_aabata_sang.csv")
else:
# 바로저장
df.write.csv("donpa_aabata_sang.csv", header=True, mode="overwrite")
print("성공")
spark-submit --master yarn --deploy-mode client donpa_aabata_sang_save_csv.py
레어 아바타csv 몽고db에 저장하는 코드
일단 몽고db에서 컬렉션을 생성하자
use donpadb.createCollection("donpa_aabata_rare") >>> 컬렉션 생성하는 코드
from pyspark.sql import SparkSession
import pymongo
# 스파크 세션 생성
spark = SparkSession.builder \
.master("yarn") \
.appName("aabata rare to mongo") \
.getOrCreate()
# HDFS에서 CSV 파일 읽어오기
csv_path = "hdfs:///user/ubuntu/donpa_aabata_rare.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)
# MongoDB에 연결
client = pymongo.MongoClient("mongodb://172.31.13.233:27017/")
db = client["donpa"]
collection = db["donpa_aabata_rare"]
# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://172.31.13.233:27017/donpa.donpa_aabata_rare").save()
print("성공")
# 스파크 세션 종료
spark.stop()
spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 donpa_rare_mongo.py
몽고db에서 저장되었는지 확인 ㄱㄱ >>> db.donpa_aabata_rare.find()

상급 아바타csv 몽고db에 저장하는 코드
일단 몽고db에서 컬렉션을 생성하자
use donpa
db.createCollection("donpa_aabata_sang") >>> 컬렉션 생성하는 코드
from pyspark.sql import SparkSession
import pymongo
# 스파크 세션 생성
spark = SparkSession.builder \
.master("yarn") \
.appName("aabata sang to mongo") \
.getOrCreate()
# HDFS에서 CSV 파일 읽어오기
csv_path = "hdfs:///user/ubuntu/donpa_aabata_sang.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)
# MongoDB에 연결
client = pymongo.MongoClient("mongodb://172.31.13.233:27017/")
db = client["donpa"]
collection = db["donpa_aabata_sang"]
# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://172.31.13.233:27017/donpa.donpa_aabata_sang").save()
print("성공")
# 스파크 세션 종료
spark.stop()
spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 donpa_sang_mongo.py
몽고db에서 저장되었는지 확인 ㄱㄱ >>> db.donpa_aabata_sang.find()

이제 Apache Airflow를 사용해 스파크 애플리케이션을 주기적으로 실행시켜보자
설치는 이전에 진행하였으니 바로 파일을 작성하자
DAG 정의 파일을 작성
1. 레어아바타 스파크 애플리케이션 실행시키는 dags파일
vim airflow_aabata_rare.py
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 9, 6),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'timezone': 'Asia/seoul',
}
dag = DAG(
'spark_aabata_rare_dag',
default_args=default_args,
schedule_interval='30 * * * *', # 정각 마다 실행
catchup=False, # 과거 작업은 실행하지 않음
)
# coin_union_csv.py 실행
spark_submit_task = BashOperator(
task_id='aabata_rare_load_csv',
bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client /home/ubuntu/donpa_data_save_csv/donpa_aabata_rare_save_csv.py',
dag=dag,
)
# fullcoin_push_mongo.py 실행
push_mongo_task = BashOperator(
task_id='aabata_rare_push_mongo',
bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 /home/ubuntu/csv_push_mongo/donpa_rare_mongo.py', # 파이썬 >실>행
dag=dag,
)
# Task 간 의존성 설정
spark_submit_task >> push_mongo_task
2. 상급아바타 스파크 애플리케이션 실행시키는 dags파일
vim airflow_aabata_sang.py
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 9, 6),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'timezone': 'Asia/seoul',
}
dag = DAG(
'spark_aabata_sang_dag',
default_args=default_args,
schedule_interval='30 * * * *', # 정각 마다 실행
catchup=False, # 과거 작업은 실행하지 않음
)
# coin_union_csv.py 실행
spark_submit_task = BashOperator(
task_id='aabata_sang_load_csv',
bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client /home/ubuntu/donpa_data_save_csv/donpa_aabata_sang_save_csv.py',
dag=dag,
)
# fullcoin_push_mongo.py 실행
push_mongo_task = BashOperator(
task_id='aabata_sang_push_mongo',
bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 /home/ubuntu/csv_push_mongo/donpa_sang_mongo.py', # 파이썬 >실>행
dag=dag,
)
# Task 간 의존성 설정
spark_submit_task >> push_mongo_task
'쿠버네티스,쿠버플로우' 카테고리의 다른 글
| 던전앤파이터 시세 예측[장고(쿠버네티스 올리기)] (0) | 2023.09.10 |
|---|---|
| 던전앤파이터 시세 예측[5] (1) | 2023.09.06 |
| 던전앤파이터 시세 예측[장고] (0) | 2023.09.01 |
| 던전앤파이터 시세 예측[3] (0) | 2023.08.27 |
| 던전앤파이터 시세 예측[2] (1) | 2023.08.24 |