본문 바로가기
쿠버네티스,쿠버플로우

던전앤파이터 시세 예측[4]

by 세용용용용 2023. 9. 2.

이제 아바타 데이터를 수집해보자( 레어, 상급 이 두종류 데이터만 수집할거임 )

 

레어 아바타 수집

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests

# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("aabataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"

# 파라미터 설정
params = {
        "limit": 50,
        "q": 'avatarSet:true,avatarRarity:레어',
        "apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
    # 데이터 처리 및 출력
    #print(data)
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")

# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []

for item in test1:
    title = item['title']
    jobname = item['jobName']
    price = item['price']
    ava_rit = item['avatarRarity']
    emblem = item['emblem']['name']
    soldDate = item['soldDate']

    extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
                           'emblem': emblem, 'soldDate': soldDate})

# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)

# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')

# 중복 레코드 제거
df = df.dropDuplicates()

# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")

# csv파일로 저장
csv_path = "donpa_aabata_rare.csv"
df.write.csv(csv_path, header=True, mode="overwrite")

# 스파크 세션 종료
spark.stop()

print("데이터를 csv파일로 저장했습니다")

spark-submit --master yarn --deploy-mode client aabata_first_rare_csv.py

 

상급 아바타 수집

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import to_timestamp
import requests

# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("aabataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"

# 파라미터 설정
params = {
        "limit": 50,
        "q": 'avatarSet:true,avatarRarity:레어',
        "apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
    # 데이터 처리 및 출력
    #print(data)
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")

# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []

for item in test1:
    title = item['title']
    jobname = item['jobName']
    price = item['price']
    ava_rit = item['avatarRarity']
    emblem = item['emblem']['name']
    soldDate = item['soldDate']

    extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
                           'emblem': emblem, 'soldDate': soldDate})

# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)

# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')

# 중복 레코드 제거
df = df.dropDuplicates()

# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")

# csv파일로 저장
csv_path = "donpa_aabata_sang.csv"
df.write.csv(csv_path, header=True, mode="overwrite")

# 스파크 세션 종료
spark.stop()

print("데이터를 csv파일로 저장했습니다")

spark-submit --master yarn --deploy-mode client aabata_first_sang_csv.py

 

 

저장된것을 확인

 

 

이제 수집해서 기존 hdsf데이터와 합치는 코드

 

레어아바타 합치는 코드

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests
import os


# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("aabataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"

# 파라미터 설정
params = {
        "limit": 50,
        "q": 'avatarSet:true,avatarRarity:레어',
        "apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
    # 데이터 처리 및 출력
    #print(data)
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")

# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []

for item in test1:
    title = item['title']
    jobname = item['jobName']
    price = item['price']
    ava_rit = item['avatarRarity']
    emblem = item['emblem']['name']
    soldDate = item['soldDate']

    extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
                           'emblem': emblem, 'soldDate': soldDate})

# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)

# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')

# 중복 레코드 제거
df = df.dropDuplicates()

# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")

try:
    previous_df = spark.read.option("header", "true").csv("hdfs:///user/ubuntu/donpa_aabata_rare.csv/*.csv")
except:
    previous_df = None

if previous_df is not None:
	merged_df = previous_df.union(df)
    
    # 중복 레코드 제거
    merged_df = merged_df.dropDuplicates()
    
    # soldDate 컬럼을 기준으로 정렬
    merged_df = merged_df.orderBy("soldDate")
	
    # csv파일로 저장
    merged_df.write.csv("new_donpa_aabata_rare.csv", header=True, mode="overwrite")
    
    # 기존 디렉토리 삭제
    os.system("hdfs dfs -rm -r donpa_aabata_rare.csv")
    
    # 새로운 디렉터리 이름 변경
    os.system("hdfs dfs -mv new_donpa_aabata_rare.csv donpa_aabata_rare.csv")

else:
	# 바로저장
    df.write.csv("donpa_aabata_rare.csv", header=True, mode="overwrite")
    
print("성공")

spark-submit --master yarn --deploy-mode client donpa_aabata_rare_save_csv.py

 

 

상급아바타 합치는 코드

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests
import os


# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("aabataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/avatar-market/sold"

# 파라미터 설정
params = {
        "limit": 50,
        "q": 'avatarSet:true,avatarRarity:상급',
        "apikey": "Q3A8yWb7Un1oXuM7uC5nIRV6zzGL23YP"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
    # 데이터 처리 및 출력
    #print(data)
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")

# 필요한 필드만 추출하여 저장할 리스트
extracted_data = []

for item in test1:
    title = item['title']
    jobname = item['jobName']
    price = item['price']
    ava_rit = item['avatarRarity']
    emblem = item['emblem']['name']
    soldDate = item['soldDate']

    extracted_data.append({'title': title, 'price': price, 'ava_rit': ava_rit, 'jobname': jobname,
                           'emblem': emblem, 'soldDate': soldDate})

# 데이터 프레임 생성
df = spark.createDataFrame(extracted_data)

# 원하는 순서로 컬럼 선택
df = df.select('soldDate', 'title', 'ava_rit', 'jobname', 'emblem', 'price')

# 중복 레코드 제거
df = df.dropDuplicates()

# soldDate 컬럼을 기준으로 정렬
df = df.orderBy("soldDate")

try:
    previous_df = spark.read.option("header", "true").csv("hdfs:///user/ubuntu/donpa_aabata_sang.csv/*.csv")
except:
    previous_df = None

if previous_df is not None:
    merged_df = previous_df.union(df)

    # 중복 레코드 제거
    merged_df = merged_df.dropDuplicates()

    # soldDate 컬럼을 기준으로 정렬
    merged_df = merged_df.orderBy("soldDate")

    # csv파일로 저장
    merged_df.write.csv("new_donpa_aabata_sang.csv", header=True, mode="overwrite")

    # 기존 디렉토리 삭제
    os.system("hdfs dfs -rm -r donpa_aabata_sang.csv")

    # 새로운 디렉터리 이름 변경
    os.system("hdfs dfs -mv new_donpa_aabata_sang.csv donpa_aabata_sang.csv")

else:
    # 바로저장
    df.write.csv("donpa_aabata_sang.csv", header=True, mode="overwrite")

print("성공")

spark-submit --master yarn --deploy-mode client donpa_aabata_sang_save_csv.py

 

 

레어 아바타csv 몽고db에 저장하는 코드

일단 몽고db에서 컬렉션을 생성하자

use donpadb.createCollection("donpa_aabata_rare") >>> 컬렉션 생성하는 코드

from pyspark.sql import SparkSession
import pymongo

# 스파크 세션 생성
spark = SparkSession.builder \
    .master("yarn") \
    .appName("aabata rare to mongo") \
    .getOrCreate()

# HDFS에서 CSV 파일 읽어오기
csv_path = "hdfs:///user/ubuntu/donpa_aabata_rare.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)

# MongoDB에 연결
client = pymongo.MongoClient("mongodb://172.31.13.233:27017/")
db = client["donpa"]
collection = db["donpa_aabata_rare"]

# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://172.31.13.233:27017/donpa.donpa_aabata_rare").save()

print("성공")

# 스파크 세션 종료
spark.stop()

spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 donpa_rare_mongo.py

 

몽고db에서 저장되었는지 확인 ㄱㄱ >>> db.donpa_aabata_rare.find()

 

 

상급 아바타csv 몽고db에 저장하는 코드

일단 몽고db에서 컬렉션을 생성하자

use donpa

db.createCollection("donpa_aabata_sang") >>> 컬렉션 생성하는 코드

from pyspark.sql import SparkSession
import pymongo

# 스파크 세션 생성
spark = SparkSession.builder \
    .master("yarn") \
    .appName("aabata sang to mongo") \
    .getOrCreate()

# HDFS에서 CSV 파일 읽어오기
csv_path = "hdfs:///user/ubuntu/donpa_aabata_sang.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)

# MongoDB에 연결
client = pymongo.MongoClient("mongodb://172.31.13.233:27017/")
db = client["donpa"]
collection = db["donpa_aabata_sang"]

# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://172.31.13.233:27017/donpa.donpa_aabata_sang").save()

print("성공")

# 스파크 세션 종료
spark.stop()

spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 donpa_sang_mongo.py

 

몽고db에서 저장되었는지 확인 ㄱㄱ >>> db.donpa_aabata_sang.find()

 

 

이제 Apache Airflow를 사용해 스파크 애플리케이션을 주기적으로 실행시켜보자

설치는 이전에 진행하였으니 바로 파일을 작성하자

 

DAG 정의 파일을 작성

1. 레어아바타 스파크 애플리케이션 실행시키는 dags파일

 

vim airflow_aabata_rare.py

from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 9, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'timezone': 'Asia/seoul',
}

dag = DAG(
    'spark_aabata_rare_dag',
    default_args=default_args,
    schedule_interval='30 * * * *',  # 정각 마다 실행
    catchup=False,  # 과거 작업은 실행하지 않음
)

# coin_union_csv.py 실행
spark_submit_task = BashOperator(
    task_id='aabata_rare_load_csv',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client /home/ubuntu/donpa_data_save_csv/donpa_aabata_rare_save_csv.py',
    dag=dag,
)

# fullcoin_push_mongo.py 실행
push_mongo_task = BashOperator(
    task_id='aabata_rare_push_mongo',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 /home/ubuntu/csv_push_mongo/donpa_rare_mongo.py',  # 파이썬 >실>행
    dag=dag,
)


# Task 간 의존성 설정
spark_submit_task >> push_mongo_task

 

 

2. 상급아바타 스파크 애플리케이션 실행시키는 dags파일

 

vim airflow_aabata_sang.py

from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 9, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'timezone': 'Asia/seoul',
}

dag = DAG(
    'spark_aabata_sang_dag',
    default_args=default_args,
    schedule_interval='30 * * * *',  # 정각 마다 실행
    catchup=False,  # 과거 작업은 실행하지 않음
)

# coin_union_csv.py 실행
spark_submit_task = BashOperator(
    task_id='aabata_sang_load_csv',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client /home/ubuntu/donpa_data_save_csv/donpa_aabata_sang_save_csv.py',
    dag=dag,
)

# fullcoin_push_mongo.py 실행
push_mongo_task = BashOperator(
    task_id='aabata_sang_push_mongo',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 /home/ubuntu/csv_push_mongo/donpa_sang_mongo.py',  # 파이썬 >실>행
    dag=dag,
)


# Task 간 의존성 설정
spark_submit_task >> push_mongo_task