1. Spark submit 이란?
스파크 프로그램을 클러스터에 실행시키는 명령어
→ spark-submit sy_app.py
[ Spark Submit 실행시 ]
1. 드라이버 프로그램 실행
2. 클러스터( Yarn )에 작업 요청
3. 여러 노드에서 병렬로 작업 수행
스파크 서브밋은 스파크 애플리케이션을 → 클러스터에 제출하는 명령어
2. 파티션 ( Partition )
Spark 파티션 → 데이터를 쪼개 병렬 처리하는 단위
즉, 파티션 1개 = 태스크 1개
2-1. 파티션이 왜 중요?
- 파티션이 너무 적으면?
- 동시에 돌릴 태스크 수가 적음
- CPU 놀음 → 클러스터 활용 못함
- 파티션이 너무 크면?
- 각 태스크가 처리할 데이터가 많음
- 메모리 터짐
- 속도 느림
- 파티션이 너무 많으면?
- task 관리 오버헤드 증가 ( task 스케줄링, 직렬화 )
- 데이터가 잘게 쪼개지며 → 실제 처리시간 보다 → 태스크 생성 / 종료 비용이 더 큼..
- 셔플 성능 저하 ( 셔플 발생하는 파티션 수가 많아짐.. )
- task 관리 오버헤드 증가 ( task 스케줄링, 직렬화 )
[ 그럼 스파크에서 파티션은 어떻게 튜닝? ]
- 처음에는 CPU 코어나, 데이터 크기를 기준으로 결정하며
- 이후 Spark UI 확인 후 → task 시간 & spill 발생 여부를 파악 후 상황에 맞게 튜닝
즉, 테스트를 하며 튜닝이 정답
2-2. 파티셔닝 방식
[ HashPartitioner ( 기본 ) ]
- partition = hash(key) % 파티션 개수
- 같은 키는 같은 파티션으로 감
[ 사용자 정의 파티셔너 ]
- 사용자가 직접 분배 로직 작성 가능
2-3. 파티셔닝 관련 옵션
spark.default.parallelism → 기본 파티션 수
spark.sql.shuffle.partitions → 셔플 시 파티션 수
spark.files.maxPartitionBytes → 파일 읽을 때 파티션 기준
이 외에도 다양한 파티션 관련 옵션 존재..
3. 셔플 ( Partition )
파티션 간 데이터 이동
[ 언제 발생? ]
→ 키 기준으로 묶을 때 ( groupby, join, reduceByKey )
3-1. Spark 에서 셔플의 문제 ( 분산 클러스터 )
- 셔플 = 분산 엔진 비용 발생
- 디스크 I/O 발생( 키 단위 파일 생성 ) + 네트워크 전송 발생
[ 셔플 내부 흐름 ]
1. 각 파티션 결과를 키 별로 디스크에 씀
2. 네트워크로 다른 노드로 보냄
3. 다시 읽어서 병합
즉, 스파크가 느려지는 원인 1순위
3-2. 셔플 관련 옵션
spark.shuffle.compress → 압축 여부
spark.shuffle.spill → 메모리 넘치면 디스크로 spill
spark.shuffle.manager → sort 방식 권장
4. 파티션 변경 ( 핵심 포인트 )
그럼 파티션 바꾸면 셔플이 발생함?
→ 경우에 따라 다름
[ repartition() ]
df.repartition(10)
- 무조건 셔플 발생 → 데이터 재분배 필요..
[ coalesce() ]
df.coalesce(2)
- 셔플 없이 파티션 줄일 수 있는 경우는 셔플 발생 X
- 하지만, 재분배 개념이 아니라 → 파티션 불균형 발생 ( skew )
4-1. 언제 파티션 변경하냐?
- 클러스터 자원 활용 ( 파티션 ↑ )
- 메모리 터지는 거 방지 ( 파티션 ↑ )
- 데이터 균형 맞추려고
- 특정 key를 같은 파티션에 넣으려고
[ repartition(col) 주의! ]
- 같은 값끼리 묶임 → 데이터 스큐 위험
- 특정 Key에 데이터 몰림
5. 파티션 종류 3개
1. Input Partition
→ 파일 읽을 때 생성
2. Shuffle Partition
→ 셔플 시 생성 ( spark.sql.shuffle.partitions )
3. Output Partition
→ 결과 저장 시
6. 파티션 단위 연산
mapPartitions: 파티션 단위 연산
rdd.mapPartitions(func)
→ 한 건씩이 아니라 "파티션 단위" 처리
[ map vs mapPartitions ]
1. map → 데이터 1개씩 함수 호출
ex..) rdd.map(func)
[1, 2, 3, 4] → 함수 4번 호출됨
2. mapPartitions → 파티션 단위로 한 번 호출
ex..) mapPartitions(func)
파티션1: [1, 2], 파티션2: [3, 4] → 함수 2번 호출됨
[ 왜 mapPartitions 씀? ]
- 함수 호출 비용 줄임
- DB 커넥션 같은거 재사용 가능 ( 비용 큰 작업 재사용 필수! )
7. 파티션 & 셔플 실습
7-1. 데이터 읽기 ( Parquet )
1. 데이터 읽기
items_df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")
sellers_df = spark.read.parquet("/work/jsy/meta_code_spark/maple_sellers.parquet")
---------------------------------------------------------------
2. 데이터 상태
## items_df ##
+-------+---------+-----+
|item_id|seller_id|price|
+-------+---------+-----+
| 1| user_1| 5888|
| 2| user_2|37808|
| 3| user_3|55158|
| 4| user_4|99189|
| 5| user_5|55305|
| 6| user_6|13533|
| 7| user_7|23688|
| 8| user_8|60822|
| 9| user_9|11351|
| 10| user_10|49051|
+-------+---------+-----+
## sellers_df ##
+---------+-----------+
|seller_id|seller_name|
+---------+-----------+
| user_1| seller_1|
| user_2| seller_2|
| user_3| seller_3|
| user_4| seller_4|
| user_5| seller_5|
| user_6| seller_6|
| user_7| seller_7|
| user_8| seller_8|
| user_9| seller_9|
| user_10| seller_10|
+---------+-----------+
---------------------------------------------------------------
7-2. 입력 파티션 확인 ( Input partition )
1. defaultMinPartitions: 최소 몇 개 파티션으로 나눌지에 대한 기본 값
sc.defaultMinPartitions → 2
------------------------------------------------------------------------
2. items, sellers 행 카운트 + 파티션 확인
getNumPartitions() → 해당 데이터가 몇 개의 파티션으로 나뉘어 있는지 알려주는 함수
print("items count:", items_df.count(), "items partitions:", items_df.rdd.getNumPartitions())
→ items count: 10 items partitions: 1
print("sellers count:", sellers_df.count(), "sellers partitions", sellers_df.rdd.getNumPartitions())
→ sellers count: 10 sellers partitions 1
------------------------------------------------------------------------
[ 왜? defaultMinPartitions는 2개인대 → getNumPartitions() → 입력 파티션은 1개? ]
- 파일 읽기는 defaultMinPartitions 사용 X
- 파일 읽기 파티션 결정 기준
- 파일 개수 + 파일 크기 + HDFS 블록 사이즈 + spark.sql.files.maxPartitionsBytes ( 파일 읽을 때 한 파티션이 가져갈 수 있는 최대 데이터 크기 )
즉, 데이터가 작으면 → 그냥 1개 파티션으로 읽음
7-3. reparttion
1. 파티션 변경
items_df2 = items_df.repartition(3)
→ 파티션 강제로 3개로 변경
→ 데이터 셔플 발생 (중요)
-----------------------------------------
2. 파티션 확인
items_df2.rdd.getNumPartitions() # 3
[ repartition 특징 ]
- 데이터 재분배 ( shuffle 발생 )
- 네트워크 비용 있음 + 대신 병렬성 증가
7-4. map vs mapPartitions 차이
1. map ( 행 단위 함수 실행 )
df.map(func)
→ 한 줄씩 처리
[ 특징 ]
row마다 함수 실행
느릴 수 있음 ( 연결 객체 반복 생성.. )
---------------------------------------------------
2. mapPartitions ( 파티션 단위 함수 실행 )
def f(pdf_iter):
for pdf in pdf_iter:
yield pdf
items_df2.mapInPandas(f, schema=items_df2.schema).collect()
---------------------------------------------------
[ 핵심 차이 ]
| 구분 | map | mapPartitions |
| 단위 | 행(row) | 파티션 |
| 속도 | 느릴 수 있음 | 빠름 |
| 연결 생성 | 많음 | 적음 |
| 추천 상황 | 단순 변환 | 무거운 작업 |
[ mapPartitions를 쓰는 이유 ]
1. 외부 연결 최소화 ( 가장 핵심 )
def f(iterator):
conn = connect_db() # 1번만 생성
for row in iterator:
yield query(conn, row)
→ 행 마다 DB 연결 객체 생성 X
→ 파티션당 1번 연결 O
→ 성능 차이 엄청 큼
------------------------------------------------
2. 무거운 객체 재사용
def f(iterator):
model = load_model() # 1번만 로딩
for row in iterator:
yield model.predict(row)
→ 모델 매번 로딩 X
→ 파티션당 1번 로딩 O
[ mapPartitionsWithIndex ]
파티션 번호 + 배열 같이 넘겨줌
from pyspark.sql import Row
items_df2 = items_df.repartition(3)
rdd = items_df2.rdd
def f(idx, iterator):
for row in iterator:
yield Row(
partition_id=idx,
original=row
)
new_rdd = rdd.mapPartitionsWithIndex(f)
result_df = new_rdd.toDF()
result_df.show(truncate=False)
+------------+--------------------+
|partition_id|original |
+------------+--------------------+
|0 |{1, user_1, 5888} |
|0 |{8, user_8, 60822} |
|0 |{7, user_7, 23688} |
|0 |{6, user_6, 13533} |
|1 |{9, user_9, 11351} |
|1 |{3, user_3, 55158} |
|1 |{10, user_10, 49051}|
|2 |{5, user_5, 55305} |
|2 |{4, user_4, 99189} |
|2 |{2, user_2, 37808} |
+------------+--------------------+
- 왜? mapPartitionsWithIndex를 쓰지?
- 데이터 스큐 확인 + 파티션별 디버깅
- 또는, 파티션 별 다른 로직 적용 시
7-5. coalesce
파티션 갯수 줄이는 함수
items_df2 = items_df2.coalesce(1) → 파티션 하나로 줄이기
print(items_df2.rdd.getNumPartitions()) → 1
7-6. explain
물리 실행 게획 확인
1. join_df = items_df.join(sellers_df, ['seller_id'], 'inner')
2. join_df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seller_id#1, item_id#0L, price#2L, seller_name#21]
+- BroadcastHashJoin [seller_id#1], [seller_id#20], Inner, BuildRight, false
:- Filter isnotnull(seller_id#1)
: +- FileScan parquet [item_id#0L,seller_id#1,price#2L] Batched: true, DataFilters: [isnotnull(seller_id#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/meta_code_spark/maple_items.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(seller_id)], ReadSchema: struct<item_id:bigint,seller_id:string,price:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=58]
+- Filter isnotnull(seller_id#20)
+- FileScan parquet [seller_id#20,seller_name#21] Batched: true, DataFilters: [isnotnull(seller_id#20)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/meta_code_spark/maple_sellers.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(seller_id)], ReadSchema: struct<seller_id:string,seller_name:string>
[ 물리 실행 계획 확인 해보기 ]
- Project: 최종 선택된 컬럼 ( seller_id, item_id, price, seller_name )
- BroadcastHashJoin: small table을 드라이버가 보내고, 각 익스큐터 메모리에 캐시 형태로 복사한 뒤 → 로컬 join 하는 방법 ( 셔플 디스크 I/O + 네트워크 병목 제거 )
- BroadcastHashJoin [seller_id#1], [seller_id#20], Inner, BuildRight, false → seller_id로 조인하는데, 오른쪽 테이블을 브로드캐스트 해서 Hash Join으로 Inner Join 한다는 뜻
- Filter isnotnull → 조인전에 null 체크
- BroadcastExchange: 아래쪽이 작은 테이블 → 드라이버가 복사 후, 각 익스큐터 메모리에 복사 ( 로컬 Join )
- Filter isnotnull → 조인전에 null 체크
- 여기서는 maple_sellers.parquet 테이블을 브로드캐스트함
filter가 어느 순서로 실행되느냐에 따라 성능 차이가 있을 수 있느니 튜닝시 참고할것
[ 브로드캐스트 조인 장단점 ]
- 장점
- 분산 처리 비용 최적화 ( 네트워크 병목 + 디스크 I/O 없음 )
- 작은 데이터 + 큰 데이터 조합에 최적
- Spark 옵티마이저 자동 적용 가능 ( spark.sql.autoBroadcastJoinThreshold ) → 보통 10MB ~ 100MB 수준
- 단점
- 메모리 압박 발생 → 작은 테이블 모든 익스큐터에 복제 → 익스큐터 갯수 많을수록 메모리 사용 증가 → 메모리 터짐 발생 ( OOM )
[ 브로드캐스트 조인 핵심 CheckPoint ]
- 작은 테이블 크기 확인 → 디스크 크기 말고 실제 실행 시 메모리 크기 + 압축 풀린 상태 기준
- sellers_df.cache() + sellers_df.count → http://192.168.56.60:4040 → 스토리지탭 → size in memory 확인
- 익스큐터 메모리 대비 비율 → 브로드캐스트 테이블 * 익스큐터 개수 = 전체 메모리 부담 확인

1. spark-submit: 스파크 작업을 실행하는 명령어
2. 파티션
- 병렬 처리 단위
- 많으면 좋지만 "적당히"가 중요
3. 셔플: 데이터 이동, 분산 작업 성능 저하의 주범 ( 디스크 I/O + 네트워크 병목 )
Spark 성능 최적화의 핵심은
셔플 최소화 + 파티션 튜닝 = 성능
'데이터 엔지니어( 이론 공부 ) > spark' 카테고리의 다른 글
| Apache Spark ( Spark 최적화 실습 ) (0) | 2026.04.29 |
|---|---|
| Apache Spark ( Spark 최적화 하기 ) (0) | 2026.04.28 |
| Apache Spark ( 데이터프레임 ) (0) | 2026.04.17 |
| Apache Spark ( 핵심 개념 ) (1) | 2026.04.14 |
| 2. 스파크 간단히 살펴보기 ( 빅데이터와 스파크 살펴보기 ) (0) | 2026.03.25 |