사용자 키보드 로그 수집
1) evdev 패키지를 설치- Linux에서 입력 이벤트를 처리하기 위한 라이브러리 pip install evdev 2) 예제를 위해 사용자 생성 및 그룹 추가1. 사용자생성 명령어adduser [사용자 이름]ex) adduser sy0218, adduser s
sy02229.tistory.com
업데이트 사항
2024-07-10 : 원시데이터 >> 하둡 (압축후 백업 추가)
2024-07-15 : 원시데이터, warehouse, backup 데이터 삭제 스크립트 추가
- 보관기간 (7일, 한달, 2년)
1. 로그 데이터 처리
1) 기존 데이터

2) 전처리 : key event at, down 들어간 라인 파싱해서 전처리 ( 키를 누른 로그 만 )
ch_data.py
#!/usr/bin/python3
import sys
from datetime import datetime
def change_time(list_value):
answer = list_value.split(' ')
return datetime.fromtimestamp(float(answer[-1])).strftime('%H%M%S')
def key_active(list_value):
answer = list_value.lstrip().rstrip().split(' ')
answer[1] = answer[1].replace('(','')
answer[1] = answer[1].replace(')','')
return answer
def user_name(list_value):
answer = list_value.lstrip().rstrip().split('\t')
return [answer[1]]
def change_work(file_name, file_dir):
# 입력파일
input_file=f"{file_dir}/{file_name}"
# 출력파일
out_file=f"{file_dir}/ch_{file_name}"
print(input_file, out_file)
with open(input_file,'r') as in_file:
for line in in_file:
if ("key event at" in line) and ("down" in line):
result = []
line = line.split(',')
# 시간 create
ch_time = change_time(line[0])
result.append(ch_time)
# key 활동내역 create
key_at = key_active(line[1])
result = result+key_at
# 사용자 명
user_nm = user_name(line[2])
result = result+user_nm
result = '\t'.join(result)
with open(out_file,'a') as out_file_handle:
out_file_handle.write(result+'\n')
if __name__ == "__main__":
args = sys.argv[1:]
# 인자가 두개가 아니면 종료
if len(args) != 2:
print("사용법 : ch_data.py <파일명> <파일경로>")
sys.exit(1)
file_name, file_dir = args
change_work(file_name, file_dir)
실행 명령어
- 형식
[실행파일] <원시데이터> <원시데이터 경로>
- 명령어
./ch_data.py 20240710_key_log.txt /data/row_data/key_log
3) 변환 결과

키 누른 시간, 키 번호 타입, 키 문자 타입, 사용자
이제 hive 웨어하우징 해보자
2. user_info
1) user_info 원시 데이터

2) local to hive
- 만들어둔 클래스를 사용해 생성
- hive 테이블 생성
./CREATE_hive_table.py default user_info "['username','level','job_type']" "{'pdate':'STRING'}" "\n" "\t" TextFile
- 나중에 join 할 최종 테이블 생성
./CREATE_hive_table.py default key_log "['logtime','key_nm','key_str','username','level','job_type']" "{'pdate':'STRING'}" "\n" "\t" ORC
- local to hive
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('20240710', '/data/row_data/user_info', 'user_info_20240710.txt', 'user_info', '/data/user_info', 'user_info_20240710', '/data/backup/user_info', 'user_info_20240710.tgz'); loader.not_join_run()"
3) 수정된 local_to_hive.py
#!/usr/bin/python3
import os
import subprocess
import sys
from datetime import datetime
class HiveDataLoader:
def __init__(self, gdate, data_dir, data_file, hive_table_name, hadoop_dir, hadoop_file, hadoop_backup_dir, hadoop_backup_file):
self.user_info_col = 'b.username string, b.level string, b.job_type string'
self.gdate = gdate
self.data_dir = data_dir
self.data_file = data_file
self.hive_table_name = hive_table_name
self.hadoop_dir = hadoop_dir
self.hadoop_file = hadoop_file
self.hadoop_backup_dir = hadoop_backup_dir
self.hadoop_backup_file = hadoop_backup_file
def log(self, message):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Time_Stamp : {message}")
def make_hadoop_dir(self, hdfs_dir):
self.log("hadoop 디렉토리 존재하지 않으면 생성 Start....")
result = subprocess.run(['hdfs', 'dfs', '-test', '-e', hdfs_dir])
if result.returncode != 0:
subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_dir], check=True)
self.log(f"hadoop 디렉토리 {hdfs_dir} 생성됨.")
else:
self.log(f"hadoop 디렉토리 {hdfs_dir} 이미 존재함.")
self.log("hadoop 디렉토리 존재하지 않으면 생성 End....")
def local_to_hadoop(self, hdfs_dir, hdfs_file):
self.log("local 데이터 hadoop put Start....")
result = subprocess.run(['hdfs','dfs','-test','-e',f"{hdfs_dir}/{hdfs_file}"])
if result.returncode == 0:
subprocess.run(['hdfs','dfs','-rm',f"{hdfs_dir}/{hdfs_file}"],check=True)
self.log(f"{hdfs_dir}/{hdfs_file} 삭제 완료.")
if ("backup" in hdfs_dir) and (".tgz" in hdfs_file):
tar_command = f"tar czf - -C {self.data_dir} {self.data_file} | hdfs dfs -put - {hdfs_dir}/{hdfs_file}"
subprocess.run(tar_command, shell=True, check=True)
else:
subprocess.run(['hdfs','dfs','-put',f"{self.data_dir}/{self.data_file}",f"{hdfs_dir}/{hdfs_file}"], check=True)
self.log(f"{hdfs_dir}/{hdfs_file} 생성 완료.")
self.log("local 데이터 hadoop put End....")
def hadoop_to_hive(self):
self.log("hadoop 데이터 hive write Start....")
subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name}_raw partition (pdate='{self.gdate}')"],check=True)
self.log("hadoop 데이터 hive write End....")
def hadoop_to_hive_not_raw(self):
self.log("hadoop 데이터 hive write Start....")
subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name} partition (pdate='{self.gdate}')"],check=True)
self.log("hadoop 데이터 hive write End....")
def get_col_to_userinfo(self):
hive_col = subprocess.run(['hive','-e',f"describe {self.hive_table_name}_raw;"], capture_output=True, text=True, check=True)
result_col = []
for col in hive_col.stdout.strip().split('\n'):
stripped_col = col.strip()
if "pdate" in stripped_col:
break
col_data = stripped_col.split()[0]
if col_data=="username":
continue
result_col.append(f"a.{col_data} string")
result_col = ','.join(result_col)
self.hive_columns = result_col
def join_user_info(self):
self.log("hive user_info join Start....")
subprocess.run(['hive', '-e',
f"set mapred.job.priority=VERY_HIGH; "
f"insert overwrite table {self.hive_table_name} "
f"partition (pdate='{self.gdate}') "
f"select {self.hive_columns},{self.user_info_col} "
f"from {self.hive_table_name}_raw a "
f"left join user_info b "
f"on (a.username = b.username) "
f"where (a.pdate='{self.gdate}' and b.pdate='{self.gdate}')"], check=True)
self.log("hive user_info join End....")
def not_join_run(self):
self.make_hadoop_dir(self.hadoop_dir)
self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
self.hadoop_to_hive_not_raw()
self.make_hadoop_dir(self.hadoop_backup_dir)
self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)
def run(self):
self.make_hadoop_dir(self.hadoop_dir)
self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
self.hadoop_to_hive()
self.get_col_to_userinfo()
self.join_user_info()
self.make_hadoop_dir(self.hadoop_backup_dir)
self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)
if __name__ == "__main__":
if len(sys.argv) < 9:
print(f"사용법: {sys.argv[0]} <YYYYMMDD> <data_dir> <data_file> <hive_table_name> <hadoop_dir> <hadoop_file> <hadoop_backup_dir> <hadoop_backup_file>")
sys.exit(1)
gdate = sys.argv[1]
data_dir = sys.argv[2]
data_file = sys.argv[3]
hive_table_name = sys.argv[4]
hadoop_dir = sys.argv[5]
hadoop_file = sys.argv[6]
hadoop_backup_dir = sys.argv[7]
hadoop_backup_file = sys.argv[8]
실행 명령은 동일
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('20240710', '/data/row_data/user_info', 'user_info_20240710.txt', 'user_info', '/data/user_info', 'user_info_20240710', '/data/backup/user_info', 'user_info_20240710.tgz'); loader.not_join_run()"
3. key_log
1) hive 테이블 create
CREATE_hive_table.py : 테이블 생성 클래스
- 필요 인자
db 명 : default
table 명 : key_log
컬럼 : "['logtime', 'key_nm', 'key_str', 'username']"
파티션 : "{'pdate':'STRING'}"
row_delimiter : "\n"
field_delimiter : "\t"
stored : TextFile
- 실행 명령어
./CREATE_hive_table.py default key_log_raw "['logtime', 'key_nm', 'key_str', 'username']" "{'pdate':'STRING'}" "\n" "\t" TextFile

2) local to hive( 클래스 )
#!/usr/bin/python3
import os
import subprocess
import sys
from datetime import datetime
class HiveDataLoader:
def __init__(self, gdate, data_dir, data_file, hive_table_name, hadoop_dir, hadoop_file, hadoop_backup_dir, hadoop_backup_file):
self.user_info_col = 'b.username string, b.level string, b.job_type string'
self.gdate = gdate
self.data_dir = data_dir
self.data_file = data_file
self.hive_table_name = hive_table_name
self.hadoop_dir = hadoop_dir
self.hadoop_file = hadoop_file
self.hadoop_backup_dir = hadoop_backup_dir
self.hadoop_backup_file = hadoop_backup_file
def log(self, message):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Time_Stamp : {message}")
def make_hadoop_dir(self, hdfs_dir):
self.log("hadoop 디렉토리 존재하지 않으면 생성 Start....")
result = subprocess.run(['hdfs', 'dfs', '-test', '-e', hdfs_dir])
if result.returncode != 0:
subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_dir], check=True)
self.log(f"hadoop 디렉토리 {hdfs_dir} 생성됨.")
else:
self.log(f"hadoop 디렉토리 {hdfs_dir} 이미 존재함.")
self.log("hadoop 디렉토리 존재하지 않으면 생성 End....")
def local_to_hadoop(self, hdfs_dir, hdfs_file):
self.log("local 데이터 hadoop put Start....")
result = subprocess.run(['hdfs','dfs','-test','-e',f"{hdfs_dir}/{hdfs_file}"])
if result.returncode == 0:
subprocess.run(['hdfs','dfs','-rm',f"{hdfs_dir}/{hdfs_file}"],check=True)
self.log(f"{hdfs_dir}/{hdfs_file} 삭제 완료.")
if ("backup" in hdfs_dir) and (".tgz" in hdfs_file):
tar_command = f"tar czf - -C {self.data_dir} {self.data_file} | hdfs dfs -put - {hdfs_dir}/{hdfs_file}"
subprocess.run(tar_command, shell=True, check=True)
else:
subprocess.run(['hdfs','dfs','-put',f"{self.data_dir}/{self.data_file}",f"{hdfs_dir}/{hdfs_file}"], check=True)
self.log(f"{hdfs_dir}/{hdfs_file} 생성 완료.")
self.log("local 데이터 hadoop put End....")
def hadoop_to_hive(self):
self.log("hadoop 데이터 hive write Start....")
subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name}_raw partition (pdate='{self.gdate}')"],check=True)
self.log("hadoop 데이터 hive write End....")
def hadoop_to_hive_not_raw(self):
self.log("hadoop 데이터 hive write Start....")
subprocess.run(['hive','-e',f"set mapred.job.priority=VERY_HIGH;load data inpath '{self.hadoop_dir}/{self.hadoop_file}' overwrite into table {self.hive_table_name} partition (pdate='{self.gdate}')"],check=True)
self.log("hadoop 데이터 hive write End....")
def get_col_to_userinfo(self):
hive_col = subprocess.run(['hive','-e',f"describe {self.hive_table_name}_raw;"], capture_output=True, text=True, check=True)
result_col = []
for col in hive_col.stdout.strip().split('\n'):
stripped_col = col.strip()
if "pdate" in stripped_col:
break
col_data = stripped_col.split()[0]
if col_data=="username":
continue
result_col.append(f"a.{col_data} string")
result_col = ','.join(result_col)
self.hive_columns = result_col
def join_user_info(self):
self.log("hive user_info join Start....")
subprocess.run(['hive', '-e',
f"set mapred.job.priority=VERY_HIGH; "
f"insert overwrite table {self.hive_table_name} "
f"partition (pdate='{self.gdate}') "
f"select {self.hive_columns},{self.user_info_col} "
f"from {self.hive_table_name}_raw a "
f"left join user_info b "
f"on (a.username = b.username) "
f"where (a.pdate='{self.gdate}' and b.pdate='{self.gdate}')"], check=True)
self.log("hive user_info join End....")
def not_join_run(self):
self.make_hadoop_dir(self.hadoop_dir)
self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
self.hadoop_to_hive_not_raw()
self.make_hadoop_dir(self.hadoop_backup_dir)
self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)
def run(self):
self.make_hadoop_dir(self.hadoop_dir)
self.local_to_hadoop(self.hadoop_dir, self.hadoop_file)
self.hadoop_to_hive()
self.get_col_to_userinfo()
self.join_user_info()
self.make_hadoop_dir(self.hadoop_backup_dir)
self.local_to_hadoop(self.hadoop_backup_dir, self.hadoop_backup_file)
if __name__ == "__main__":
if len(sys.argv) < 9:
print(f"사용법: {sys.argv[0]} <YYYYMMDD> <data_dir> <data_file> <hive_table_name> <hadoop_dir> <hadoop_file> <hadoop_backup_dir> <hadoop_backup_file>")
sys.exit(1)
gdate = sys.argv[1]
data_dir = sys.argv[2]
data_file = sys.argv[3]
hive_table_name = sys.argv[4]
hadoop_dir = sys.argv[5]
hadoop_file = sys.argv[6]
hadoop_backup_dir = sys.argv[7]
hadoop_backup_file = sys.argv[8]
사용법
local_to_hive.py : 로컬 데이터 hive 웨어하우스 적재 클래스
join 필요 없으면 not_join_run 함수 사용
join 필요시 run 함수 사용
- 필요 인자 8개
gdate : 20240710
data_dir : /data/row_data/key_log
data_file : ch_20240710_key_log.txt
hive_table_name : key_log_raw
hadoop_dir : /data/key_log_raw
hadoop_file : key_log_20240710
hadoop_backup_dir : /data/backup/key_log
hadoop_backup_file : key_log_20240710.tgz
- 실행 명령어
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('20240710', '/data/row_data/key_log', 'ch_20240710_key_log.txt', 'key_log', '/data/key_log_raw', 'key_log_20240710', '/data/backup/key_log', 'key_log_20240710.tgz'); loader.run()"
(1) key_log_raw 데이터

(2) key_log 데이터( 최종 )

- 형식
<키보드 입력 시간(시분초)> <키보드 번호 타입> <키보드 문자 타입> <사용자> <레벨> <직업> <수집 날짜(파티션)>
(3) key_log 데이터( 백업 )

4. 최종 proc.sh
1) 파이프 라인 전역변수 공유자원 ( /data/work/common_data.sh )
#!/usr/bin/bash
Ddate=$1
dict_name=$2
# 딕셔너리 선언
declare -A user_info=(
["data_dir"]="/data/row_data/user_info"
["ch_data_file"]="user_info_${Ddate}.txt"
["hive_table_name"]="user_info"
["hadoop_dir"]="/data/user_info"
["hadoop_file"]="user_info_${Ddate}"
["hadoop_backup_dir"]="/data/backup/user_info"
["hadoop_backup_file"]="user_info_${Ddate}.tgz"
)
declare -A key_log=(
["scripts_dir"]="/data/scripts/key_log"
["data_dir"]="/data/row_data/key_log"
["data_file"]="${Ddate}_key_log.txt"
["ch_data_file"]="ch_${Ddate}_key_log.txt"
["hive_table_name"]="key_log"
["hadoop_dir"]="/data/key_log_raw"
["hadoop_file"]="key_log_${Ddate}"
["hadoop_backup_dir"]="/data/backup/key_log"
["hadoop_backup_file"]="key_log_${Ddate}.tgz"
)
# 지정된 딕셔너리 이름으로 변수들을 export
declare -n dict_ref="$dict_name" # 인자로 받은 딕녀서리 변수명을 참조할 수 있는 참조 변수 선언
for key in "${!dict_ref[@]}";
do
# 현재 쉘이랑 쉘 자식프로세스에 전역변수 던짐
export "${key}=${dict_ref[${key}]}"
done
2) key_log proc 스크립트 ( key_log_proc.sh )
#!/usr/bin/bash
# 인자 갯수 확인
if [ "$#" -ne 2 ]; then
echo "Usage: $0 <YYYYMMDD> <스크립트내 전역 변수dict_name>"
exit 1
fi
Ddate=$1
dict_name=$2
# 스크립트 내 전역변수 설정
. /data/work/common_data.sh ${Ddate} ${dict_name}
# 데이터 proprecessing
${scripts_dir}/ch_data.py $data_file $data_dir
# 클래스 파일 있는 경로로 이동
cd /data/scripts
# 데이터 웨어하우징 및 백업
python3 -c "from local_to_hive import HiveDataLoader; loader = HiveDataLoader('${Ddate}', '${data_dir}', '${ch_data_file}', '${hive_table_name}', '${hadoop_dir}', '${hadoop_file}', '${hadoop_backup_dir}', '${hadoop_backup_file}'); loader.run()"
5. 오래된 데이터 삭제하는 스크립트(원시, 웨어하우스, 백업 데이터 삭제)
rm_data_proc.sh
-----------------------------------------------------------
#!/usr/bin/bash
# 일주일 지난 원시데이터 삭제
find /data/row_data -type f -mtime +7 -exec rm -f {} \;
# 한달 지난 warehouse, 2년 지난 백업 데이터 삭제
/data/scripts/rm_data/rm_hadoop.sh
-----------------------------------------------------------
/data/scripts/rm_data/rm_hadoop.sh
-----------------------------------------------------------
#!/usr/bin/bash
# 30일 지난 데이터 삭제
del_date_warehouse=$(date -d "- 30 days" +%Y%m%d)
# 2년 지난 데이터 삭제
del_date_backup=$(date -d "- 730 days" +%Y%m%d)
# 한달 지난 warehouse 데이터 삭제
for warehouse_data in $(hdfs dfs -ls -R /user/hive/warehouse | grep '^d' | grep '/*date' | awk '{print $8}');
do
if [[ $(echo ${warehouse_data} | awk -F '/' '{print $6}' | awk -F '=' '{print $2}') -le $del_date_warehouse ]]; then
echo "삭제 디렉토리 $warehouse_data"
hdfs dfs -rm -r "$warehouse_data"
fi
done
# 2년 지난 backup 데이터 삭제
for backup_data in $(hdfs dfs -ls -R /data/backup | grep '^-' | awk '{ split($6, date, "-"); print date[1]date[2]date[3] "," $8 }');
do
now_file=$(echo ${backup_data} | awk -F ',' '{print $2}')
if [[ $(echo ${backup_data} | awk -F ',' '{print $1}') -le $del_date_backup ]]; then
echo "삭제 백업 파일: ${now_file}"
hdfs dfs -rm "${now_file}"
fi
done
-----------------------------------------------------------
감사합니당~ 감사합니당~

'데이터 엔지니어( 실습 정리 )' 카테고리의 다른 글
| ansible 사용 os_auto_config (0) | 2024.07.18 |
|---|---|
| 사용자 키보드 로그 수집 (0) | 2024.07.17 |
| hive 테이블 동적 Create, 실습 정리 (2) | 2024.07.05 |
| 도커 컨테이너 실습[하둡hive] 동적 실행 (3) | 2024.07.04 |
| 도커 컨테이너 실습[주키퍼] 동적 실행 (2) | 2024.07.03 |