본문 바로가기
데이터 엔지니어( 이론 공부 )/spark

Apache Spark ( Spark 최적화 하기 )

by 세용융용융용 2026. 4. 28.

1. Spark 최적화란?

우선 스파크 최적화는 정답은 없다.. 상황에 맞게
→ 데이터를 덜 읽고 + 덜 움직이고 + 덜 계산하면 빨라진다.. ㅋㅋ

1-1. 스파크 최적화 3대 원칙

  • 데이터 덜 읽기: 필요없는 데이터 애초에 안 읽기
    • Partition pruning
    • Column pruning
    • Filter pushdown
  • 데이터 덜 이동하기: 네트워크 이동 줄이기
    • Shuffle 최소화
    • Broadcast Join 활용
  • 계산 덜 하기
    • Cache 활용
    • 중복 연산 제거

1-2. 스파크 최적화 흐름

1. 데이터 확인
  - 데이터 크기?
  - 파티션 컬럼?
  - 데이터 스큐 있는지?

2. 실행 계획 확인 ( explain )
  - df.explain(True) → explain 메서드 사용하여 확인
  - Partition pruning 됐는지 ( 파일 자체 스킵 )
  - Column Pruning ( 필요 컬럼만 )
  - Filter pushdown 됐는지 ( Chunk 단위 통계를 활용해 파일 내부 row 줄임 )
  - Join 방식 ( 브로드캐스트 vs 셔플 )

3. 리소스 확인 ( CPU / 메모리 )
   - 드라이버 죽었나?
   - 익스큐터 죽었나?
   - 에러 보고 옵션 조정

4. 코드 개선

1-3. Spark Submit 옵션

### yarn 클러스터 예시 ###
spark-submit \
--class <class.path> # 실행할 메인 클래스 지정 ( Java, Scala ) \
--name # Spark UI에서 보이는 작업 이름 \
--master yarn # 어디서 실행할지 ( local, yarn, k8s = 로컬, 하둡 클러스터, 쿠버네티스 ) \
--deploy-mode client # 드라이버 위치 결정 ( clent, cluster = 내 컴퓨터, 클러스터 내부 ) \
--driver-cores 4 # 드라이버가 사용할 CPU \
--driver-memory 8g # 드리이버가 사용할 메모리 ( collect()등 큰 결과 가져올시 ) \
--num-executors 100 # Executor 개수 ( 작업자 수 ) → 너무 많으면 익스큐터 관리 비용 증가 \
--executor-cores 4 # 익스큐터 당 코어 수 ( 동시 4개 task 처리 가능 ) \
--executor-memory 8g # 익스큐터 당 메모리 ( 부족하면 GC 지옥 or OOM or spill )
--conf spark.logConf=true # 설정값 로그로 출력 \
### yarn 클러스터 예시 ###
  • 총 병렬 처리 = num-executors * executor-cores

1-4. Catalyst Engine ( 스파크 최적화 엔진 )

카탈리스트 엔진 = 스파크 자동 최적화 엔진
[ 전체 흐름 ]
1. 코드 작성
2. Unresoled Logical Plan
   - Spark가 코드 확인 후 트리 만듬
   - 아직 테이블 위치, 컬럼 등 메타스토어 정보 모름
3. Catalog 조회 ( 메타스토어 확인 )
   - Hive 메타스토어 or Spark Catalog 조회
   - 테이블 + 컬럼 확인
4. Logical Plan ( 완성된 논리 계획 )
   - 실제 데이터 기준 논리 계획 완성
5. Optimizer ( 논리 계획 최적화 )
   - Filter Pushdown 
   - Column Pruning
   - Constant Folding ( 상수 미리 계산 → price > 100 * 2 → price > 200 )
   - Join Reordering ( 조인 순서 바꾸기 → 작은 테이블 먼저 조인 → 데이터 줄이고 진행 )
6. 물리 실행 계획 후보 생성
7. Cost Model ( 각 물리 실행 계획 비용 계산 )
   - 각 물리 실행 계획 비용 계산
   - 기준 → 데이터 크기 + 파티션 수 + 셔플 비용 + 메모리 사용량
8. 최적 계획 생성
   - explain() 호출시 이거 나옴
9. RDD 실행
   - 최종 RDD 기반 실행
   - task 단위 실행, 익스큐터에서 병렬 처리
  • 카탈리스트 엔진은 ⤵
  • 쿼리 이해 → 실행 방법 선택 → 실제 실행까지 자동 최적화 해주는 Spark 최적화 내부 엔진
  • ~.explain(true) 메서드를 사용해 실행 계획 확인 

 

2. Partition Pruning ( 중요 )

필요 없는 파일 / 파티션 자체를 안 읽음 ( 성능 최적화 가능 )

 

[ 정적 파티션 프루닝 ]

SELECT *
FROM table
WHERE pdate = '2026-04-26'
  • 실행 전 이미 알수 있음 → 해당 파티션만 읽음

 

[ 동적 파티션 프루닝 ]

SELECT *
FROM A
JOIN B
ON A.pdate = B.pdate
WHERE B.pdate = '2026-04-25'
  • 주로 Join시 A는 실행 전에 모름.. 정적 파티션 프루닝 불가능 → 동적 파티션 프루닝 사용
  • DPP 동작 방식
    • B 먼저 실행
    • pdate 값 추출
    • A 읽을 때 필터 적용
즉, 동적 파티션 프루닝은 런타임에 결정됨
핵심은 튜닝시 파티션 컬럼을 WHERE에 직접 쓰면 Best → 아니면 대부분 DPP로 떨어짐..

2-1. Cache / Partition Statistics

Cache 쓰는 이유?
df.cache() + df.count()
→ 메모리 올림 + 통계 정보 생성

 

[ Parquet/ORC vs Cache 이후 ]

구분 Parquet/ORC Cache 이후
위치 파일 내부 Spark 메모리
단위 row group / stripe 전체 DataFrame
정확도 대충 (범위 기반) 정확 (실데이터 기반)
목적 파일 or row 스킵 실행 계획 최적화
사용 시점 읽기 전에 읽은 후

 

[ 왜 빨라짐? ]

  • Spark가 통계 정보를 더 자세히 알게됨 → filter 최적화 가능
    • join 순서 바꿈
    • 브로드캐스트 여부 결정
    • filter 위치 조정
  • AQE 강화
    • 실제 row 수 기반으로 실행 중 플랜 변경
    • 이건 Cache 안 하면 정확도 떨어짐..

 

3. Spark Tungsten 프로젝트

스파크의 성능 병목( 객체 + GC + 포인터 추적 )을 없애려고 메모리를 바이너리로 직접 다루기 위해 만든 최적화 엔진

3-1. 기존 Spark ( 느린 이유 )

  • Row 객체 → ( id 객체, name 객체, price 객체 ) + 포인터 구조 → 각각 JVM 힙에 생성
즉, Row = 객체 + 객체 + 객체 + 포인터 구조

 

[ 문제점 ]

  • 객체가 너무 많음
    • 객체 오버헤드 ( 실제 데이터 + 메타데이터 + 포인터 / 참조 구조 )
    • Row 하나 = 객체 여러 개 생성
  • JVM GC 부담
    • 스파크는 연산할수록 객체 생성 → 안쓰는 객체 제거 해줘야됨 → GC 실행
    • GC : 살아있는 객체 찾기 → 정리 → 메모리 재배치 ( 작업 멈춤 )
    • GC가 계속 돌면서 느려짐..
  • CPU 캐시 비효율
    • 객체는 메모리에 흩어져 있음 ( 포인터 추적 방식 )
    • CPU가 연속 데이터로 못 읽음 → cache miss

3-2. 텅스텐 프로젝트 핵심 변화

객체를 없애고, 데이터를 바이너리 배열로 직접 다룸
[ 기존 Spark ]
Row 객체 → JVM Heap → GC 발생

[ Tungsten ]
Row → 바이너리 배열 ( byte/long 기반 )

 

[ 데이터 저장 방식 변화 ]

ex..) 예시 데이터
id = 1
name = "kim"
price = 100

1. 기존 방식
Row 객체
 ├ id (Integer 객체)
 ├ name (String 객체)
 └ price (Double 객체)
 
 2. Tungsten 방식
 [ 1 | 100 | offset(“kim”) ]
 - 숫자 → 바이너리 저장
 - 문자열 → 별도 메모리 + offset으로 참조
 즉, 객체 없음 + 포인터 없음 + 그냥 배열

 

[ Tungsten 빠른 이유 ]

  • GC 거의 없음
    • 객체 자체를 안 만듦
    • JVM이 정리할 게 줄어듦
  • CPU 캐시 효율 ↑
    • 연속된 메모리 구조 ( CPU가 한 번에 쭉 읽음 → Cache hit )
    • 포인터 추적 없음
  • 셔플 빨리짐 ( 변환 과정 제거 )
    • 기존 = 객체 → 직렬화 → 네트워크 → 역직렬화
    • Tungsten = 바이너리 그대로 전송
  • 연산 속도 증가
    • df.filter().join().groupBy → 객체 생성 없이 + 바이너리 상태에서 바로 처리
즉, 텅스텐은 스파크에서 객체 기반 처리를 바이너리 기반 처리로 바꿔 [ GC 감소 + CPU 캐시 효율 증가 + 직렬화 비용 제거하여 ] 스파크 전체 성능을 향상시킨 엔진

3-3. Tungsten 쓰게 만드는 사고 방식 ( 핵심 )

핵심 기준: 이 연산을 Spark가 이해할 수 있냐?

 

[ 텅스텐 최적화 전략 ]

  • DataFrame / SQL 유지
    • df.filter().groupBy().agg() → ( 카탈리스트 옵티마이저 → 텅스텐 )
  • built-in 함수만 사용 ( built-in: 스파크가 의미를 이해할 수 있는 함수)
    • from pyspark.sql.functions import col, when, sum 
    • 스파크가 의미 이해 가능 → ( 코드 생성 + 텅스텐 적용 )
  • UDF 최소화 ( 핵심 )
    • Python UDF는 더 심각.. ( JVM ↔ Python 왔다갔다 직렬화 / 역직렬화 발생 ) → 성능 저하

 

[ 텅스텐 실행계획 확인 ]

### 실행계획 확인 습관 중요 ###
df.explain(True)

#### 텅스텐 실행계획 실습 ####
spark.conf.set("spark.sql.adaptive.enabled", "false") → AQE 끄기.. ( explain 100% 노출 가능 )
AQE: 실행 중에 계획을 바꾸는 기능

from pyspark.sql.functions import col

items_df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")
df = items_df.filter(col("price") > 50000).groupBy().avg("price")
df.explain(True)
== Parsed Logical Plan ==
Aggregate [avg(price#2L) AS avg(price)#53]
+- Filter (price#2L > cast(50000 as bigint))
   +- Relation [item_id#0L,seller_id#1,price#2L] parquet

== Analyzed Logical Plan ==
avg(price): double
Aggregate [avg(price#2L) AS avg(price)#53]
+- Filter (price#2L > cast(50000 as bigint))
   +- Relation [item_id#0L,seller_id#1,price#2L] parquet

== Optimized Logical Plan ==
Aggregate [avg(price#2L) AS avg(price)#53]
+- Project [price#2L]
   +- Filter (isnotnull(price#2L) AND (price#2L > 50000))
      +- Relation [item_id#0L,seller_id#1,price#2L] parquet

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[avg(price#2L)], output=[avg(price)#53])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=131]
   +- *(1) HashAggregate(keys=[], functions=[partial_avg(price#2L)], output=[sum
      +- *(1) Filter (isnotnull(price#2L) AND (price#2L > 50000))
         +- *(1) ColumnarToRow
            +- FileScan parquet [price#2L] Batched: true, DataFilters: [isnotnul 50000)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/mms.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(price), GreaterThanma: struct<price:bigint>
            
            
[ 중요 ]
물리 계획 *(2), *(1) → 텅스텐 실행 단위
- 숫자가 같으면 같은 코드 블록으로 실행
- 최종 Group By시 스테이지 끊김 → 별도 코드블록


#### 텅스텐 미사용 실행계획 실습 ####
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType

def plus_one(x):
    return x + 1
    
udf_func = udf(plus_one, LongType())
df = items_df.withColumn("new_price", udf_func(col("price")))
df.explain(True)
== Parsed Logical Plan ==
'Project [item_id#0L, seller_id#1, price#2L, plus_one('price)#61 AS new_price#62]
+- Relation [item_id#0L,seller_id#1,price#2L] parquet

== Analyzed Logical Plan ==
item_id: bigint, seller_id: string, price: bigint, new_price: bigint
Project [item_id#0L, seller_id#1, price#2L, plus_one(price#2L)#61L AS new_price#62L]
+- Relation [item_id#0L,seller_id#1,price#2L] parquet

== Optimized Logical Plan ==
Project [item_id#0L, seller_id#1, price#2L, pythonUDF0#67L AS new_price#62L]
+- BatchEvalPython [plus_one(price#2L)#61L], [pythonUDF0#67L]
   +- Relation [item_id#0L,seller_id#1,price#2L] parquet

== Physical Plan ==
*(2) Project [item_id#0L, seller_id#1, price#2L, pythonUDF0#67L AS new_price#62L]
+- BatchEvalPython [plus_one(price#2L)#61L], [pythonUDF0#67L]
   +- *(1) ColumnarToRow
      +- FileScan parquet [item_id#0L,seller_id#1,price#2L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/meta_code_spark/maple_items.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<item_id:bigint,seller_id:string,price:bigint>

[ 핵심 문제: BatchEvalPython ]
- 텅스텐 깨짐 ( byte buffer 깨짐 + 직렬화 비용 + Python 객체 생성 → 비용 발생 )

[ 내부 동작 ]
JVM (Spark, Tungsten)
   ↓
데이터 serialize
   ↓
Python 프로세스로 전달
   ↓
Python 함수 실행 (plus_one)
   ↓
다시 JVM으로 반환

[ 해결 방법 ]
df = items_df.withColumn("new_price", col("price") + 1)
*(1) Project [price + 1] → 100% 텅스텐 유지 가능
[ FileScan Parquet ] → 데이터 읽기
FileScan parquet [price#2L]
Batched: true
PushedFilters: [IsNotNull(price), GreaterThan(price,50000)]

[ 의미 ]
Parquet 파일에서 price 컬럼만 읽음
Batched: true → 여러 행을 한 번에( batch ) 읽음
PushedFilters → 50000 이하 데이터는 아예 파일에서 안 읽음
[ Columnar ToRow ]  → 컬럼 기반을 Row 형태로 변환 ( 행 재조합 )
재조합 비용있음 ( 버퍼 필요 )
[ Filter ] → 조건 걸기
Filter (price > 50000) → 조건에 맞는 데이터만 남김

[ 중요! ]
이미 파일 읽을때 PushedFilters로 1차 필터
여기서 2차 필터 
[ HashAggregate (partial_avg) ] → 부분 집계
*(1) HashAggregate (partial_avg)

[ 의미 ]
각 파티션에서 부분 평균 계산
[ Exchange ] → 셔플 발생
Exchange SinglePartition

[ 의미 ]
데이터 모아서 한 곳으로 이동 ( 싱글 파티션 )
평균 = 전체 데이터 기준으로 계산해야 함 → 파티션 하나로 이동
[ HashAggregate (final) ] → 최종 집계
*(2) HashAggregate (avg)

[ 의미 ]
partial 결과 합쳐 최종 평균 계산

3-4. Tungsten vs No Tungsten 성능 비교 실습

[ Tungsten ]

#### tungsten ####
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("tungsten_fast_test") \
    .getOrCreate()

# 데이터 로드
df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")

# 데이터 키우기 (성능 차이 확인용)
big_df = df
for _ in range(5):  # 데이터 32배 증가
    big_df = big_df.union(df)

# Tungsten 방식 (built-in)
result_df = big_df.withColumn("new_price", col("price") + 1)

# 캐시 제거
spark.catalog.clearCache()

# 실행 시간 측정
start = time.time()
result_df.show()
end = time.time()

print(f"[TUNGSTEN] Execution Time: {end - start:.4f} sec")

spark.stop()

 

[ No Tungsten ]

#### No tungsten ####
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType

spark = SparkSession.builder \
    .appName("tungsten_slow_test") \
    .getOrCreate()

# 데이터 로드
df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")

# 데이터 키우기 (동일 조건 유지)
big_df = df
for _ in range(5):  # 데이터 32배 증가
    big_df = big_df.union(df)

# Python UDF 정의
def plus_one(x):
    return x + 1

udf_func = udf(plus_one, LongType())

# UDF 방식 (Tungsten 깨짐)
result_df = big_df.withColumn("new_price", udf_func(col("price")))

# 캐시 제거
spark.catalog.clearCache()

# 실행 시간 측정
start = time.time()
result_df.show()
end = time.time()

print(f"[UDF] Execution Time: {end - start:.4f} sec")

spark.stop()

 

[ 성능 비교 ]

  • 히스토리 서버 사용 ( http://192.168.56.60:18080 )
  • 실제 작업 수행 시간
    • 텡스텐 : 0.1s
    • 텡스텐 X : 0.5s
    • 약 5배 차이.. ( 오.. )

텅스텐

  • GC Time ( MAX 기준 ) : 0
  • 직렬화 시간 ( MAX 기준 ) : 0.042초
  • 객체 생성 X → CPU가 바로 메모리 읽고 계산

텅스텐 X

  • GC Time ( MAX 기준 ) : 0.06초
  • 직렬화 시간 ( MAX 기준 ) : 0.76초
  • 객체 생성 → BatchEvalPython 프로세스 UDF 실행 → Python 객체 생성 → 다시 JVM
    • 객체 생성으로 GC 수행 + 직렬화 비용 증가

 

 

1. 스파크 최적화 본질: 덜 읽고 + 덜 움직이고 + 덜 계산하면 빨라진다
2. 스파크 최적화 3원칙
   - 파티션/컬럼 프루닝(읽기 ↓)
   - 셔플 최소화(이동 ↓)
   - Cache(계산 ↓)
3. 튜닝 순서: 데이터 확인 → explain으로 실행계획 분석 → 리소스 확인 → 코드 개선
4. Catalyst 엔진: 쿼리 분석 → 최적화 → 비용 기반 최적 실행 계획 선택
5. 파티션 프루닝: 필요한 파티션만 읽어 I/O 대폭 감소( 정적 파티션 프루닝이 가장 좋음 )
6. 텅스텐 프로젝트 핵심: 객체 제거 + 바이너리 배열 처리
    - GC ↓ + CPU 캐시 효율 ↑ + 셔플 시 직렬화 제거 → 성능 향상
7. 성능 핵심 습관: 데이터 프레임 + 빌트인 함수 유지
   - Python UDF 쓰면 성능 저하..