본문 바로가기
데이터 엔지니어( 실습 정리 )

사용자 키보드 로그 hive 웨어하우징

by 세용용용용 2024. 7. 15.

사용자 키보드 로그 수집 (tistory.com)

 

사용자 키보드 로그 수집

1) evdev 패키지를 설치- Linux에서 입력 이벤트를 처리하기 위한 라이브러리 pip install evdev  2) 예제를 위해 사용자 생성 및 그룹 추가1. 사용자생성 명령어adduser [사용자 이름]ex) adduser sy0218, adduser s

sy02229.tistory.com

 

업데이트 사항

2024-07-10 : 원시데이터 >> 하둡 (압축후 백업 추가)
2024-07-15 : 원시데이터, warehouse, backup 데이터 삭제 스크립트 추가
- 보관기간 (7일, 한달, 2년)

 

 

1. 로그 데이터 처리

1) 기존 데이터

 

2) 전처리 : key event at, down 들어간 라인 파싱해서 전처리 ( 키를 누른 로그 만 )

ch_data.py

#!/usr/bin/python3
import sys
from datetime import datetime

def change_time(list_value):
    answer = list_value.split(' ')
    return datetime.fromtimestamp(float(answer[-1])).strftime('%H%M%S')

def key_active(list_value):
    answer = list_value.lstrip().rstrip().split(' ')
    answer[1] = answer[1].replace('(','')
    answer[1] = answer[1].replace(')','')
    return answer

def user_name(list_value):
    answer = list_value.lstrip().rstrip().split('\t')
    return [answer[1]]

def change_work(file_name, file_dir):
    # 입력파일
    input_file=f"{file_dir}/{file_name}"
    # 출력파일
    out_file=f"{file_dir}/ch_{file_name}"
    print(input_file, out_file)

    with open(input_file,'r') as in_file:
         for line in in_file:
             if ("key event at" in line) and ("down" in line):
                 result = []
                 line = line.split(',')
                 # 시간 create
                 ch_time = change_time(line[0])
                 result.append(ch_time)

                 # key 활동내역 create
                 key_at = key_active(line[1])
                 result = result+key_at

                 # 사용자 명
                 user_nm = user_name(line[2])
                 result = result+user_nm
                 result = '\t'.join(result)
                 with open(out_file,'a') as out_file_handle:
                      out_file_handle.write(result+'\n')

if __name__ == "__main__":
   args = sys.argv[1:]
   # 인자가 두개가 아니면 종료
   if len(args) != 2:
      print("사용법 : ch_data.py <파일명> <파일경로>")
      sys.exit(1)

   file_name, file_dir = args
   change_work(file_name, file_dir)

 

실행 명령어

- 형식
[실행파일] <원시데이터> <원시데이터 경로>

- 명령어
./ch_data.py 20240710_key_log.txt /data/row_data/key_log

 

3) 변환 결과

키 누른 시간, 키 번호 타입, 키 문자 타입, 사용자

이제 hive 웨어하우징 해보자

 

 

2. user_info

1) user_info 원시 데이터

 

2) local to hive

- 만들어둔 클래스를 사용해 생성

- hive 테이블 생성
./CREATE_hive_table.py default user_info "['username','level','job_type']" "{'pdate':'STRING'}" "\n" "\t" TextFile

- 나중에 join 할 최종 테이블 생성
./CREATE_hive_table.py default key_log "['logtime','key_nm','key_str','username','level','job_type']" "{'pdate':'STRING'}" "\n" "\t" ORC

- local to hive
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('20240710', '/data/row_data/user_info', 'user_info_20240710.txt', 'user_info', '/data/user_info', 'user_info_20240710', '/data/backup/user_info', 'user_info_20240710.tgz'); loader.not_join_run()"

 

3) 수정된 local_to_hive.py

#!/usr/bin/python3

import os
import subprocess
import sys
from datetime import datetime

class HiveDataLoader:
    def __init__(self, gdate, data_dir, data_file, hive_table_name, hadoop_dir, hadoop_file, hadoop_backup_dir, hadoop_backup_file):
        self.user_info_col = 'b.username string, b.level string, b.job_type string'

        self.gdate = gdate
        self.data_dir = data_dir
        self.data_file = data_file
        self.hive_table_name = hive_table_name
        self.hadoop_dir = hadoop_dir
        self.hadoop_file = hadoop_file
        self.hadoop_backup_dir = hadoop_backup_dir
        self.hadoop_backup_file = hadoop_backup_file

    def log(self, message):
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Time_Stamp : {message}")

    def make_hadoop_dir(self, hdfs_dir):
        self.log("hadoop 디렉토리 존재하지 않으면 생성 Start....")
        result = subprocess.run(['hdfs', 'dfs', '-test', '-e', hdfs_dir])
        if result.returncode != 0:
            subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_dir], check=True)
            self.log(f"hadoop 디렉토리 {hdfs_dir} 생성됨.")
        else:
            self.log(f"hadoop 디렉토리 {hdfs_dir} 이미 존재함.")
        self.log("hadoop 디렉토리 존재하지 않으면 생성 End....")

    def local_to_hadoop(self, hdfs_dir, hdfs_file):
        self.log("local 데이터 hadoop put Start....")
        result = subprocess.run(['hdfs','dfs','-test','-e',f"{hdfs_dir}/{hdfs_file}"])
        if result.returncode == 0:
           subprocess.run(['hdfs','dfs','-rm',f"{hdfs_dir}/{hdfs_file}"],check=True)
           self.log(f"{hdfs_dir}/{hdfs_file} 삭제 완료.")

        if ("backup" in hdfs_dir) and (".tgz" in hdfs_file):
           tar_command = f"tar czf - -C {self.data_dir} {self.data_file} | hdfs dfs -put - {hdfs_dir}/{hdfs_file}"
           subprocess.run(tar_command, shell=True, check=True)
        else:
           subprocess.run(['hdfs','dfs','-put',f"{self.data_dir}/{self.data_file}",f"{hdfs_dir}/{hdfs_file}"], check=True)
        self.log(f"{hdfs_dir}/{hdfs_file} 생성 완료.")
        self.log("local 데이터 hadoop put End....")

    def hadoop_to_hive(self):
        self.log("hadoop 데이터 hive write Start....")
        subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name}_raw partition (pdate='{self.gdate}')"],check=True)
        self.log("hadoop 데이터 hive write End....")

    def hadoop_to_hive_not_raw(self):
        self.log("hadoop 데이터 hive write Start....")
        subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name} partition (pdate='{self.gdate}')"],check=True)
        self.log("hadoop 데이터 hive write End....")


    def get_col_to_userinfo(self):
        hive_col = subprocess.run(['hive','-e',f"describe {self.hive_table_name}_raw;"], capture_output=True, text=True, check=True)
        result_col = []
        for col in hive_col.stdout.strip().split('\n'):
            stripped_col = col.strip()
            if "pdate" in stripped_col:
                break
            col_data = stripped_col.split()[0]
            if col_data=="username":
                continue
            result_col.append(f"a.{col_data} string")
        result_col = ','.join(result_col)
        self.hive_columns = result_col

    def join_user_info(self):
        self.log("hive user_info join Start....")
        subprocess.run(['hive', '-e',
                        f"set mapred.job.priority=VERY_HIGH; "
                        f"insert overwrite table {self.hive_table_name} "
                        f"partition (pdate='{self.gdate}') "
                        f"select {self.hive_columns},{self.user_info_col} "
                        f"from {self.hive_table_name}_raw a "
                        f"left join user_info b "
                        f"on (a.username = b.username) "
                        f"where (a.pdate='{self.gdate}' and b.pdate='{self.gdate}')"], check=True)
        self.log("hive user_info join End....")


    def not_join_run(self):
        self.make_hadoop_dir(self.hadoop_dir)
        self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
        self.hadoop_to_hive_not_raw()
        self.make_hadoop_dir(self.hadoop_backup_dir)
        self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)
    def run(self):
        self.make_hadoop_dir(self.hadoop_dir)
        self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
        self.hadoop_to_hive()
        self.get_col_to_userinfo()
        self.join_user_info()
        self.make_hadoop_dir(self.hadoop_backup_dir)
        self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)

if __name__ == "__main__":
    if len(sys.argv) < 9:
        print(f"사용법: {sys.argv[0]} <YYYYMMDD> <data_dir> <data_file> <hive_table_name> <hadoop_dir> <hadoop_file> <hadoop_backup_dir> <hadoop_backup_file>")
        sys.exit(1)

    gdate = sys.argv[1]
    data_dir = sys.argv[2]
    data_file = sys.argv[3]
    hive_table_name = sys.argv[4]
    hadoop_dir = sys.argv[5]
    hadoop_file = sys.argv[6]
    hadoop_backup_dir = sys.argv[7]
    hadoop_backup_file = sys.argv[8]

 

실행 명령은 동일

python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('20240710', '/data/row_data/user_info', 'user_info_20240710.txt', 'user_info', '/data/user_info', 'user_info_20240710', '/data/backup/user_info', 'user_info_20240710.tgz'); loader.not_join_run()"

 

 

 

3. key_log

1) hive 테이블 create

CREATE_hive_table.py : 테이블 생성 클래스

- 필요 인자
db 명 : default
table 명 : key_log
컬럼 : "['logtime', 'key_nm', 'key_str', 'username']"
파티션 : "{'pdate':'STRING'}"
row_delimiter : "\n"
field_delimiter : "\t"
stored : TextFile

- 실행 명령어
./CREATE_hive_table.py default key_log_raw "['logtime', 'key_nm', 'key_str', 'username']" "{'pdate':'STRING'}" "\n" "\t" TextFile

 

2) local to hive( 클래스 )

#!/usr/bin/python3

import os
import subprocess
import sys
from datetime import datetime

class HiveDataLoader:
    def __init__(self, gdate, data_dir, data_file, hive_table_name, hadoop_dir, hadoop_file, hadoop_backup_dir, hadoop_backup_file):
        self.user_info_col = 'b.username string, b.level string, b.job_type string'

        self.gdate = gdate
        self.data_dir = data_dir
        self.data_file = data_file
        self.hive_table_name = hive_table_name
        self.hadoop_dir = hadoop_dir
        self.hadoop_file = hadoop_file
        self.hadoop_backup_dir = hadoop_backup_dir
        self.hadoop_backup_file = hadoop_backup_file

    def log(self, message):
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Time_Stamp : {message}")

    def make_hadoop_dir(self, hdfs_dir):
        self.log("hadoop 디렉토리 존재하지 않으면 생성 Start....")
        result = subprocess.run(['hdfs', 'dfs', '-test', '-e', hdfs_dir])
        if result.returncode != 0:
            subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_dir], check=True)
            self.log(f"hadoop 디렉토리 {hdfs_dir} 생성됨.")
        else:
            self.log(f"hadoop 디렉토리 {hdfs_dir} 이미 존재함.")
        self.log("hadoop 디렉토리 존재하지 않으면 생성 End....")

    def local_to_hadoop(self, hdfs_dir, hdfs_file):
        self.log("local 데이터 hadoop put Start....")
        result = subprocess.run(['hdfs','dfs','-test','-e',f"{hdfs_dir}/{hdfs_file}"])
        if result.returncode == 0:
           subprocess.run(['hdfs','dfs','-rm',f"{hdfs_dir}/{hdfs_file}"],check=True)
           self.log(f"{hdfs_dir}/{hdfs_file} 삭제 완료.")

        if ("backup" in hdfs_dir) and (".tgz" in hdfs_file):
           tar_command = f"tar czf - -C {self.data_dir} {self.data_file} | hdfs dfs -put - {hdfs_dir}/{hdfs_file}"
           subprocess.run(tar_command, shell=True, check=True)
        else:
           subprocess.run(['hdfs','dfs','-put',f"{self.data_dir}/{self.data_file}",f"{hdfs_dir}/{hdfs_file}"], check=True)
        self.log(f"{hdfs_dir}/{hdfs_file} 생성 완료.")
        self.log("local 데이터 hadoop put End....")

    def hadoop_to_hive(self):
        self.log("hadoop 데이터 hive write Start....")
        subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name}_raw partition (pdate='{self.gdate}')"],check=True)
        self.log("hadoop 데이터 hive write End....")

    def hadoop_to_hive_not_raw(self):
        self.log("hadoop 데이터 hive write Start....")
        subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name} partition (pdate='{self.gdate}')"],check=True)
        self.log("hadoop 데이터 hive write End....")


    def get_col_to_userinfo(self):
        hive_col = subprocess.run(['hive','-e',f"describe {self.hive_table_name}_raw;"], capture_output=True, text=True, check=True)
        result_col = []
        for col in hive_col.stdout.strip().split('\n'):
            stripped_col = col.strip()
            if "pdate" in stripped_col:
                break
            col_data = stripped_col.split()[0]
            if col_data=="username":
                continue
            result_col.append(f"a.{col_data} string")
        result_col = ','.join(result_col)
        self.hive_columns = result_col

    def join_user_info(self):
        self.log("hive user_info join Start....")
        subprocess.run(['hive', '-e',
                        f"set mapred.job.priority=VERY_HIGH; "
                        f"insert overwrite table {self.hive_table_name} "
                        f"partition (pdate='{self.gdate}') "
                        f"select {self.hive_columns},{self.user_info_col} "
                        f"from {self.hive_table_name}_raw a "
                        f"left join user_info b "
                        f"on (a.username = b.username) "
                        f"where (a.pdate='{self.gdate}' and b.pdate='{self.gdate}')"], check=True)
        self.log("hive user_info join End....")


    def not_join_run(self):
        self.make_hadoop_dir(self.hadoop_dir)
        self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
        self.hadoop_to_hive_not_raw()
        self.make_hadoop_dir(self.hadoop_backup_dir)
        self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)
    def run(self):
        self.make_hadoop_dir(self.hadoop_dir)
        self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
        self.hadoop_to_hive()
        self.get_col_to_userinfo()
        self.join_user_info()
        self.make_hadoop_dir(self.hadoop_backup_dir)
        self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)

if __name__ == "__main__":
    if len(sys.argv) < 9:
        print(f"사용법: {sys.argv[0]} <YYYYMMDD> <data_dir> <data_file> <hive_table_name> <hadoop_dir> <hadoop_file> <hadoop_backup_dir> <hadoop_backup_file>")
        sys.exit(1)

    gdate = sys.argv[1]
    data_dir = sys.argv[2]
    data_file = sys.argv[3]
    hive_table_name = sys.argv[4]
    hadoop_dir = sys.argv[5]
    hadoop_file = sys.argv[6]
    hadoop_backup_dir = sys.argv[7]
    hadoop_backup_file = sys.argv[8]

 

사용법

local_to_hive.py : 로컬 데이터 hive 웨어하우스 적재 클래스

join 필요 없으면 not_join_run 함수 사용
join 필요시 run 함수 사용

- 필요 인자 8개
gdate : 20240710
data_dir : /data/row_data/key_log
data_file : ch_20240710_key_log.txt
hive_table_name : key_log_raw
hadoop_dir : /data/key_log_raw
hadoop_file : key_log_20240710
hadoop_backup_dir : /data/backup/key_log
hadoop_backup_file : key_log_20240710.tgz

- 실행 명령어
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('20240710', '/data/row_data/key_log', 'ch_20240710_key_log.txt', 'key_log', '/data/key_log_raw', 'key_log_20240710', '/data/backup/key_log', 'key_log_20240710.tgz'); loader.run()"

 

(1) key_log_raw 데이터

 

(2) key_log 데이터( 최종 )

- 형식
<키보드 입력 시간(시분초)> <키보드 번호 타입> <키보드 문자 타입> <사용자> <레벨> <직업> <수집 날짜(파티션)>

 

(3) key_log 데이터( 백업 )

 

 

4. 최종 proc.sh

1)  파이프 라인 전역변수 공유자원 ( /data/work/common_data.sh )

#!/usr/bin/bash
Ddate=$1
dict_name=$2

# 딕셔너리 선언
declare -A user_info=(
["data_dir"]="/data/row_data/user_info"
["ch_data_file"]="user_info_${Ddate}.txt"
["hive_table_name"]="user_info"
["hadoop_dir"]="/data/user_info"
["hadoop_file"]="user_info_${Ddate}"
["hadoop_backup_dir"]="/data/backup/user_info"
["hadoop_backup_file"]="user_info_${Ddate}.tgz"
)

declare -A key_log=(
["scripts_dir"]="/data/scripts/key_log"
["data_dir"]="/data/row_data/key_log"
["data_file"]="${Ddate}_key_log.txt"
["ch_data_file"]="ch_${Ddate}_key_log.txt"
["hive_table_name"]="key_log"
["hadoop_dir"]="/data/key_log_raw"
["hadoop_file"]="key_log_${Ddate}"
["hadoop_backup_dir"]="/data/backup/key_log"
["hadoop_backup_file"]="key_log_${Ddate}.tgz"
)

# 지정된 딕셔너리 이름으로 변수들을 export
declare -n dict_ref="$dict_name"  # 인자로 받은 딕녀서리 변수명을 참조할 수 있는 참조 변수 선언

for key in "${!dict_ref[@]}";
do
# 현재 쉘이랑 쉘 자식프로세스에 전역변수 던짐
export "${key}=${dict_ref[${key}]}"
done

 

2)  key_log proc 스크립트 ( key_log_proc.sh )

#!/usr/bin/bash

# 인자 갯수 확인
if [ "$#" -ne 2 ]; then
    echo "Usage: $0 <YYYYMMDD> <스크립트내 전역 변수dict_name>"
    exit 1
fi

Ddate=$1
dict_name=$2

# 스크립트 내 전역변수 설정
. /data/work/common_data.sh ${Ddate} ${dict_name}

# 데이터 proprecessing
${scripts_dir}/ch_data.py $data_file $data_dir

# 클래스 파일 있는 경로로 이동
cd /data/scripts
# 데이터 웨어하우징 및 백업
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('${Ddate}', '${data_dir}', '${ch_data_file}', '${hive_table_name}', '${hadoop_dir}', '${hadoop_file}', '${hadoop_backup_dir}', '${hadoop_backup_file}'); loader.run()"

 

5. 오래된 데이터 삭제하는 스크립트(원시, 웨어하우스, 백업 데이터 삭제)

rm_data_proc.sh
-----------------------------------------------------------
#!/usr/bin/bash

# 일주일 지난 원시데이터 삭제
find /data/row_data -type f -mtime +7 -exec rm -f {} \;
# 한달 지난 warehouse, 2년 지난 백업 데이터 삭제
/data/scripts/rm_data/rm_hadoop.sh
-----------------------------------------------------------


/data/scripts/rm_data/rm_hadoop.sh
-----------------------------------------------------------
#!/usr/bin/bash

# 30일 지난 데이터 삭제
del_date_warehouse=$(date -d "- 30 days" +%Y%m%d)
# 2년 지난 데이터 삭제
del_date_backup=$(date -d "- 730 days" +%Y%m%d)

# 한달 지난 warehouse 데이터 삭제
for warehouse_data in $(hdfs dfs -ls -R /user/hive/warehouse | grep '^d' | grep '/*date' | awk '{print $8}');
do
        if [[ $(echo ${warehouse_data} | awk -F '/' '{print $6}' | awk -F '=' '{print $2}') -le $del_date_warehouse ]]; then
                echo "삭제 디렉토리 $warehouse_data"
                hdfs dfs -rm -r "$warehouse_data"
        fi
done

# 2년 지난 backup 데이터 삭제
for backup_data in $(hdfs dfs -ls -R /data/backup | grep '^-' | awk '{ split($6, date, "-"); print date[1]date[2]date[3] "," $8 }');
do
        now_file=$(echo ${backup_data} | awk -F ',' '{print $2}')
        if [[ $(echo ${backup_data} | awk -F ',' '{print $1}') -le $del_date_backup ]]; then
                echo "삭제 백업 파일: ${now_file}"
                hdfs dfs -rm "${now_file}"
        fi
done
-----------------------------------------------------------

 

감사합니당~  감사합니당~