1. Spark 최적화란?
우선 스파크 최적화는 정답은 없다.. 상황에 맞게
→ 데이터를 덜 읽고 + 덜 움직이고 + 덜 계산하면 빨라진다.. ㅋㅋ
1-1. 스파크 최적화 3대 원칙
- 데이터 덜 읽기: 필요없는 데이터 애초에 안 읽기
- Partition pruning
- Column pruning
- Filter pushdown
- 데이터 덜 이동하기: 네트워크 이동 줄이기
- Shuffle 최소화
- Broadcast Join 활용
- 계산 덜 하기
- Cache 활용
- 중복 연산 제거
1-2. 스파크 최적화 흐름
1. 데이터 확인
- 데이터 크기?
- 파티션 컬럼?
- 데이터 스큐 있는지?
2. 실행 계획 확인 ( explain )
- df.explain(True) → explain 메서드 사용하여 확인
- Partition pruning 됐는지 ( 파일 자체 스킵 )
- Column Pruning ( 필요 컬럼만 )
- Filter pushdown 됐는지 ( Chunk 단위 통계를 활용해 파일 내부 row 줄임 )
- Join 방식 ( 브로드캐스트 vs 셔플 )
3. 리소스 확인 ( CPU / 메모리 )
- 드라이버 죽었나?
- 익스큐터 죽었나?
- 에러 보고 옵션 조정
4. 코드 개선
1-3. Spark Submit 옵션
### yarn 클러스터 예시 ###
spark-submit \
--class <class.path> # 실행할 메인 클래스 지정 ( Java, Scala ) \
--name # Spark UI에서 보이는 작업 이름 \
--master yarn # 어디서 실행할지 ( local, yarn, k8s = 로컬, 하둡 클러스터, 쿠버네티스 ) \
--deploy-mode client # 드라이버 위치 결정 ( clent, cluster = 내 컴퓨터, 클러스터 내부 ) \
--driver-cores 4 # 드라이버가 사용할 CPU \
--driver-memory 8g # 드리이버가 사용할 메모리 ( collect()등 큰 결과 가져올시 ) \
--num-executors 100 # Executor 개수 ( 작업자 수 ) → 너무 많으면 익스큐터 관리 비용 증가 \
--executor-cores 4 # 익스큐터 당 코어 수 ( 동시 4개 task 처리 가능 ) \
--executor-memory 8g # 익스큐터 당 메모리 ( 부족하면 GC 지옥 or OOM or spill )
--conf spark.logConf=true # 설정값 로그로 출력 \
### yarn 클러스터 예시 ###
- 총 병렬 처리 = num-executors * executor-cores
1-4. Catalyst Engine ( 스파크 최적화 엔진 )
카탈리스트 엔진 = 스파크 자동 최적화 엔진
[ 전체 흐름 ]
1. 코드 작성
2. Unresoled Logical Plan
- Spark가 코드 확인 후 트리 만듬
- 아직 테이블 위치, 컬럼 등 메타스토어 정보 모름
3. Catalog 조회 ( 메타스토어 확인 )
- Hive 메타스토어 or Spark Catalog 조회
- 테이블 + 컬럼 확인
4. Logical Plan ( 완성된 논리 계획 )
- 실제 데이터 기준 논리 계획 완성
5. Optimizer ( 논리 계획 최적화 )
- Filter Pushdown
- Column Pruning
- Constant Folding ( 상수 미리 계산 → price > 100 * 2 → price > 200 )
- Join Reordering ( 조인 순서 바꾸기 → 작은 테이블 먼저 조인 → 데이터 줄이고 진행 )
6. 물리 실행 계획 후보 생성
7. Cost Model ( 각 물리 실행 계획 비용 계산 )
- 각 물리 실행 계획 비용 계산
- 기준 → 데이터 크기 + 파티션 수 + 셔플 비용 + 메모리 사용량
8. 최적 계획 생성
- explain() 호출시 이거 나옴
9. RDD 실행
- 최종 RDD 기반 실행
- task 단위 실행, 익스큐터에서 병렬 처리
- 카탈리스트 엔진은 ⤵
- 쿼리 이해 → 실행 방법 선택 → 실제 실행까지 자동 최적화 해주는 Spark 최적화 내부 엔진
- ~.explain(true) 메서드를 사용해 실행 계획 확인
2. Partition Pruning ( 중요 )
필요 없는 파일 / 파티션 자체를 안 읽음 ( 성능 최적화 가능 )
[ 정적 파티션 프루닝 ]
SELECT *
FROM table
WHERE pdate = '2026-04-26'
- 실행 전 이미 알수 있음 → 해당 파티션만 읽음
[ 동적 파티션 프루닝 ]
SELECT *
FROM A
JOIN B
ON A.pdate = B.pdate
WHERE B.pdate = '2026-04-25'
- 주로 Join시 A는 실행 전에 모름.. 정적 파티션 프루닝 불가능 → 동적 파티션 프루닝 사용
- DPP 동작 방식
- B 먼저 실행
- pdate 값 추출
- A 읽을 때 필터 적용
즉, 동적 파티션 프루닝은 런타임에 결정됨
핵심은 튜닝시 파티션 컬럼을 WHERE에 직접 쓰면 Best → 아니면 대부분 DPP로 떨어짐..
2-1. Cache / Partition Statistics
Cache 쓰는 이유?
df.cache() + df.count()
→ 메모리 올림 + 통계 정보 생성
[ Parquet/ORC vs Cache 이후 ]
| 구분 | Parquet/ORC | Cache 이후 |
| 위치 | 파일 내부 | Spark 메모리 |
| 단위 | row group / stripe | 전체 DataFrame |
| 정확도 | 대충 (범위 기반) | 정확 (실데이터 기반) |
| 목적 | 파일 or row 스킵 | 실행 계획 최적화 |
| 사용 시점 | 읽기 전에 | 읽은 후 |
[ 왜 빨라짐? ]
- Spark가 통계 정보를 더 자세히 알게됨 → filter 최적화 가능
- join 순서 바꿈
- 브로드캐스트 여부 결정
- filter 위치 조정
- AQE 강화
- 실제 row 수 기반으로 실행 중 플랜 변경
- 이건 Cache 안 하면 정확도 떨어짐..
3. Spark Tungsten 프로젝트
스파크의 성능 병목( 객체 + GC + 포인터 추적 )을 없애려고 메모리를 바이너리로 직접 다루기 위해 만든 최적화 엔진
3-1. 기존 Spark ( 느린 이유 )
- Row 객체 → ( id 객체, name 객체, price 객체 ) + 포인터 구조 → 각각 JVM 힙에 생성
즉, Row = 객체 + 객체 + 객체 + 포인터 구조
[ 문제점 ]
- 객체가 너무 많음
- 객체 오버헤드 ( 실제 데이터 + 메타데이터 + 포인터 / 참조 구조 )
- Row 하나 = 객체 여러 개 생성
- JVM GC 부담
- 스파크는 연산할수록 객체 생성 → 안쓰는 객체 제거 해줘야됨 → GC 실행
- GC : 살아있는 객체 찾기 → 정리 → 메모리 재배치 ( 작업 멈춤 )
- GC가 계속 돌면서 느려짐..
- CPU 캐시 비효율
- 객체는 메모리에 흩어져 있음 ( 포인터 추적 방식 )
- CPU가 연속 데이터로 못 읽음 → cache miss
3-2. 텅스텐 프로젝트 핵심 변화
객체를 없애고, 데이터를 바이너리 배열로 직접 다룸
[ 기존 Spark ]
Row 객체 → JVM Heap → GC 발생
[ Tungsten ]
Row → 바이너리 배열 ( byte/long 기반 )
[ 데이터 저장 방식 변화 ]
ex..) 예시 데이터
id = 1
name = "kim"
price = 100
1. 기존 방식
Row 객체
├ id (Integer 객체)
├ name (String 객체)
└ price (Double 객체)
2. Tungsten 방식
[ 1 | 100 | offset(“kim”) ]
- 숫자 → 바이너리 저장
- 문자열 → 별도 메모리 + offset으로 참조
즉, 객체 없음 + 포인터 없음 + 그냥 배열
[ Tungsten 빠른 이유 ]
- GC 거의 없음
- 객체 자체를 안 만듦
- JVM이 정리할 게 줄어듦
- CPU 캐시 효율 ↑
- 연속된 메모리 구조 ( CPU가 한 번에 쭉 읽음 → Cache hit )
- 포인터 추적 없음
- 셔플 빨리짐 ( 변환 과정 제거 )
- 기존 = 객체 → 직렬화 → 네트워크 → 역직렬화
- Tungsten = 바이너리 그대로 전송
- 연산 속도 증가
- df.filter().join().groupBy → 객체 생성 없이 + 바이너리 상태에서 바로 처리
즉, 텅스텐은 스파크에서 객체 기반 처리를 바이너리 기반 처리로 바꿔 [ GC 감소 + CPU 캐시 효율 증가 + 직렬화 비용 제거하여 ] 스파크 전체 성능을 향상시킨 엔진
3-3. Tungsten 쓰게 만드는 사고 방식 ( 핵심 )
핵심 기준: 이 연산을 Spark가 이해할 수 있냐?
[ 텅스텐 최적화 전략 ]
- DataFrame / SQL 유지
- df.filter().groupBy().agg() → ( 카탈리스트 옵티마이저 → 텅스텐 )
- built-in 함수만 사용 ( built-in: 스파크가 의미를 이해할 수 있는 함수)
- from pyspark.sql.functions import col, when, sum
- 스파크가 의미 이해 가능 → ( 코드 생성 + 텅스텐 적용 )
- UDF 최소화 ( 핵심 )
- Python UDF는 더 심각.. ( JVM ↔ Python 왔다갔다 직렬화 / 역직렬화 발생 ) → 성능 저하
[ 텅스텐 실행계획 확인 ]
### 실행계획 확인 습관 중요 ###
df.explain(True)
#### 텅스텐 실행계획 실습 ####
spark.conf.set("spark.sql.adaptive.enabled", "false") → AQE 끄기.. ( explain 100% 노출 가능 )
AQE: 실행 중에 계획을 바꾸는 기능
from pyspark.sql.functions import col
items_df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")
df = items_df.filter(col("price") > 50000).groupBy().avg("price")
df.explain(True)
== Parsed Logical Plan ==
Aggregate [avg(price#2L) AS avg(price)#53]
+- Filter (price#2L > cast(50000 as bigint))
+- Relation [item_id#0L,seller_id#1,price#2L] parquet
== Analyzed Logical Plan ==
avg(price): double
Aggregate [avg(price#2L) AS avg(price)#53]
+- Filter (price#2L > cast(50000 as bigint))
+- Relation [item_id#0L,seller_id#1,price#2L] parquet
== Optimized Logical Plan ==
Aggregate [avg(price#2L) AS avg(price)#53]
+- Project [price#2L]
+- Filter (isnotnull(price#2L) AND (price#2L > 50000))
+- Relation [item_id#0L,seller_id#1,price#2L] parquet
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[avg(price#2L)], output=[avg(price)#53])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=131]
+- *(1) HashAggregate(keys=[], functions=[partial_avg(price#2L)], output=[sum
+- *(1) Filter (isnotnull(price#2L) AND (price#2L > 50000))
+- *(1) ColumnarToRow
+- FileScan parquet [price#2L] Batched: true, DataFilters: [isnotnul 50000)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/mms.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(price), GreaterThanma: struct<price:bigint>
[ 중요 ]
물리 계획 *(2), *(1) → 텅스텐 실행 단위
- 숫자가 같으면 같은 코드 블록으로 실행
- 최종 Group By시 스테이지 끊김 → 별도 코드블록
#### 텅스텐 미사용 실행계획 실습 ####
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType
def plus_one(x):
return x + 1
udf_func = udf(plus_one, LongType())
df = items_df.withColumn("new_price", udf_func(col("price")))
df.explain(True)
== Parsed Logical Plan ==
'Project [item_id#0L, seller_id#1, price#2L, plus_one('price)#61 AS new_price#62]
+- Relation [item_id#0L,seller_id#1,price#2L] parquet
== Analyzed Logical Plan ==
item_id: bigint, seller_id: string, price: bigint, new_price: bigint
Project [item_id#0L, seller_id#1, price#2L, plus_one(price#2L)#61L AS new_price#62L]
+- Relation [item_id#0L,seller_id#1,price#2L] parquet
== Optimized Logical Plan ==
Project [item_id#0L, seller_id#1, price#2L, pythonUDF0#67L AS new_price#62L]
+- BatchEvalPython [plus_one(price#2L)#61L], [pythonUDF0#67L]
+- Relation [item_id#0L,seller_id#1,price#2L] parquet
== Physical Plan ==
*(2) Project [item_id#0L, seller_id#1, price#2L, pythonUDF0#67L AS new_price#62L]
+- BatchEvalPython [plus_one(price#2L)#61L], [pythonUDF0#67L]
+- *(1) ColumnarToRow
+- FileScan parquet [item_id#0L,seller_id#1,price#2L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/meta_code_spark/maple_items.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<item_id:bigint,seller_id:string,price:bigint>
[ 핵심 문제: BatchEvalPython ]
- 텅스텐 깨짐 ( byte buffer 깨짐 + 직렬화 비용 + Python 객체 생성 → 비용 발생 )
[ 내부 동작 ]
JVM (Spark, Tungsten)
↓
데이터 serialize
↓
Python 프로세스로 전달
↓
Python 함수 실행 (plus_one)
↓
다시 JVM으로 반환
[ 해결 방법 ]
df = items_df.withColumn("new_price", col("price") + 1)
*(1) Project [price + 1] → 100% 텅스텐 유지 가능
[ FileScan Parquet ] → 데이터 읽기
FileScan parquet [price#2L]
Batched: true
PushedFilters: [IsNotNull(price), GreaterThan(price,50000)]
[ 의미 ]
Parquet 파일에서 price 컬럼만 읽음
Batched: true → 여러 행을 한 번에( batch ) 읽음
PushedFilters → 50000 이하 데이터는 아예 파일에서 안 읽음
[ Columnar ToRow ] → 컬럼 기반을 Row 형태로 변환 ( 행 재조합 )
재조합 비용있음 ( 버퍼 필요 )
[ Filter ] → 조건 걸기
Filter (price > 50000) → 조건에 맞는 데이터만 남김
[ 중요! ]
이미 파일 읽을때 PushedFilters로 1차 필터
여기서 2차 필터
[ HashAggregate (partial_avg) ] → 부분 집계
*(1) HashAggregate (partial_avg)
[ 의미 ]
각 파티션에서 부분 평균 계산
[ Exchange ] → 셔플 발생
Exchange SinglePartition
[ 의미 ]
데이터 모아서 한 곳으로 이동 ( 싱글 파티션 )
평균 = 전체 데이터 기준으로 계산해야 함 → 파티션 하나로 이동
[ HashAggregate (final) ] → 최종 집계
*(2) HashAggregate (avg)
[ 의미 ]
partial 결과 합쳐 최종 평균 계산
3-4. Tungsten vs No Tungsten 성능 비교 실습
[ Tungsten ]
#### tungsten ####
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("tungsten_fast_test") \
.getOrCreate()
# 데이터 로드
df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")
# 데이터 키우기 (성능 차이 확인용)
big_df = df
for _ in range(5): # 데이터 32배 증가
big_df = big_df.union(df)
# Tungsten 방식 (built-in)
result_df = big_df.withColumn("new_price", col("price") + 1)
# 캐시 제거
spark.catalog.clearCache()
# 실행 시간 측정
start = time.time()
result_df.show()
end = time.time()
print(f"[TUNGSTEN] Execution Time: {end - start:.4f} sec")
spark.stop()
[ No Tungsten ]
#### No tungsten ####
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import LongType
spark = SparkSession.builder \
.appName("tungsten_slow_test") \
.getOrCreate()
# 데이터 로드
df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")
# 데이터 키우기 (동일 조건 유지)
big_df = df
for _ in range(5): # 데이터 32배 증가
big_df = big_df.union(df)
# Python UDF 정의
def plus_one(x):
return x + 1
udf_func = udf(plus_one, LongType())
# UDF 방식 (Tungsten 깨짐)
result_df = big_df.withColumn("new_price", udf_func(col("price")))
# 캐시 제거
spark.catalog.clearCache()
# 실행 시간 측정
start = time.time()
result_df.show()
end = time.time()
print(f"[UDF] Execution Time: {end - start:.4f} sec")
spark.stop()
[ 성능 비교 ]
- 히스토리 서버 사용 ( http://192.168.56.60:18080 )
- 실제 작업 수행 시간
- 텡스텐 : 0.1s
- 텡스텐 X : 0.5s
- 약 5배 차이.. ( 오.. )

- GC Time ( MAX 기준 ) : 0
- 직렬화 시간 ( MAX 기준 ) : 0.042초
- 객체 생성 X → CPU가 바로 메모리 읽고 계산

- GC Time ( MAX 기준 ) : 0.06초
- 직렬화 시간 ( MAX 기준 ) : 0.76초
- 객체 생성 → BatchEvalPython 프로세스 UDF 실행 → Python 객체 생성 → 다시 JVM
- 객체 생성으로 GC 수행 + 직렬화 비용 증가

1. 스파크 최적화 본질: 덜 읽고 + 덜 움직이고 + 덜 계산하면 빨라진다
2. 스파크 최적화 3원칙
- 파티션/컬럼 프루닝(읽기 ↓)
- 셔플 최소화(이동 ↓)
- Cache(계산 ↓)
3. 튜닝 순서: 데이터 확인 → explain으로 실행계획 분석 → 리소스 확인 → 코드 개선
4. Catalyst 엔진: 쿼리 분석 → 최적화 → 비용 기반 최적 실행 계획 선택
5. 파티션 프루닝: 필요한 파티션만 읽어 I/O 대폭 감소( 정적 파티션 프루닝이 가장 좋음 )
6. 텅스텐 프로젝트 핵심: 객체 제거 + 바이너리 배열 처리
- GC ↓ + CPU 캐시 효율 ↑ + 셔플 시 직렬화 제거 → 성능 향상
7. 성능 핵심 습관: 데이터 프레임 + 빌트인 함수 유지
- Python UDF 쓰면 성능 저하..
'데이터 엔지니어( 이론 공부 ) > spark' 카테고리의 다른 글
| Apache Spark ( Spark MLlib ) (0) | 2026.04.30 |
|---|---|
| Apache Spark ( Spark 최적화 실습 ) (0) | 2026.04.29 |
| Apache Spark ( Spark submit & partition & shuffle ) (0) | 2026.04.25 |
| Apache Spark ( 데이터프레임 ) (0) | 2026.04.17 |
| Apache Spark ( 핵심 개념 ) (1) | 2026.04.14 |