쿠버네티스,쿠버플로우

던전앤파이터 시세 예측[1]

세용용용용 2023. 8. 20. 16:10

0. 일단 기준 시간을 한국시간으로 먼저 변경해주자

1. 현재 시간과 날짜 표시

timedatectl >>> 역시나 서울로 안되있음 서울로 바꿔줘야됨

2. sudo timedatectl set-timezone "Asia/Seoul" >>> 서울로 바꿔주자

3. 변경후 timedatectl로 바뀌었는지 확인해주며됨

 

1. 데이터 수집

 

1. api가져와서 csv파일로 저장하는 스파크 코드

test.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import current_timestamp, year, month, dayofmonth, hour
import requests

# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("DataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/auction-sold"

# 파라미터 설정
params = {
    "itemName": "균열의 단편",
    "wordType": "match",
    "wordShort": "false",
    "limit": 30,
    "apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
    # 데이터 처리 및 출력
    #print(data)
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")
formatted_data = [{"soldDate": row["soldDate"], "itemName": row["itemName"], "unitPrice": row["unitPrice"]} for row in test1]

# 데이터프레임 생성
df = spark.createDataFrame(formatted_data)

# 불필요한 컬럼 제거
df = df.drop("soldDate")

# 수집한 시간 컬럼 추가
df = df.withColumn("now_time", current_timestamp())
df = df.withColumn("year", year("now_time"))
df = df.withColumn("month", month("now_time"))
df = df.withColumn("day", dayofmonth("now_time"))
df = df.withColumn("hour", hour("now_time"))

# now_time 컬럼을 timestamp 형식으로 변환
df = df.withColumn("now_time", col("now_time").cast("timestamp"))

# unitPrice 컬럼을 숫자 형식으로 변환
df = df.withColumn("unitPrice", col("unitPrice").cast(DoubleType()))

# 시간이 같은 항목들의 unitPrice를 평균으로 합치기
grouped_df = df.groupBy("now_time", "itemName","year", "month", "day", "hour").avg("unitPrice").withColumnRenamed("avg(unitPrice)", "unitPrice")

# 원하는 순서로 컬럼 선택
grouped_df = grouped_df.select("now_datetime", "itemName", "unitPrice")

# CSV 파일로 저장
csv_path = "donpa_data.csv"
grouped_df.write.csv(csv_path, header=True, mode="overwrite")

# 스파크 세션 종료
spark.stop()

print("데이터를 CSV 파일로 저장했습니다.")

 

 

2. api가져와서 기존 csv파일과 합치고 저장하는 스파크 코드

vim test1.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import requests
import os # os 모듈 임포트

# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("DataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/auction-sold"

# 파라미터 설정
params = {
    "itemName": "균열의 단편",
    "wordType": "match",
    "wordShort": "false",
    "limit": 50,
    "apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
    # 데이터 처리 및 출력
    #print(data)
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")
formatted_data = [{"soldDate": row["soldDate"], "itemName": row["itemName"], "unitPrice": row["unitPrice"]} for row in test1]

# 데이터프레임 생성
df = spark.createDataFrame(formatted_data)

# soldDate 컬럼을 timestamp 형식으로 변환
df = df.withColumn("soldDate", col("soldDate").cast("timestamp"))

# unitPrice 컬럼을 숫자 형식으로 변환
df = df.withColumn("unitPrice", col("unitPrice").cast(DoubleType()))

# soldDate가 같은 항목들의 unitPrice를 평균으로 합치기
grouped_df = df.groupBy("soldDate", "itemName").avg("unitPrice").withColumnRenamed("avg(unitPrice)", "unitPrice")

# 이전 데이터프레임 읽어오기
previous_df = spark.read.option("header", "true").csv("hdfs:///user/ubuntu/donpa_data.csv/*.csv")

# 두 데이터프레임을 수직으로 합치기
merged_df = previous_df.union(grouped_df)

# CSV 파일로 저장
grouped_df.write.csv("new_donpa.csv", header=True, mode="overwrite")

# 기존 디렉토리 삭제
os.system("hdfs dfs -rm -r donpa_data.csv")

# 새로운 디렉터리 이름 변경
os.system("hdfs dfs -mv new_donpa.csv donpa_data.csv")

print("성공")

# 스파크 세션 종료
spark.stop()

 

 

3. 1번,2번 합친코드 : 이전 데이터프레임 있으면 합쳐서 저장 없으면 그냥 저장하기

vim test3.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import current_timestamp, year, month, dayofmonth, hour
import requests
import os

# 파이썬 스크립트 내에서 외부 명령어를 실행하고 그 결과를 처리하는 기능
import subprocess

# 스파크 세션 생성
spark = SparkSession.builder \
        .master("yarn") \
        .appName("DataProcessing") \
        .getOrCreate()

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/auction-sold"

# 파라미터 설정
params = {
    "itemName": "균열의 단편",
    "wordType": "match",
    "wordShort": "false",
    "limit": 30,
    "apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)

# 응답 데이터 확인
if response.status_code == 200:
    data = response.json()
else:
    print("API 요청 실패:", response.status_code)

test1 = data.get("rows")
formatted_data = [{"soldDate": row["soldDate"], "itemName": row["itemName"], "unitPrice": row["unitPrice"]} for row in test1]

# 데이터프레임 생성
df = spark.createDataFrame(formatted_data)

# 불필요한 컬럼 제거
df = df.drop("soldDate")

# 수집한 시간 컬럼 추가
df = df.withColumn("now_time", current_timestamp())
df = df.withColumn("year", year("now_time"))
df = df.withColumn("month", month("now_time"))
df = df.withColumn("day", dayofmonth("now_time"))
df = df.withColumn("hour", hour("now_time"))

# now_time 컬럼을 timestamp 형식으로 변환
df = df.withColumn("now_time", col("now_time").cast("timestamp"))

# unitPrice 컬럼을 숫자 형식으로 변환
df = df.withColumn("unitPrice", col("unitPrice").cast(DoubleType()))

# 시간대 같은 항목들의 unitPrice를 평균으로 합치기
grouped_df = df.groupBy("now_time", "itemName","year", "month", "day", "hour").avg("unitPrice").withColumnRenamed("avg(unitPrice)", "unitPrice")

# 원하는 순서로 컬럼 선택
grouped_df = grouped_df.select("now_datetime", "itemName", "unitPrice")

try:
    previous_df = spark.read.option("header", "true").csv("hdfs:///user/ubuntu/donpa_data.csv/*.csv")
except:
    previous_df = None

if previous_df is not None:
    # now_time 컬럼을 timestamp 형식으로 변환
    previous_df = previous_df.withColumn("now_time", col("now_time").cast("timestamp"))

    # unitPrice 컬럼을 숫자 형식으로 변환
    previous_df = previous_df.withColumn("unitPrice", col("unitPrice").cast(DoubleType()))

    # 두 데이터프레임을 수직으로 합치기
    merged_df = previous_df.union(grouped_df)

    # 시간대 같은 항목들의 unitPrice를 평균으로 합치기
    merged_df = merged_df.groupBy("now_time", "itemName","year", "month", "day", "hour").avg("unitPrice").withColumnRenamed("avg(unitPrice)", "unitPrice")

    # soldDate 시간 순으로 정렬
    merged_df = merged_df.orderBy("now_time")

    # CSV 파일로 저장
    merged_df.write.csv("new_donpa.csv", header=True, mode="overwrite")

    # 기존 디렉토리 삭제
    os.system("hdfs dfs -rm -r donpa_data.csv")

    # 새로운 디렉터리 이름 변경
    os.system("hdfs dfs -mv new_donpa.csv donpa_data.csv")
else:
    # grouped_df로 바로 저장
    grouped_df.write.csv("donpa_data.csv", header=True, mode="overwrite")

print("성공")

# 스파크 세션 종료
spark.stop()

spark-submit --master yarn --deploy-mode client test3.py

 

hdfs dfs -ls donpa_data.csv >>> 데이터가 들어갔는지 확인

hdfs dfs -cat donpa_data.csv/*.csv >>> 데이터 확인

저장된것을 확인할수 있다

 

이제 해당 데이터를 5분마다 csv로 저장하고

저장된 csv파일을 mongo db에 저장을 해보자

 

 

2. 몽고db

nn1에서 pymongo 모듈을 설치 >>> pip install pymongo

 

1) --packages 옵션을 사용하여 Maven 중앙 저장소에서 MongoDB Spark Connector를 다운로드하고 관리하도록 설정

spark-submit --master yarn     --deploy-mode client     --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0     mongo.py

 

 

1. 몽고db 설치 밎 설정

먼저 aws에서 몽고db 인스턴스 생성해주고 밑에거 수행

1. apt저장소 추가
wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -

2. apt저장소 등록
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu $(lsb_release -cs)/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list

3. 패키지 목록 업데이트
sudo apt update

4. 몽고db 설치
sudo apt install -y mongodb-org

5. 몽고db 서비스 시작
sudo systemctl start mongod

6. 부팅 시 자동 시작 설정
sudo systemctl enable mongod

7. 몽고db 접속
쉘에 >>> mongo

8. 데이터베이스 생성
use donpa

9. 컬렉션 생성
db.createCollection("donpa_data")

10. db, 컬렉션 생성 확인 명령어
show dbs, show collections

 

새로운 사용자를 생성하고 해당 데이터베이스에 대한 권한을 할당

use donpa

db.createUser({
  user: "sy0218",
  pwd: "1234",
  roles: [
    { role: "readWrite", db: "donpa" }
  ]
})

 

몽고db 외부에서 접속 허용하기

1. mongod.conf 파일 편집: MongoDB 구성 파일(mongod.conf)을 열어서 bindIp 설정을 변경합니다.
sudo nano /etc/mongod.conf

2.bindIp 설정 변경: bindIp 설정을 원하는 값으로 변경합니다. 예를 들어, 모든 IP 주소로부터의 연결을 허용하려면 다음과 같이 설정
bindIp: 0.0.0.0

3. MongoDB 재시작: 설정을 변경한 후에는 MongoDB 서비스를 재시작
sudo service mongod restart

 

특정 IP 주소 허용: 몽고DB의 포트(기본적으로 27017)를 특정 IP 주소에서만 접근 가능하도록 방화벽 설정을 변경

sudo apt-get update
sudo apt-get install ufw
sudo ufw enable

 

특정 IP 주소 허용: sudo ufw allow from <특정 IP 주소> to any port 27017

특정 IP 주소 허용: 몽고DB의 포트(기본적으로 27017)를 특정 IP 주소에서만 접근 가능하도록 방화벽 설정을 변경

 

sudo ufw status >>> 설정확인

 

3. 몽고db에 스파크로 수집한 던파csv 넣어보자

일단 데이터가 들어가는지 테스트를 해보자

vim mongo_test.py

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col
import pymongo

# Spark 세션 생성
spark = SparkSession.builder \
    .master("yarn") \
    .appName("MongoDBExample") \
    .getOrCreate()

# 예제 데이터 생성
data = [Row(id=1, name="Alice", age=28),
        Row(id=2, name="Bob", age=35),
        Row(id=3, name="Charlie", age=22)]

# 데이터프레임 생성
df = spark.createDataFrame(data)

# MongoDB에 연결
client = pymongo.MongoClient("mongodb://172.31.7.197:27017/")
db = client["test"]
collection = db["test_data"]

# 데이터프레임을 MongoDB에 저장
df.write.format("mongo").mode("overwrite").option("uri", "mongodb://172.31.7.197:27017/test.test_data").save()

# 저장된 데이터 조회
mongo_data = list(collection.find({}))
for doc in mongo_data:
    print(doc)

# Spark 세션 종료
spark.stop()

스파크에서 MongoDB에 저장할 때는 추가적인 라이브러리가 필요합니다. 최신 버전의 스파크에서는 spark-mongodb 라이브러리를 사용하여 MongoDB에 데이터를 저장할 수 있다!!!

 

1) --packages 옵션을 사용하여 Maven 중앙 저장소에서 MongoDB Spark Connector를 다운로드하고 관리하도록 설정

spark-submit --master yarn     --deploy-mode client     --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0     mongo_test.py

 

명령어 실행후 들어갔는지 확인해보자(mongo 인스턴스에서)

use test >>> db사용

db.test_data.find() >>> db.test 컬렉션의 데이터 확인하기

 

 

이제 실제 던파csv파일을 몽고db에 저장해보자

vim mongo1.py

from pyspark.sql import SparkSession
import pymongo

# 스파크 세션 생성
spark = SparkSession.builder \
    .master("yarn") \
    .appName("CSV to MongoDB") \
    .getOrCreate()

# HDFS에서 CSV 파일 읽어오기
csv_path = "hdfs:///user/ubuntu/donpa_data.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)


# MongoDB에 연결
client = pymongo.MongoClient("mongodb://172.31.7.197:27017/")
db = client["donpa"]
collection = db["donpa_data"]

# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://172.31.7.197:27017/donpa.donpa_data").save()

print("성공")

# 스파크 세션 종료
spark.stop()

spark-submit --master yarn     --deploy-mode client     --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0     mongo1.py >>> 명령어 실행

 

실행후 데이터가 들어갔는지 확인( mongo db에서)use donpadb.donpa_data.find()

아주 자알 들어간것을 확인할수 있다!!!

 

 

 

4. Apache Airflow를 사용하여 10분마다 스파크 애플리케이션을 실행하는 작업을 수행해보자

- 아파치 에어플로우는 워크플로우 자동화 및 스케줄링 도구

 

1. airflow를 설치후 실행, 이후 웹ui에 접속해 작업을 정의

(1) pip install apache-airflow >>> airflow 설치

(2) echo 'export PATH="$PATH:$HOME/.local/bin"' >> ~/.bashrc : airflow 명령이 설치된 경로가 PATH 환경 변수에 추가,

추가하지 않으면 나중에 airflow 명령을 사용할때 경로를 입력해야하는 불편함 초래...

source ~/.bashrc >> 변경사항 적용

(3) airflow db init : airflow 초기화

(4) cd airflow >>> vim airflow.cfg 

9898포트로 변경해주자

timezone두 서울로 바꿔주자

false로 수정

rm -rf ./airflow.db

airflow db init

 

(5) airflow users create --username wntpdyd0218 --firstname syoung --lastname ju --role Admin --email sy02229@gmail.com --password 1234
사용자 이름 : wntpdyd0218

비밀번호 : 1234

 

(6) airflow webserver -D >>> 에어플로우 웹서버 백그라운드 실행후http://13.124.183.182:9898/ >>> 접속

 

 

2. DAG 정의 파일을 작성

cd airflow >>> mkdir dags >>> cd dags

dags디렉토리 안에 만들어야됨!!

from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'timezone': 'Asia/seoul',
}

dag = DAG(
    'spark_submit_dag1',
    default_args=default_args,
    schedule_interval='2,7,12,17,22,27,32,37,42,47,52,57 * * * *',  # 5분마다 실행
    catchup=False,  # 과거 작업은 실행하지 않음
)

# coin_union_csv.py 실행
spark_submit_task = BashOperator(
    task_id='airflow_load_csv',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client /home/ubuntu/newtest.py',
    dag=dag,
)

# fullcoin_push_mongo.py 실행
push_mongo_task = BashOperator(
    task_id='airflow_push_mongo',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0 /home/ubuntu/mongo1.py',  # 파이썬 >실행
    dag=dag,
)

# Task 간 의존성 설정
spark_submit_task >> push_mongo_task

스파크 실행 경로 확인 하는 방법

1. 환경변수 환인 : echo $SPARK_HOME

2. which 명령어 사용( 스파크 실행 파일의 실제 경로를 찾을 수도 있다 ) : spark-submit

 

chmod +x donpa_data_save_csv.py : 10분마다 실행해야되기에 실행권한 주기

chmod +x mongo1.py : 10분마다 실행해야되기에 실행권한 주기

airflow scheduler -D : scheduler 백그라운드 실행하기

 

 

airflow 웹사이트에 들어가보면(실행시켜주자)

 

최종적으로 몽고db에 데이터가 들어갔는지 확인하자

db.donpa_data.count()

 

 

 

5. 이제 주피터 노트북에서 몽고db 데이터를 가져오고 시계열 분석(LSTM) 을 수행해보자

aws에서 인스턴스 생성( 주피터 )

 

1. 시스템 업데이트

sudo apt update
sudo apt upgrade

 

2. Python 및 pip 설치

sudo apt install python3-pip

3. 주피터 설치

sudo apt  install jupyter

 

4. 주피터 설정파일 생성 및 변경

jupyter notebook --generate-config

vim /home/ubuntu/.jupyter/jupyter_notebook_config.py

 

5. 주피터 실행

jupyter notebook

 

6. 외부에서 접속

http://43.202.3.246:8888/?token=f1fc6e94d76752d42549755b5e19e07cdff73f3a1f39b90e

 

 

1. 접속을 했으면 몽고db데이터를 가져와보자

그전에 몽고db에서 sudo ufw allow from 43.202.3.246 to any port 27017 >>> 주피터 노트북 외부ip 방화벽 허용을 해주고

보안규칙 추가해주자 : 사용자지정tcp, 27017, 모든 ip(편의상)

 

일단 간단하게 연결이 되는지 부터 확인!!!

!pip install pymongo
-----------------------------------------------------------------------------------
from pymongo import MongoClient

# MongoDB 연결 설정
client = MongoClient("mongodb://3.35.175.199:27017/")
db = client["coin"]  # 데이터베이스명

try:
    # 연결 확인
    client.server_info()
    print("MongoDB 연결 성공")
except Exception as e:
    print("MongoDB 연결 실패:", e)

성공한걸 확인!!!

 

이제 데이터를 가져와 간단한 전처리를 해보자

import pandas as pd
from pymongo import MongoClient

# MongoDB 연결 설정
client = MongoClient("mongodb://3.35.175.199:27017/")
db = client["donpa"]  # 데이터베이스명
collection = db["donpa_data"]  # 컬렉션명

# 컬렉션 데이터 조회
data = collection.find()

# 데이터프레임으로 변환
df = pd.DataFrame(data)

# _id 컬럼은 제거해주자
df = df.drop(columns="_id") 

# 출력
print("데이터의 row수:",len(df))


from datetime import datetime
# object타입의 데이터 datetime타입으로 변경하는 코드
df['soldDate'] = pd.to_datetime(df['soldDate'])

# 년월일시분 정보만 추출하여 원하는 형식으로 출력, 10 단위로 변환하고 다시 분 단위로 합치기
df['soldDate'] = df['soldDate'].dt.strftime("%Y-%m-%d %H:") + (df['soldDate'].dt.minute // 10 * 10).astype(str)

# soldDate 다시 datetime타입으로 변경하는 코드
df['soldDate'] = pd.to_datetime(df['soldDate'])

# unitPrice 컬럼 float타입으로 변경
df['unitPrice'] = df['unitPrice'].astype('float')

# soldDate, itemName컬럼으로 묶어주기 unitPrice는 평균으로 ㄱㄱ
df = df.groupby(['soldDate','itemName']).mean('unitPrice')
df.reset_index(drop=False, inplace=True)

#soldDate 컬럼 인덱스로 변경
df.set_index('soldDate', inplace=True)

# 잘 수정했는지 확인
df.info()
df

잘 변경 된것을 확인할수 있다!!!

 

 

nn1에서 crontab -e

매주 월요일마다 로그 지워주기

0 0 * * 1 sudo find /usr/local/spark/logs/ -type f -delete

 

dn1, dn2, dn3 crontab -e

30분마다 캐시 지워주기

*/30 * * * * rm -rf /tmp/hadoop-ubuntu/nm-local-dir/usercache