1. Spark 머신러닝 라이브러리
Spark MLlib = 머신러닝 알고리즘을 분산 처리 방식으로 구현
→ 대용량 데이터 처리에 최적화
1-1. 왜 Spark MLlib 쓰냐?
- 장점
- 대용량 데이터 ML 파이프라인 분산 처리 가능 ( 데이터 읽기 + 전처리 + 인코딩 + feature vector 생성 + model fit )
- 데이터가 분산 저장된 경우 (HDFS / S3) → 분산 처리 최적화 가능
- 단점
- 딥러닝 지원 안됨
- 디버깅 / 튜닝 불편
데이터가 크고 분산 필요 시 Spark
데이터가 작고 모델 다양성 필요시 Sklearn / 딥러닝
[ Spark MLlib 알고리즘 ]
[ 회귀 / 분류 / 군집 / 추천 ]
Regression (회귀)
SVM (분류)
Naive Bayes (분류)
Decision Tree (트리 모델)
K-Means clustering (군집)
Recommendation (추천)
1-2. Spark MLlib 파이프라인 구조
Transformer → Estimator → Model → Evaluator
전처리 → 모델 학습 → 모델 생성 → 성능 평가
[ 피처 엔지니어링 ]
모델이 패턴을 잘 학습할 수 있도록 "컬럼 변환 + 생성 + 선택까지 포함한 작업"
특징 뽑기 + 변환이 핵심
[ Spark 내부 유틸 ]
Linear Algebra
Statistics ( 통계 )
2. Spark 머신러닝 실습
집값 예측 ( 회귀 ) 실습
데이터 다운로드 ⤵
wget https://raw.githubusercontent.com/ageron/handson-ml/master/datasets/housing/housing.csv
2-1. Spark 머신러닝 파이프라인 구조
[ Linear Regression Pipeline ] || [ Random Forest Pipeline ]
==============================================================================================
Raw DF || Raw DF
↓ || ↓
dropna || dropna
↓ || ↓
label 설정 || label 설정
↓ || ↓
Pipeline || Pipeline
- StringIndexer || - StringIndexer
- OneHotEncoder || - OneHotEncoder
- VectorAssembler || - VectorAssembler
↓ || ↓
features + label || features + label
↓ || ↓
train/test split || train/test split
↓ || ↓
LinearRegression.fit() || RandomForestRegressor.fit()
↓ || ↓
predict() || predict()
↓ || ↓
evaluate (RMSE / R2 / MSE) || evaluate (RMSE / R2 / MSE)
2-2. Spark MLlib 핵심 코드
[ LinearRegression ]
# ================================
# 1. Import
# ================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
# ================================
# 2. 데이터 로드
# ================================
df = spark.read.csv("/work/jsy/meta_code_spark/data_set/housing.csv", header=True, inferSchema=True)
df.printSchema()
============================
>>> df.printSchema()
root
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- housing_median_age: double (nullable = true)
|-- total_rooms: double (nullable = true)
|-- total_bedrooms: double (nullable = true)
|-- population: double (nullable = true)
|-- households: double (nullable = true)
|-- median_income: double (nullable = true)
|-- median_house_value: double (nullable = true)
|-- ocean_proximity: string (nullable = true)
============================
# 컬럼 간단 설명
longitude: 경도
latitude: 위도
housing_median_age: 해당 지역 집 나이 중앙값
total_rooms: 전체 방 수
total_bedrooms: 침실 개수
population: 해당 지역 인구 수
households: 가구 수
median_income: 해당 지역 소득 중앙값
median_house_value: 집값 ( 우리가 예측할 값, LABEL )
ocean_proximity: 바다와 거리 ( NEAR BAY, INLAND.. 문자열 → 원핫 인코딩 필요 )
# string 컬럼 유니크값 보기
df.select("ocean_proximity").distinct().show()
# ================================
# 3. 결측치 제거
# ================================
df = df.dropna()
# ================================
# 4. Label 생성
# ================================
# 우리가 예측할 값 (집값)
df = df.withColumnRenamed("median_house_value", "label")
# ================================
# 5. Train / Test Split
# ================================
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42) # seed는 랜덤을 고정시키는 난수
# ================================
# 6. 전처리 단계 정의
# ================================
# (1) 문자열 → 숫자 변환
indexer = StringIndexer(
inputCol="ocean_proximity",
outputCol="ocean_index"
)
# (2) 숫자 → OneHot Vector 변환
encoder = OneHotEncoder(
inputCols=["ocean_index"],
outputCols=["ocean_vec"]
)
# (3) feature 컬럼 합치기
assembler = VectorAssembler(
inputCols=[
"longitude",
"latitude",
"housing_median_age",
"total_rooms",
"total_bedrooms",
"population",
"households",
"median_income",
"ocean_vec"
],
outputCol="features"
)
# ================================
# 7. Pipeline 생성
# ================================
pipeline = Pipeline(stages=[
indexer,
encoder,
assembler
])
# ================================
# 8. Pipeline 적용 (fit + transform)
# ================================
pipeline_model = pipeline.fit(df)
train_prepared = pipeline_model.transform(train_data)
test_prepared = pipeline_model.transform(test_data)
# ================================
# 9. Linear Regression 모델
# ================================
lr = LinearRegression(
featuresCol="features",
labelCol="label",
regParam=0.1 # 과적합 방지용 (추천)
)
model = lr.fit(train_prepared)
# ================================
# 10. 평가
# ================================
test_results = model.evaluate(test_prepared)
print("========== MODEL PERFORMANCE ==========")
print(f"RMSE: {test_results.rootMeanSquaredError}")
print(f"R2 : {test_results.r2}")
print(f"MSE : {test_results.meanSquaredError}")
# RMSE: 68641.31738871416 > 예측이 평균적으로 얼마나 틀렸는지 ( 약6.8만 달러 오차 )
# R2 : 0.6432667356470648 > 모델이 데이터 별화를 얼마나 설명하냐
# MSE : 4711630452.858192 > 오차 제곱 평균
# ================================
# 11. 예측 결과 확인
# ================================
test_results.predictions.show(10)
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-----------+-------------+--------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income| label|ocean_proximity|ocean_index| ocean_vec| features| prediction|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-----------+-------------+--------------------+------------------+
| -124.3| 41.84| 17.0| 2677.0| 531.0| 1244.0| 456.0| 3.0313|103600.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.3,41.84,17....| 151276.3164262334|
| -124.23| 40.54| 52.0| 2694.0| 453.0| 1152.0| 435.0| 3.0806|106700.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.23,40.54,52...|216541.68675884278|
| -124.23| 41.75| 11.0| 3159.0| 616.0| 1343.0| 479.0| 2.4805| 73200.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.23,41.75,11...|126837.21866904991|
| -124.19| 40.73| 21.0| 5694.0| 1056.0| 2907.0| 972.0| 3.5363| 90100.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.19,40.73,21...|197963.21587998327|
| -124.18| 40.78| 34.0| 1592.0| 364.0| 950.0| 317.0| 2.1607| 67000.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.18,40.78,34...|153296.32388650253|
| -124.17| 40.62| 32.0| 1595.0| 309.0| 706.0| 277.0| 2.8958| 86400.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.17,40.62,32...|185358.86569401668|
| -124.17| 40.79| 43.0| 2285.0| 479.0| 1169.0| 482.0| 1.9688| 70500.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.17,40.79,43...|161144.36215620255|
| -124.16| 40.6| 39.0| 1322.0| 283.0| 642.0| 292.0| 2.4519| 85100.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.16,40.6,39....|177329.24304726906|
| -124.16| 40.8| 52.0| 2416.0| 618.0| 1150.0| 571.0| 1.7308| 80500.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.16,40.8,52....|179350.95031422656|
| -124.16| 40.95| 20.0| 1075.0| 214.0| 529.0| 196.0| 3.1406| 96000.0| NEAR OCEAN| 2.0|(4,[2],[1.0])|[-124.16,40.95,20...| 169650.8043393609|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-----------+-------------+--------------------+------------------+
[ RandomForestRegressor ]
# ================================
# 1. Import
# ================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor # 결정트리 기반 예측 모델 ( 회귀 )
from pyspark.ml.evaluation import RegressionEvaluator # 모델 성능 평가 라이브러리
# ================================
# 2. 데이터 로드
# ================================
df = spark.read.csv(
"/work/jsy/meta_code_spark/data_set/housing.csv",
header = True,
inferSchema = True
)
df = df.dropna()
df = df.withColumnRenamed("median_house_value", "label")
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
# ================================
# 3. 전처리
# ================================
str_idxer = StringIndexer(
inputCol="ocean_proximity",
outputCol="ocean_idx"
)
onehot_encoder = OneHotEncoder(
inputCols=["ocean_idx"],
outputCols=["ocean_vec"]
)
assembler = VectorAssembler(
inputCols=[
"longitude",
"latitude",
"housing_median_age",
"total_rooms",
"total_bedrooms",
"population",
"households",
"median_income",
"ocean_vec"
],
outputCol="features"
)
# ================================
# 4. Pipeline
# ================================
pipeline = Pipeline(stages = [
str_idxer, onehot_encoder, assembler
])
pipeline_model = pipeline.fit(df)
train_prepared = pipeline_model.transform(train_data)
test_prepared = pipeline_model.transform(test_data)
# ================================
# 5. Random Forest 모델
# ================================
rf = RandomForestRegressor(
featuresCol="features",
labelCol="label",
numTrees=100, # 트리 개수 (많을수록 안정적 but 느림)
maxDepth=10, # 과적합 조절 핵심
seed=42
)
model = rf.fit(train_prepared)
# ================================
# 6. 예측
# ================================
predictions = model.transform(test_prepared)
# ================================
# 7. 평가
# ================================
evaluator_rmse = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="rmse"
)
evaluator_r2 = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="r2"
)
evaluator_mse = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="mse"
)
rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)
print("========== RANDOM FOREST PERFORMANCE ==========")
print(f"RMSE: {rmse}")
print(f"R2 : {r2}")
print(f"MSE : {mse}")
# RMSE: 56244.43303176802
# R2 : 0.7604856853102027
# MSE : 3163436247.0650377
# ================================
# 8. 결과 확인
# ================================
predictions.select("label", "prediction", "features").show(10)
+--------+------------------+--------------------+
| label| prediction| features|
+--------+------------------+--------------------+
|103600.0|104619.67923527556|[-124.3,41.84,17....|
|106700.0| 127413.9751344274|[-124.23,40.54,52...|
| 73200.0| 99382.75343565232|[-124.23,41.75,11...|
| 90100.0| 159014.8935549297|[-124.19,40.73,21...|
| 67000.0| 86113.07669656756|[-124.18,40.78,34...|
| 86400.0|108369.28943241299|[-124.17,40.62,32...|
| 70500.0| 87490.41413208531|[-124.17,40.79,43...|
| 85100.0| 91392.38305081984|[-124.16,40.6,39....|
| 80500.0| 93014.18066651141|[-124.16,40.8,52....|
| 96000.0|123844.81190978076|[-124.16,40.95,20...|
+--------+------------------+--------------------+
3. Spark GraphX 라이브러리
그래프란 뭐냐?
Graph는 데이터를 노드와 엣지로 표현하는 방식
노드(Vertex) = 대상 ( 사람, 서버, 디바이스 등 )
엣지(Edge) = 관계 ( 로그인, 접속, 요청 등 )
예시..)
사용자 → 서버 로그인
디바이스 → 서버 요청
사용자 → 디바이스 접근
즉, 누가? 누구랑? 연결됐는지를 표현하는 구조
3-1. 왜 그래프를 쓰냐?
| user | server | action |
| u1 | s1 | login |
- 기존 데이터는 테이블 → 한 줄 관계만 보기 좋음
- 하지만 현실 데이터는 복잡하게 얽힘 → 복잡한 구조는 테이블로 분석이 어렵다..
[ 그래프가 필요한 이유 ]
- 중요 노드 찾기 (PageRank)
- 어떤 서버가 가장 핵심?
- 어떤 계정이 영향력이 큰가?
- 이상 행동 탐지
- 평소 안 가던 서버 접속
- 권한 상승 흐름
- 최단 경로 분석
- sy0218 → 서버까지 몇 단계 거리?
- 연결 구조 분석
- 네트워크 구조 전체 이해
3-2. Spark에서 그래프 쓰는 이유
그래프 문제의 특징
데이터가 엄청 크고 + 연결이 많음 → 한 머신으로 처리 힘듬
[ 단일머신 vs Spark ]
1) 단일머신 그래프 처리 구조
[ 모든 노드 + 모든 간선 ]
↓
┌───────────────────┐
│ Single CPU │
│ Single RAM │
└───────────────────┘
↓
PageRank 계산
→ 메모리 힘듬 ( 대형 그래프는 RAM에 다 못 올림 )
→ 연결성 때문에 분리 어려움 ( 한 노드 계산하려면 전체 그래프 영향 필요 )
→ 반복 계산 ( PageRank: 계속 점수 전달 → CPU 오래 점유 )
2) Spark 그래프 처리 구조
[ 전체 그래프 ]
↓
┌──────── Partition ────────┐
│ │ │ │
[Worker1] [Worker2] [Worker3] [Worker4]
│ │ │ │
└─── 메시지 전달 (shuffle) ───┘
↓
반복 계산 (PageRank)
↓
결과 병합
1. 그래프를 쪼갬
2. 메시지 패싱 방식 ( 각 노드가 이웃에게 점수 전달 )
3. 반복 계산
- 각 워커에서 병렬 수행
- 중간 결과 네트워크로 교환
즉, Spark는 대규모 그래프 분석을 위해 사용
노드/엣지도 분산 저장 + 병렬로 그래프 계산
'데이터 엔지니어( 이론 공부 ) > spark' 카테고리의 다른 글
| Apache Spark ( Spark 최적화 실습 ) (0) | 2026.04.29 |
|---|---|
| Apache Spark ( Spark 최적화 하기 ) (0) | 2026.04.28 |
| Apache Spark ( Spark submit & partition & shuffle ) (0) | 2026.04.25 |
| Apache Spark ( 데이터프레임 ) (0) | 2026.04.17 |
| Apache Spark ( 핵심 개념 ) (1) | 2026.04.14 |