데이터 엔지니어( 실습 정리 )

hive 테이블 동적 Create, 실습 정리

세용용용용 2024. 7. 5. 21:23

0. hive : 대용량 데이터 관리를 위한 데이터 웨어하우스 인프라

- hdfs 위에 구축되어 있으며 HiveQL을 사용해 데이터를 쿼리하고 분석

주요 개념
1) 테이블
- 데이터가 저장됨, rdb와 유사하지만 hdfs 디렉터리와 파일로 구성
2) 데이터 베이스
- 테이블을 그룹화 하는 논리적 단위
3) 파티션
- 데이터를 더 세분화 관리하기 위해 파티션 사용
4) 버킷
- 파티션 내 데이터를 더 작게 나누기 위해 사용
5) 메타스토어
- 테이블 스키마와 메타데이터 정보를 저장하는 시스템

동작 원리
1) 쿼리작성 : HiveQL을 사용해 쿼리 작성
2) 쿼리 파싱 및 플래닝
- hive 쿼리 파싱해 실행계획 생성, 메타스토어 에서 테이블 및 스키마 정보 가져옴
3) MR 작업 생성
- hive 실행 계획을 mr 작업으로 변환, hadoop 클러스터에서 병렬로 처리
4) 작업 실행
- 변환된 mr작업이 hadoop 클러스터에서 처리
5) 결과 반환
- 처리 결과 반환

!!! RDB와 차이점 !!!
- RDB : 스키마를 정의하고 해당 틀에 맞게 데이터를 입력
- HIVE : 데이터를 저장후 해당 데이터에 맞는 스키마를 정의

 

1. hive 테이블 CREATE ( 동적 으로 만들기 )

- hive 테이블 만드는 것도 어떻게 보면 반복 작업 이기에 동적으로 구현하기 위해 객체지향 프로그래밍 선택

- 꼭 제가 한게 정답은 아닙니당!! 저도 잘 모르고 새로운 것을 도전해보고 싶은 사람이라 ㅎㅎㅎ 참고만 해주세용!!!!

 

/data/db/hive/CREATE_hive_table.py

#!/usr/bin/python3
import sys
import subprocess
from datetime import datetime

class create_hive_table:
    def __init__(self, database, table_name, columns, partitions, row_delimiter, field_delimiter, stored):
        self.database = database
        self.table_name = table_name
        self.columns = columns
        self.partitions = partitions
        self.row_delimiter = row_delimiter
        self.field_delimiter = field_delimiter
        self.stored = stored

    def log(self, message):
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Time_Stamp : {message}")

    def drop_table(self):
        drop_query = f"hive -e 'DROP TABLE IF EXISTS {self.database}.{self.table_name};'"
        subprocess.run(drop_query, shell=True, check=True)
        self.log(f"Dropped existing table: {self.database}.{self.table_name}")

    def create_table_query(self):
        columns_str = ", ".join([f"{col} STRING" for col in self.columns])
        partitions_str = ", ".join([f"{part_name} {part_type}" for part_name, part_type in self.partitions.items()])
        query = (
            f"CREATE TABLE {self.database}.{self.table_name} (\n"
            f"    {columns_str}\n"
            f")\n"
            f"PARTITIONED BY (\n"
            f"    {partitions_str}\n"
            f")\n"
            f"ROW FORMAT DELIMITED\n"
            f"FIELDS TERMINATED BY '{self.field_delimiter}'\n"
            f"LINES TERMINATED BY '{self.row_delimiter}'\n"
            f"STORED AS {self.stored}"
        )
        create_cmd = f"hive -e \"{query}\""
        subprocess.run(create_cmd, shell=True, check=True)
        self.log(f"CREATE table: {self.database}.{self.table_name}")

    def run(self):
        self.log("Create table Start....")
        self.drop_table()
        self.create_table_query()
        self.log("Create table End....")

if __name__ == "__main__":
    if len(sys.argv) != 8:
        print("Usage: ./CREATE_hive_table.py database table_name columns partitions row_delimiter field_delimiter stored")
        sys.exit(1)

    database = sys.argv[1]
    table_name = sys.argv[2]
    columns = eval(sys.argv[3])
    partitions = eval(sys.argv[4])
    row_delimiter = sys.argv[5]
    field_delimiter = sys.argv[6]
    stored = sys.argv[7]

    manager = create_hive_table(database, table_name, columns, partitions, row_delimiter, field_delimiter, stored)
    manager.run()

 

실행 조건

# (실행파일) (hive 데이터베이스명) (hive 테이블명) (hive 컬럼) (hive 파티션) (행 구분자) (열 구분자) (Stored)
[실행파일] default test_table "['soldDate','itemName','count','unitPrice']" "{'pdate':'STRING'}" "\n" "\t" TextFile

 

hive 테이블 생성 확인 명령어

show create table [테이블명]

 

 

2. test 데이터 넣기

/data/scripts/get_sh/test.py : 던파 api 데이터 가져오는 실행파일

#!/usr/bin/python3

import requests
import csv
from datetime import datetime

def row_data(dict_values):
        answer = []

        for key in ['soldDate','itemName','count','unitPrice']:
                answer.append(str(dict_values[key]))
        return '\t'.join(answer)

# API 엔드포인트 URL
api_url = "https://api.neople.co.kr/df/auction-sold"

# 파라미터 설정
params = {
    "itemName": "균열의 단편",
    "wordType": "match",
    "wordShort": "false",
    "limit": 30,
    "apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}

# GET 요청 보내기
response = requests.get(api_url, params=params)
data = response.json()
test1 = data.get("rows")

for dict_values in test1:
        print(row_data(dict_values))

 

/data/scripts/get_sh/test_get.sh : 가져온 row데이터 로컬에 txt파일로 리다이렉트 하는 스크립트

#!/bin/bash

# 인자 개수 확인
if [ "$#" -ne 1 ]; then
    echo "Usage: $0 YYYYmmdd"
    exit 1
fi

# 현재 날짜 구하기
mydate=$1

# 파이썬 스크립트 실행하여 결과를 파일에 저장
python3 /data/scripts/get_sh/test.py > /data/row_data/"${mydate}"_test.txt

 

/data/scripts/test/test_2hive : 로컬 txt파일 hive에 넣는 스크립트(웨어하우징)

#!/bin/sh

if [ $# -ne 1 ] || [ `echo $1 | wc -c` -ne 9 ]; then
    echo "$0 YYYYMMDD"
    exit
fi

GDATE=$1

DATA_DIR="/data/row_data"
DATA_FILE="${GDATE}_test.txt"
hive_table_name="test_table"

hadoop_dir="/data/test/"
hadoop_file="test_${GDATE}"

# 디렉토리 존재하지 않으면 생성
hdfs dfs -test -e ${hadoop_dir} || hdfs dfs -mkdir -p ${hadoop_dir}
# 파일 존재시 삭제(멱등성 위해)
hdfs dfs -test -e ${hadoop_dir}/${hadoop_file} && hdfs dfs -rm ${hadoop_dir}/${hadoop_file}

# 로컬파일 hdfs에 put
hdfs dfs -put ${DATA_DIR}/${DATA_FILE} ${hadoop_dir}/${hadoop_file}

# hive에 넣기
hive -e "set mapred.job.priority=VERY_HIGH;load data inpath '${hadoop_dir}/${hadoop_file}' overwrite into table ${hive_table_name} partition (pdate='${GDATE}')"

 

hive 테이블 확인하는 명령어(hiveQL)

select * from [테이블명]