본문 바로가기
쿠버네티스,쿠버플로우

던파 쿠버플로우, 쿠버네티스(3)

by 세용용용용 2023. 9. 18.

진짜 다왔다 상급아바타 파이프 라인 만들어보자

1. data_load


main.py

import pandas as pd
import pymysql
from pymongo import MongoClient
from sklearn.preprocessing import StandardScaler
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib
from sklearn.preprocessing import OneHotEncoder
import xgboost as xgb
import argparse

import pymysql
# 연결 정보 설정
host = "10.233.18.183"
port = 3306
database = "donpa_item"
user = "root"
password = "1234"

# 연결
connection = pymysql.connect(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database
)

# SQL 쿼리
query = "SELECT * FROM goldprice"
# 데이터프레임으로 변환
date_df = pd.read_sql(query, connection)
# 연결 종료
connection.close


# MongoDB 연결 설정
client = MongoClient("mongodb://3.38.178.84:27017/")
db = client["donpa"]  # 데이터베이스명
collection = db["donpa_aabata_sang"]  # 컬렉션명

# 컬렉션 데이터 조회
data = collection.find()

# 데이터프레임으로 변환
df = pd.DataFrame(data)

# _id 컬럼은 제거해주자
df = df.drop(columns="_id") 

df['price'] = df['price'].astype('float')

df['soldDate'] = df['soldDate'].str.replace('-','')

# sell, buy컬럼 붙여주자!!
df = df.merge(date_df, left_on='soldDate', right_on='date', how='left')

# sell, buy값이 널값이면 가장 최근의 sell, buy값으로 대체해주자
null_check = df['sell'].isnull().sum()
if null_check != 0:
    # date_df의 맨 마지막 값을 가져와서 NaN 값을 채우기
    last_date_df_row = date_df.iloc[-1]
    df['sell'].fillna(last_date_df_row['sell'], inplace=True)
    df['buy'].fillna(last_date_df_row['buy'], inplace=True)

# date컬럼 삭제해주자
df.drop(columns = 'date', inplace=True)
df['soldDate'] = pd.to_datetime(df['soldDate'])

# 년 컬럼 추가
df['year'] = df['soldDate'].dt.year

# 월 컬럼 추가
df['month'] = df['soldDate'].dt.month

# 일 컬럼 추가
df['day'] = df['soldDate'].dt.day

# 요일 컬럼 추가해주자
df['day_name'] = df['soldDate'].dt.day_name()

# 필요 없는 컬럼 삭제
df.drop(columns = 'soldDate', inplace=True)
# ava_rit도 어차피다 레어이니까 삭제
df.drop(columns = 'ava_rit', inplace=True)

# 타겟 컬러 맨뒤로 보내기
price_column = df.pop('price')
df['price'] = price_column

title_data = df['title'].drop_duplicates()
jobname_data = df['jobname'].drop_duplicates()
emblem_data = df['emblem'].drop_duplicates()

import pymysql
# 연결 정보 설정
host = "10.233.18.183"
port = 3306
database = "donpa_item"
user = "root"
password = "1234"

# 연결
connection = pymysql.connect(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database
)

try:
    # 커서 생성
    cursor = connection.cursor()

    # 데이터 입력 (존재하지 않는 데이터만 입력)
    for item in title_data:
        sql = f"INSERT INTO input_list1 (title) SELECT '{item}' FROM DUAL WHERE NOT EXISTS (SELECT * FROM input_list1 WHERE title = '{item}')"
        cursor.execute(sql)

    for item in jobname_data:
        sql = f"INSERT INTO input_list1 (jobname) SELECT '{item}' FROM DUAL WHERE NOT EXISTS (SELECT * FROM input_list1 WHERE jobname = '{item}')"
        cursor.execute(sql)

    for item in emblem_data:
        sql = f"INSERT INTO input_list1 (emblem) SELECT '{item}' FROM DUAL WHERE NOT EXISTS (SELECT * FROM input_list1 WHERE emblem = '{item}')"
        cursor.execute(sql)

    # 커밋
    connection.commit()
except Exception as e:
    # 에러 발생 시 롤백
    connection.rollback()
    print(f"데이터 입력 중 오류 발생: {str(e)}")
finally:
    # 연결 종료
    connection.close()

X_train = df.drop(columns = 'price')
y_train = df['price']


X_train, X_validation, Y_train, Y_validation = train_test_split(X_train,y_train, test_size=0.2, random_state=1)
X_train.reset_index(drop=True, inplace=True)
X_validation.reset_index(drop=True, inplace=True)

obj_col = X_train.select_dtypes(include='object').columns

sds = StandardScaler()
sds.fit(X_train.drop(columns = obj_col))

X_train_sc = sds.transform(X_train.drop(columns = obj_col))
X_train_sc = pd.DataFrame(X_train_sc, columns = X_train.drop(columns = obj_col).columns)

X_validation_sc = sds.transform(X_validation.drop(columns = obj_col))
X_validation_sc = pd.DataFrame(X_validation_sc, columns = X_validation.drop(columns = obj_col).columns)


# object 타입 컬럼 붙여주기
for i in obj_col:
    X_train_sc[i] = X_train[i]
    X_validation_sc[i] = X_validation[i]
# 스케일러 객체 저장
joblib.dump(sds, '/home/jovyan/scaler/sang_scaler.pkl')


# OneHotEncoder 객체 생성
encoder = OneHotEncoder()

X_full = pd.concat([X_train_sc, X_validation_sc])

# 범주형 열만 선택
obj_df = X_full.select_dtypes(include='object')

# 숫자형 열만 선택
no_obj_df = X_full.select_dtypes(exclude='object')

# 범주형 열을 원핫 인코딩
encoded_features = encoder.fit_transform(obj_df)

# 인코딩된 결과를 데이터프레임으로 변환
encoded_df = pd.DataFrame(encoded_features.toarray(), columns=encoder.get_feature_names(obj_df.columns))

# 인코딩된 범주형 열과 숫자형 열을 합침
X_train_sc_ec = pd.concat([no_obj_df[:len(X_train_sc)] , encoded_df[:len(X_train_sc)]], axis = 1)
X_validation_sc_ec = pd.concat([no_obj_df[len(X_train_sc):] , encoded_df[len(X_train_sc):].reset_index(drop=True)], axis = 1)

# 인코딩 객체 저장
joblib.dump(encoder, '/home/jovyan/encoder/sang_encoder.pkl')

import re

# 데이터프레임의 컬럼 이름에서 특수 문자를 제거하고 변경할 새로운 컬럼 이름 리스트 생성
new_columns = []
for old_column in X_train_sc_ec.columns:
    new_column = re.sub(r'[^\w\s]', '', old_column)  # 특수 문자 제거
    new_columns.append(new_column)

# 컬럼 이름을 새로운 이름으로 설정
X_train_sc_ec.columns = new_columns
X_validation_sc_ec.columns = new_columns


X_train_sc_ec.to_csv('/home/jovyan/csv/X_train_sc_ec.csv', index=False)
X_validation_sc_ec.to_csv('/home/jovyan/csv/X_validation_sc_ec.csv', index=False)
Y_train.to_csv('/home/jovyan/csv/Y_train.csv', index=False)
Y_validation.to_csv('/home/jovyan/csv/Y_validation.csv', index=False)

 

dockerfile

# base image
FROM python:3.9

# 작업 디렉토리 설정
WORKDIR /app

# main 파일 작업디렉토리로 복사
COPY main.py /app/main.py
COPY requirements.txt /app

# 필요한 종속성 설치
RUN pip install --no-cache-dir -r requirements.txt

CMD [ "python" , "main.py" ]

 

requirements.txt

pandas==1.1.5
PyMySQL==1.1.0
pymongo
scikit-learn==0.24.2
numpy
joblib==1.1.1
xgboost==1.5.2

 

 

2. 카팁 이미지, 카팁 야물파일

카팁이미지

main.py

import pandas as pd
import xgboost as xgb
import argparse
from sklearn.metrics import mean_absolute_error

X_train_sc_ec = pd.read_csv('/home/jovyan/csv/X_train_sc_ec.csv')
X_validation_sc_ec = pd.read_csv('/home/jovyan/csv/X_validation_sc_ec.csv')
Y_train = pd.read_csv('/home/jovyan/csv/Y_train.csv')
Y_validation = pd.read_csv('/home/jovyan/csv/Y_validation.csv')

parser = argparse.ArgumentParser()
parser.add_argument('--learning_rate', required=False, type=float, default=0.1)
parser.add_argument('--n_estimators', required=False, type=int, default=100)
parser.add_argument('--max_depth', required=False, type=int, default=5)

args = parser.parse_args()


xgb_model = xgb.XGBRegressor(random_state=10,
                            learning_rate=args.learning_rate,
                            n_estimators=args.n_estimators,
                            max_depth=args.max_depth
                            )

xgb_model.fit(X_train_sc_ec, Y_train)


# 검증 데이터 예측
pred_validation = xgb_model.predict(X_validation_sc_ec)

# 성능평가
from sklearn.metrics import mean_absolute_error
mae_validation = mean_absolute_error(Y_validation, pred_validation)
print("mae_validation="+str(mae_validation))

 

dockerfile

FROM python:3.9
ENV PYTHONUNBUFFERED 1
WORKDIR /app
COPY . /app/
RUN mkdir csv
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
ENTRYPOINT ["python", "main.py"]

requirements.txt

pandas==1.1.5
scikit-learn==0.24.2
numpy
xgboost==1.5.2

 

 

카팁 야물

apiVersion: batch/v1
kind: Job
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: 'false'
    spec:
      containers:
              - name: training-container
                image: kubeflow-registry.default.svc.cluster.local:30000/sangpipline1:latest
                command:
                  - python3
                  - /app/main.py
                  - '--learning_rate=${trialParameters.learning_rate}'
                  - '--n_estimators=${trialParameters.n_estimators}'
                  - '--max_depth=${trialParameters.max_depth}'
                volumeMounts:
                  - name: csv-volume
                    mountPath: /home/jovyan/csv
      restartPolicy: Never
      volumes:
          - name: csv-volume
            persistentVolumeClaim:
                  claimName: sang-csv-pvc

 

카팁 야물파일

apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: donpa-sang
  namespace: 'sy'
spec:
  maxTrialCount: 40
  parallelTrialCount: 2
  maxFailedTrialCount: 3
  resumePolicy: Never
  objective:
    type: minimize
    goal: 1200000
    objectiveMetricName: mae_validation
    additionalMetricNames: []
  algorithm:
    algorithmName: random
    algorithmSettings: []
  parameters:
    - name: lr
      parameterType: double
      feasibleSpace:
        min: '0.01'
        max: '0.3'
        step: '0.05'
    - name: 'n'
      parameterType: int
      feasibleSpace:
        min: '30'
        max: '200'
        step: '1'
    - name: d
      parameterType: int
      feasibleSpace:
        min: '1'
        max: '30'
        step: '1'
  metricsCollectorSpec:
    collector:
      kind: StdOut
  trialTemplate:
    primaryContainerName: training-container
    successCondition: status.conditions.#(type=="Complete")#|#(status=="True")#
    failureCondition: status.conditions.#(type=="Failed")#|#(status=="True")#
    retain: false
    trialParameters:
      - name: learning_rate
        reference: lr
        description: ''
      - name: n_estimators
        reference: 'n'
        description: ''
      - name: max_depth
        reference: d
        description: ''
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          metadata:
            annotations:
              sidecar.istio.io/inject: 'false'
          spec:
            containers:
              - name: training-container
                image: >-
                  kubeflow-registry.default.svc.cluster.local:30000/sangpipline1:latest
                command:
                  - python3
                  - /app/main.py
                  - '--learning_rate=${trialParameters.learning_rate}'
                  - '--n_estimators=${trialParameters.n_estimators}'
                  - '--max_depth=${trialParameters.max_depth}'
                volumeMounts:
                  - name: csv-volume
                    mountPath: /home/jovyan/csv
            restartPolicy: Never
            volumes:
              - name: csv-volume
                persistentVolumeClaim:
                  claimName: sang-csv-pvc

 

3. 카팁 돌리는 이미지

sang_katib_start.py

from kubeflow.katib import KatibClient
import yaml
import argparse

test_yaml = '/app/exp/sang.yaml'

katib_client = KatibClient()

with open(test_yaml, "r") as yaml_file:
    experiment_config = yaml.load(yaml_file)

namespace = experiment_config['metadata']['namespace']

try:
    katib_client.create_experiment(experiment_config, namespace)
except:
    pass

 

requirements.txt

kubeflow-katib
PyYAML==5.2

 

dockerfile

FROM python:3.9
ENV PYTHONUNBUFFERED 1
WORKDIR /app
COPY . /app/
RUN mkdir exp
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
ENTRYPOINT ["python", "sang_katib_start.py"]

 

 

4. 카팁완료후 최적의 매개변수를 사용해 모델 배포

sang_model_update.py

from kubeflow.katib import KatibClient
from sklearn.metrics import mean_absolute_error
import yaml
import pandas as pd
import time
import argparse
import joblib
import xgboost as xgb


time.sleep(580)
while True:
    test_yaml = '/app/exp/sang.yaml'

    with open(test_yaml, "r") as yaml_file:
        experiment_config = yaml.load(yaml_file)

    name = experiment_config['metadata']['name']
    namespace = experiment_config['metadata']['namespace']


    katib_client = KatibClient()

    time.sleep(10)
    if katib_client.get_experiment_status(name,namespace) == 'Succeeded':

        experiment = katib_client.get_experiment(name=name,namespace=namespace)

        lr=experiment['status']['currentOptimalTrial']['parameterAssignments'][0]['value']
        n=experiment['status']['currentOptimalTrial']['parameterAssignments'][1]['value']
        d=experiment['status']['currentOptimalTrial']['parameterAssignments'][2]['value']

        katib_client.delete_experiment(name,namespace)

        new_model = xgb.XGBRegressor(random_state=10,
                                     learning_rate=float(lr),
                                     n_estimators=int(n),
                                     max_depth=int(d)
                                     )
        X_train_sc_ec = pd.read_csv('/app/csv/X_train_sc_ec.csv')
        X_validation_sc_ec = pd.read_csv('/app/csv/X_validation_sc_ec.csv')
        Y_train = pd.read_csv('/app/csv/Y_train.csv')
        Y_validation = pd.read_csv('/app/csv/Y_validation.csv')


        # 새로운 모델 성능 평가
        new_model.fit(X_train_sc_ec, Y_train)
        new_pred_validation = new_model.predict(X_validation_sc_ec)
        new_mae_validation = mean_absolute_error(Y_validation, new_pred_validation)

        # 모델을 파일로 저장
        joblib.dump(new_model, '/app/model/sang_model.pkl')
        
        # while문 종료
        break

 

requirements.txt

kubeflow-katib
PyYAML==5.2
scikit-learn==0.24.2
pandas==1.1.5
joblib==1.1.1
xgboost==1.5.2

 

dockerfile

FROM python:3.9
ENV PYTHONUNBUFFERED 1
WORKDIR /app
COPY . /app/
RUN mkdir exp
RUN mkdir csv
RUN mkdir model
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
ENTRYPOINT ["python", "sang_model_update.py"]

 

 

최종 파이프라인 kfp파이썬 파일

import kfp
import kfp.components as comp
from kfp import dsl

@dsl.pipeline(
    name='wwww',
    description='sang xgb test1'
)
def sang_xgb_pipline():
    model_pvc = dsl.PipelineVolume('model-pvc')
    encoder_pvc = dsl.PipelineVolume('encoder-pvc')
    scaler_pvc = dsl.PipelineVolume('scaler-pvc')
    csv_pvc = dsl.PipelineVolume('sang-csv-pvc')
    yaml_pvc = dsl.PipelineVolume('katib-yaml-pvc')

    data_load = dsl.ContainerOp(
        name='data load',
        image='kubeflow-registry.default.svc.cluster.local:30000/sang_data_load:latest',
        command=['python', 'main.py','1'],
        pvolumes={'/home/jovyan/csv': csv_pvc,
                  '/home/jovyan/encoder': encoder_pvc,
                  '/home/jovyan/scaler': scaler_pvc}
    )

    katib_run = dsl.ContainerOp(
        name='katib run',
        image='kubeflow-registry.default.svc.cluster.local:30000/sang_katib_start:latest',
        command=['python', 'sang_katib_start.py','1'],
        pvolumes={'/app/exp': yaml_pvc}
    )

    model_update = dsl.ContainerOp(
        name='model update',
        image='kubeflow-registry.default.svc.cluster.local:30000/sang_model_update:latest',
        command=['python', 'sang_model_update.py','1'],
        pvolumes={'/app/exp': yaml_pvc,
                  '/app/model': model_pvc,
                  '/app/csv': csv_pvc}
    )

    katib_run.after(data_load)
    model_update.after(katib_run)

if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(sang_xgb_pipline, __file__ + ".tar.gz")

 

python3 (파일명)

tar파일로 만든후 파이프라인ui에 업로드해 파이프라인 실행시키기


완료후 모델이 배포가 되었는지 확인

 

가격 예측이 되는지 확인 해보자!!

5주간 대장정 끝났다 ㅠㅠㅠ

1. aws하둡 인프라 구축( 네임노드2, 데이터노드3 )

2. 던파 아이템10개, 아바타 2개(레어 상급) hdfs에 데이터 수집, hdfs데이터 몽고db전송 스파크 애플리케이션 생성

3. 에어플로우에서 총 12개의dags를 사용하여 1시간 마다 hdfs, 몽고db에 데이터 수집

4. 아이템 10개 lstm시계열 분석 1시간뒤 가격 예측

5. 레어, 상급 아바타 xgboost모델을 사용하여 가격 예측

- 피처엔지니어링 외부 피처 추가 던파 골드 시세 데이터 추가함 골드 가치에 따라 시세에 영향이 미친다고 생각!!

6. 아바타 모델 하루마다 카팁을 돌려 최적의 매개변수를 적용한 최신화된 모델을 배포

7. 쿠브플로우 파이프라인 ( 데이터 분석, 카팁 생성, 카팁 종료후 최적의 모델 배포 )

8. 던파 이벤트 사이트에서 목요일 10시마다 현재 진행중 이벤트 크롤링하는 이미지 컨테이너로 실행 (데이터 최신화)

9. 3시간마다 던파 관련 뉴스 크롤링한 이미지 컨테이너로 실행 ( 데이터 최신화 )

 

아이템 시세 메뉴( 매20분에 1시간뒤 가격을 예측함)

 

아바타 시세 메뉴( 원하는 아바타, 엠블럼, 직업, 금일 날짜를 선택해 확인 을 누르면 금일 예상가격이 표시됨)

 

진행중 이벤트 메뉴 ( 던파 현재 진행중인 이벤트를 나열 클릭시 해당 이벤트 사이트로 이동)

 

던파 소식 메뉴( 던파관련 실시간 뉴스 소식을 확인할수 있음 클릭시 해당 뉴스로 이동)

 

던전앤파이터 최종프로젝트 완료!!

매순간 오류를 만났다.. 진짜로 하지만 시있실없 시련은 있어도 실패는 없다 ㅠㅠㅠㅠㅠ