본문 바로가기
데이터 엔지니어( 이론 공부 )/spark

Apache Spark ( Spark MLlib )

by 세용융용융용 2026. 4. 30.

1. Spark 머신러닝 라이브러리

Spark MLlib = 머신러닝 알고리즘을 분산 처리 방식으로 구현
→ 대용량 데이터 처리에 최적화

1-1. 왜 Spark MLlib 쓰냐?

  • 장점
    • 대용량 데이터 ML 파이프라인 분산 처리 가능 ( 데이터 읽기 + 전처리 + 인코딩 + feature vector 생성 + model fit )
    • 데이터가 분산 저장된 경우 (HDFS / S3)  → 분산 처리 최적화 가능
  • 단점
    • 딥러닝 지원 안됨
    • 디버깅 / 튜닝 불편
데이터가 크고 분산 필요 시 Spark
데이터가 작고 모델 다양성 필요시 Sklearn / 딥러닝

 

[ Spark MLlib 알고리즘 ]

[ 회귀 / 분류 / 군집 / 추천 ]
Regression (회귀)
SVM (분류)
Naive Bayes (분류)
Decision Tree (트리 모델)
K-Means clustering (군집)
Recommendation (추천)

1-2. Spark MLlib 파이프라인 구조

Transformer → Estimator → Model → Evaluator
전처리 → 모델 학습 → 모델 생성 → 성능 평가

 

[ 피처 엔지니어링 ]

모델이 패턴을 잘 학습할 수 있도록 "컬럼 변환 + 생성 + 선택까지 포함한 작업"
특징 뽑기 + 변환이 핵심
[ Spark 내부 유틸 ]
Linear Algebra
Statistics ( 통계 )

 

2. Spark 머신러닝 실습

집값 예측 ( 회귀 ) 실습
데이터 다운로드 ⤵
wget https://raw.githubusercontent.com/ageron/handson-ml/master/datasets/housing/housing.csv

2-1. Spark 머신러닝 파이프라인 구조

[ Linear Regression Pipeline ]                 ||                 [ Random Forest Pipeline ]
==============================================================================================

Raw DF                                         ||                 Raw DF
  ↓                                            ||                   ↓
dropna                                         ||                 dropna
  ↓                                            ||                   ↓
label 설정                                     ||               label 설정
  ↓                                            ||                   ↓
Pipeline                                       ||               Pipeline
   - StringIndexer                             ||                  - StringIndexer
   - OneHotEncoder                             ||                  - OneHotEncoder
   - VectorAssembler                           ||                  - VectorAssembler
  ↓                                            ||                   ↓
features + label                               ||             features + label
  ↓                                            ||                   ↓
train/test split                               ||             train/test split
  ↓                                            ||                   ↓
LinearRegression.fit()                         ||       RandomForestRegressor.fit()
  ↓                                            ||                   ↓
predict()                                      ||               predict()
  ↓                                            ||                   ↓
evaluate (RMSE / R2 / MSE)                     ||       evaluate (RMSE / R2 / MSE)

2-2. Spark MLlib 핵심 코드

[ LinearRegression ]

# ================================
# 1. Import
# ================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression


# ================================
# 2. 데이터 로드
# ================================
df = spark.read.csv("/work/jsy/meta_code_spark/data_set/housing.csv", header=True, inferSchema=True)
df.printSchema()
============================
>>> df.printSchema()
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
============================
# 컬럼 간단 설명
longitude: 경도
latitude: 위도
housing_median_age: 해당 지역 집 나이 중앙값
total_rooms: 전체 방 수
total_bedrooms: 침실 개수
population: 해당 지역 인구 수
households: 가구 수
median_income: 해당 지역 소득 중앙값
median_house_value: 집값 ( 우리가 예측할 값, LABEL )
ocean_proximity: 바다와 거리 ( NEAR BAY, INLAND.. 문자열 → 원핫 인코딩 필요 )

# string 컬럼 유니크값 보기
df.select("ocean_proximity").distinct().show()


# ================================
# 3. 결측치 제거
# ================================
df = df.dropna()

# ================================
# 4. Label 생성
# ================================
# 우리가 예측할 값 (집값)
df = df.withColumnRenamed("median_house_value", "label")

# ================================
# 5. Train / Test Split
# ================================
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42) # seed는 랜덤을 고정시키는 난수

# ================================
# 6. 전처리 단계 정의
# ================================
# (1) 문자열 → 숫자 변환
indexer = StringIndexer(
    inputCol="ocean_proximity",
    outputCol="ocean_index"
)

# (2) 숫자 → OneHot Vector 변환
encoder = OneHotEncoder(
    inputCols=["ocean_index"],
    outputCols=["ocean_vec"]
)

# (3) feature 컬럼 합치기
assembler = VectorAssembler(
    inputCols=[
        "longitude",
        "latitude",
        "housing_median_age",
        "total_rooms",
        "total_bedrooms",
        "population",
        "households",
        "median_income",
        "ocean_vec"
    ],
    outputCol="features"
)

# ================================
# 7. Pipeline 생성
# ================================
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    assembler
])

# ================================
# 8. Pipeline 적용 (fit + transform)
# ================================
pipeline_model = pipeline.fit(df)

train_prepared = pipeline_model.transform(train_data)
test_prepared = pipeline_model.transform(test_data)

# ================================
# 9. Linear Regression 모델
# ================================
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.1   # 과적합 방지용 (추천)
)

model = lr.fit(train_prepared)

# ================================
# 10. 평가
# ================================
test_results = model.evaluate(test_prepared)

print("========== MODEL PERFORMANCE ==========")
print(f"RMSE: {test_results.rootMeanSquaredError}")
print(f"R2  : {test_results.r2}")
print(f"MSE : {test_results.meanSquaredError}")

# RMSE: 68641.31738871416 > 예측이 평균적으로 얼마나 틀렸는지 ( 약6.8만 달러 오차 )
# R2  : 0.6432667356470648 > 모델이 데이터 별화를 얼마나 설명하냐
# MSE : 4711630452.858192 > 오차 제곱 평균

# ================================
# 11. 예측 결과 확인
# ================================
test_results.predictions.show(10)
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-----------+-------------+--------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|ocean_index|    ocean_vec|            features|        prediction|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-----------+-------------+--------------------+------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|103600.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.3,41.84,17....| 151276.3164262334|
|  -124.23|   40.54|              52.0|     2694.0|         453.0|    1152.0|     435.0|       3.0806|106700.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.23,40.54,52...|216541.68675884278|
|  -124.23|   41.75|              11.0|     3159.0|         616.0|    1343.0|     479.0|       2.4805| 73200.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.23,41.75,11...|126837.21866904991|
|  -124.19|   40.73|              21.0|     5694.0|        1056.0|    2907.0|     972.0|       3.5363| 90100.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.19,40.73,21...|197963.21587998327|
|  -124.18|   40.78|              34.0|     1592.0|         364.0|     950.0|     317.0|       2.1607| 67000.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.18,40.78,34...|153296.32388650253|
|  -124.17|   40.62|              32.0|     1595.0|         309.0|     706.0|     277.0|       2.8958| 86400.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.17,40.62,32...|185358.86569401668|
|  -124.17|   40.79|              43.0|     2285.0|         479.0|    1169.0|     482.0|       1.9688| 70500.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.17,40.79,43...|161144.36215620255|
|  -124.16|    40.6|              39.0|     1322.0|         283.0|     642.0|     292.0|       2.4519| 85100.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.16,40.6,39....|177329.24304726906|
|  -124.16|    40.8|              52.0|     2416.0|         618.0|    1150.0|     571.0|       1.7308| 80500.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.16,40.8,52....|179350.95031422656|
|  -124.16|   40.95|              20.0|     1075.0|         214.0|     529.0|     196.0|       3.1406| 96000.0|     NEAR OCEAN|        2.0|(4,[2],[1.0])|[-124.16,40.95,20...| 169650.8043393609|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-----------+-------------+--------------------+------------------+

 

[ RandomForestRegressor ]

# ================================
# 1. Import
# ================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor # 결정트리 기반 예측 모델 ( 회귀 )
from pyspark.ml.evaluation import RegressionEvaluator # 모델 성능 평가 라이브러리

# ================================
# 2. 데이터 로드
# ================================
df = spark.read.csv(
    "/work/jsy/meta_code_spark/data_set/housing.csv",
    header = True,
    inferSchema = True
)
df = df.dropna()
df = df.withColumnRenamed("median_house_value", "label")

train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# ================================
# 3. 전처리
# ================================
str_idxer = StringIndexer(
    inputCol="ocean_proximity",
    outputCol="ocean_idx"
)

onehot_encoder = OneHotEncoder(
    inputCols=["ocean_idx"],
    outputCols=["ocean_vec"]
)

assembler = VectorAssembler(
    inputCols=[
        "longitude",
        "latitude",
        "housing_median_age",
        "total_rooms",
        "total_bedrooms",
        "population",
        "households",
        "median_income",
        "ocean_vec"
    ],
    outputCol="features"
)

# ================================
# 4. Pipeline
# ================================
pipeline = Pipeline(stages = [
    str_idxer, onehot_encoder, assembler
])

pipeline_model = pipeline.fit(df)
train_prepared = pipeline_model.transform(train_data)
test_prepared = pipeline_model.transform(test_data)

# ================================
# 5. Random Forest 모델
# ================================
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=100,      # 트리 개수 (많을수록 안정적 but 느림)
    maxDepth=10,       # 과적합 조절 핵심
    seed=42
)

model = rf.fit(train_prepared)

# ================================
# 6. 예측
# ================================
predictions = model.transform(test_prepared)


# ================================
# 7. 평가
# ================================
evaluator_rmse = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)

evaluator_r2 = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="r2"
)

evaluator_mse = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="mse"
)

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)


print("========== RANDOM FOREST PERFORMANCE ==========")
print(f"RMSE: {rmse}")
print(f"R2  : {r2}")
print(f"MSE : {mse}")

# RMSE: 56244.43303176802
# R2  : 0.7604856853102027
# MSE : 3163436247.0650377

# ================================
# 8. 결과 확인
# ================================
predictions.select("label", "prediction", "features").show(10)
+--------+------------------+--------------------+
|   label|        prediction|            features|
+--------+------------------+--------------------+
|103600.0|104619.67923527556|[-124.3,41.84,17....|
|106700.0| 127413.9751344274|[-124.23,40.54,52...|
| 73200.0| 99382.75343565232|[-124.23,41.75,11...|
| 90100.0| 159014.8935549297|[-124.19,40.73,21...|
| 67000.0| 86113.07669656756|[-124.18,40.78,34...|
| 86400.0|108369.28943241299|[-124.17,40.62,32...|
| 70500.0| 87490.41413208531|[-124.17,40.79,43...|
| 85100.0| 91392.38305081984|[-124.16,40.6,39....|
| 80500.0| 93014.18066651141|[-124.16,40.8,52....|
| 96000.0|123844.81190978076|[-124.16,40.95,20...|
+--------+------------------+--------------------+

 

3. Spark GraphX 라이브러리

그래프란 뭐냐?
Graph는 데이터를 노드와 엣지로 표현하는 방식
노드(Vertex) = 대상 ( 사람, 서버, 디바이스 등 )
엣지(Edge) = 관계 ( 로그인, 접속, 요청 등 )

예시..)
사용자 → 서버 로그인
디바이스 → 서버 요청
사용자 → 디바이스 접근

즉, 누가? 누구랑? 연결됐는지를 표현하는 구조

3-1. 왜 그래프를 쓰냐?

user server action
u1 s1 login
  • 기존 데이터는 테이블 → 한 줄 관계만 보기 좋음
  • 하지만 현실 데이터는 복잡하게 얽힘 → 복잡한 구조는 테이블로 분석이 어렵다..

 

[ 그래프가 필요한 이유 ]

  1. 중요 노드 찾기 (PageRank)
    • 어떤 서버가 가장 핵심?
    • 어떤 계정이 영향력이 큰가?
  2. 이상 행동 탐지
    • 평소 안 가던 서버 접속
    • 권한 상승 흐름
  3. 최단 경로 분석
    • sy0218 → 서버까지 몇 단계 거리?
  4. 연결 구조 분석
    • 네트워크 구조 전체 이해

3-2. Spark에서 그래프 쓰는 이유

그래프 문제의 특징
데이터가 엄청 크고 + 연결이 많음 → 한 머신으로 처리 힘듬

 

[ 단일머신 vs Spark ]

1) 단일머신 그래프 처리 구조
		[ 모든 노드 + 모든 간선 ]
                  ↓
        ┌───────────────────┐
        │     Single CPU     │
        │     Single RAM     │
        └───────────────────┘
                  ↓
            PageRank 계산
→ 메모리 힘듬 ( 대형 그래프는 RAM에 다 못 올림 )
→ 연결성 때문에 분리 어려움 ( 한 노드 계산하려면 전체 그래프 영향 필요 )
→ 반복 계산 ( PageRank: 계속 점수 전달 → CPU 오래 점유 )


2) Spark 그래프 처리 구조
        [ 전체 그래프 ]
              ↓
    ┌──────── Partition ────────┐
    │        │        │        │
 [Worker1] [Worker2] [Worker3] [Worker4]
    │        │        │        │
    └─── 메시지 전달 (shuffle) ───┘
              ↓
        반복 계산 (PageRank)
              ↓
         결과 병합

1. 그래프를 쪼갬
2. 메시지 패싱 방식 ( 각 노드가 이웃에게 점수 전달 )
3. 반복 계산
   - 각 워커에서 병렬 수행
   - 중간 결과 네트워크로 교환
즉, Spark는 대규모 그래프 분석을 위해 사용
노드/엣지도 분산 저장 + 병렬로 그래프 계산