본문 바로가기
데이터 엔지니어( 이론 공부 )/spark

Apache Spark ( Spark submit & partition & shuffle )

by 세용융용융용 2026. 4. 25.

1. Spark submit 이란?

스파크 프로그램을 클러스터에 실행시키는 명령어
→ spark-submit sy_app.py

[ Spark Submit 실행시 ]
1. 드라이버 프로그램 실행
2. 클러스터( Yarn )에 작업 요청
3. 여러 노드에서 병렬로 작업 수행
스파크 서브밋은 스파크 애플리케이션을 → 클러스터에 제출하는 명령어

2. 파티션 ( Partition )

Spark 파티션 → 데이터를 쪼개 병렬 처리하는 단위
즉, 파티션 1개 = 태스크 1개

2-1. 파티션이 왜 중요?

  • 파티션이 너무 적으면?
    • 동시에 돌릴 태스크 수가 적음 
    • CPU 놀음 → 클러스터 활용 못함
  • 파티션이 너무 크면?
    • 각 태스크가 처리할 데이터가 많음 
    • 메모리 터짐
    • 속도 느림
  • 파티션이 너무 많으면?
    • task 관리 오버헤드 증가 ( task 스케줄링, 직렬화 )
      • 데이터가 잘게 쪼개지며 → 실제 처리시간 보다 → 태스크 생성 / 종료 비용이 더 큼.. 
    • 셔플 성능 저하 ( 셔플 발생하는 파티션 수가 많아짐.. )

 

[ 그럼 스파크에서 파티션은 어떻게 튜닝? ]

  • 처음에는 CPU 코어나, 데이터 크기를 기준으로 결정하며
  • 이후 Spark UI 확인 후 → task 시간 & spill 발생 여부를 파악 후 상황에 맞게 튜닝
즉, 테스트를 하며 튜닝이 정답

2-2. 파티셔닝 방식

[ HashPartitioner ( 기본 ) ]

  • partition = hash(key) % 파티션 개수
  • 같은 키는 같은 파티션으로 감

 

[ 사용자 정의 파티셔너 ]

  • 사용자가 직접 분배 로직 작성 가능

2-3. 파티셔닝 관련 옵션

spark.default.parallelism → 기본 파티션 수
spark.sql.shuffle.partitions → 셔플 시 파티션 수
spark.files.maxPartitionBytes → 파일 읽을 때 파티션 기준

이 외에도 다양한 파티션 관련 옵션 존재..

 

3. 셔플 ( Partition )

파티션 간 데이터 이동

[ 언제 발생? ]
→ 키 기준으로 묶을 때 ( groupby, join, reduceByKey )

3-1. Spark 에서 셔플의 문제 ( 분산 클러스터 )

  • 셔플 = 분산 엔진 비용 발생
  • 디스크 I/O 발생( 키 단위 파일 생성 ) + 네트워크 전송 발생
[ 셔플 내부 흐름 ]
1. 각 파티션 결과를 키 별로 디스크에 씀
2. 네트워크로 다른 노드로 보냄
3. 다시 읽어서 병합
즉, 스파크가 느려지는 원인 1순위

3-2. 셔플 관련 옵션

spark.shuffle.compress → 압축 여부
spark.shuffle.spill → 메모리 넘치면 디스크로 spill
spark.shuffle.manager → sort 방식 권장

 

4. 파티션 변경 ( 핵심 포인트 )

그럼 파티션 바꾸면 셔플이 발생함?
→ 경우에 따라 다름

 

[ repartition() ]

df.repartition(10)
  • 무조건 셔플 발생 → 데이터 재분배 필요..

 

[ coalesce() ]

df.coalesce(2)
  • 셔플 없이 파티션 줄일 수 있는 경우는 셔플 발생 X
  • 하지만, 재분배 개념이 아니라  파티션 불균형 발생 ( skew )

4-1. 언제 파티션 변경하냐?

  • 클러스터 자원 활용 ( 파티션 ↑ )
  • 메모리 터지는 거 방지 ( 파티션 ↑ )
  • 데이터 균형 맞추려고
  • 특정 key를 같은 파티션에 넣으려고

 

[ repartition(col) 주의! ]

  • 같은 값끼리 묶임 → 데이터 스큐 위험
  • 특정 Key에 데이터 몰림

5. 파티션 종류 3개

1. Input Partition
→ 파일 읽을 때 생성 

2. Shuffle Partition
→ 셔플 시 생성 ( spark.sql.shuffle.partitions )

3. Output Partition
→ 결과 저장 시

6. 파티션 단위 연산

mapPartitions: 파티션 단위 연산
rdd.mapPartitions(func)
→ 한 건씩이 아니라 "파티션 단위" 처리

[ map vs mapPartitions ]
1. map → 데이터 1개씩 함수 호출
ex..) rdd.map(func)
[1, 2, 3, 4] → 함수 4번 호출됨

2. mapPartitions → 파티션 단위로 한 번 호출
ex..) mapPartitions(func)
파티션1: [1, 2], 파티션2: [3, 4] → 함수 2번 호출됨

 

[ 왜 mapPartitions 씀? ]

  • 함수 호출 비용 줄임
  • DB 커넥션 같은거 재사용 가능 ( 비용 큰 작업 재사용 필수! )

 

7. 파티션 & 셔플 실습


7-1. 데이터 읽기 ( Parquet )

1. 데이터 읽기
items_df = spark.read.parquet("/work/jsy/meta_code_spark/maple_items.parquet")
sellers_df = spark.read.parquet("/work/jsy/meta_code_spark/maple_sellers.parquet")
---------------------------------------------------------------

2. 데이터 상태
## items_df ##
+-------+---------+-----+
|item_id|seller_id|price|
+-------+---------+-----+
|      1|   user_1| 5888|
|      2|   user_2|37808|
|      3|   user_3|55158|
|      4|   user_4|99189|
|      5|   user_5|55305|
|      6|   user_6|13533|
|      7|   user_7|23688|
|      8|   user_8|60822|
|      9|   user_9|11351|
|     10|  user_10|49051|
+-------+---------+-----+

## sellers_df ##
+---------+-----------+
|seller_id|seller_name|
+---------+-----------+
|   user_1|   seller_1|
|   user_2|   seller_2|
|   user_3|   seller_3|
|   user_4|   seller_4|
|   user_5|   seller_5|
|   user_6|   seller_6|
|   user_7|   seller_7|
|   user_8|   seller_8|
|   user_9|   seller_9|
|  user_10|  seller_10|
+---------+-----------+
---------------------------------------------------------------

7-2. 입력 파티션 확인 ( Input partition )

1. defaultMinPartitions: 최소 몇 개 파티션으로 나눌지에 대한 기본 값
sc.defaultMinPartitions → 2
------------------------------------------------------------------------

2. items, sellers 행 카운트 + 파티션 확인
getNumPartitions() → 해당 데이터가 몇 개의 파티션으로 나뉘어 있는지 알려주는 함수

print("items count:", items_df.count(), "items partitions:", items_df.rdd.getNumPartitions())
→ items count: 10 items partitions: 1

print("sellers count:", sellers_df.count(), "sellers partitions", sellers_df.rdd.getNumPartitions())
→ sellers count: 10 sellers partitions 1
------------------------------------------------------------------------

 

[ 왜? defaultMinPartitions는 2개인대 → getNumPartitions() 입력 파티션은 1개? ]

  • 파일 읽기는 defaultMinPartitions 사용 X
  • 파일 읽기 파티션 결정 기준
    • 파일 개수 + 파일 크기 + HDFS 블록 사이즈 + spark.sql.files.maxPartitionsBytes ( 파일 읽을 때 한 파티션이 가져갈 수 있는 최대 데이터 크기 )
즉, 데이터가 작으면 → 그냥 1개 파티션으로 읽음

7-3. reparttion

1. 파티션 변경
items_df2 = items_df.repartition(3)

→ 파티션 강제로 3개로 변경
→ 데이터 셔플 발생 (중요)
-----------------------------------------

2. 파티션 확인
items_df2.rdd.getNumPartitions()  # 3

 

[ repartition 특징 ]

  • 데이터 재분배 ( shuffle 발생 )
  • 네트워크 비용 있음 + 대신 병렬성 증가

7-4. map vs mapPartitions 차이

1. map ( 행 단위 함수 실행 )
df.map(func)
→ 한 줄씩 처리

[ 특징 ]
row마다 함수 실행
느릴 수 있음 ( 연결 객체 반복 생성.. )
---------------------------------------------------

2. mapPartitions ( 파티션 단위 함수 실행 )
def f(pdf_iter):
    for pdf in pdf_iter:
        yield pdf
        
items_df2.mapInPandas(f, schema=items_df2.schema).collect()
---------------------------------------------------

 

[ 핵심 차이 ]

구분 map mapPartitions
단위 행(row) 파티션
속도 느릴 수 있음 빠름
연결 생성 많음 적음
추천 상황 단순 변환 무거운 작업

 

[ mapPartitions를 쓰는 이유 ]

1. 외부 연결 최소화 ( 가장 핵심 )
def f(iterator):
    conn = connect_db()   # 1번만 생성
    for row in iterator:
        yield query(conn, row)
→ 행 마다 DB 연결 객체 생성 X
→ 파티션당 1번 연결 O
→ 성능 차이 엄청 큼
------------------------------------------------

2. 무거운 객체 재사용
def f(iterator):
    model = load_model() # 1번만 로딩
    for row in iterator:
        yield model.predict(row)
→ 모델 매번 로딩 X
→ 파티션당 1번 로딩 O

 

[ mapPartitionsWithIndex ]

파티션 번호 + 배열 같이 넘겨줌
from pyspark.sql import Row

items_df2 = items_df.repartition(3)
rdd = items_df2.rdd

def f(idx, iterator):
    for row in iterator:
        yield Row(
            partition_id=idx,
            original=row
        )

new_rdd = rdd.mapPartitionsWithIndex(f)
result_df = new_rdd.toDF()
result_df.show(truncate=False)

+------------+--------------------+
|partition_id|original            |
+------------+--------------------+
|0           |{1, user_1, 5888}   |
|0           |{8, user_8, 60822}  |
|0           |{7, user_7, 23688}  |
|0           |{6, user_6, 13533}  |
|1           |{9, user_9, 11351}  |
|1           |{3, user_3, 55158}  |
|1           |{10, user_10, 49051}|
|2           |{5, user_5, 55305}  |
|2           |{4, user_4, 99189}  |
|2           |{2, user_2, 37808}  |
+------------+--------------------+
  • 왜? mapPartitionsWithIndex를 쓰지?
    • 데이터 스큐 확인 + 파티션별 디버깅
    • 또는, 파티션 별 다른 로직 적용 시

7-5. coalesce

파티션 갯수 줄이는 함수
items_df2 = items_df2.coalesce(1) → 파티션 하나로 줄이기
print(items_df2.rdd.getNumPartitions()) → 1

7-6. explain

물리 실행 게획 확인
1. join_df = items_df.join(sellers_df, ['seller_id'], 'inner')
2. join_df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seller_id#1, item_id#0L, price#2L, seller_name#21]
   +- BroadcastHashJoin [seller_id#1], [seller_id#20], Inner, BuildRight, false
      :- Filter isnotnull(seller_id#1)
      :  +- FileScan parquet [item_id#0L,seller_id#1,price#2L] Batched: true, DataFilters: [isnotnull(seller_id#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/meta_code_spark/maple_items.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(seller_id)], ReadSchema: struct<item_id:bigint,seller_id:string,price:bigint>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=58]
         +- Filter isnotnull(seller_id#20)
            +- FileScan parquet [seller_id#20,seller_name#21] Batched: true, DataFilters: [isnotnull(seller_id#20)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/work/jsy/meta_code_spark/maple_sellers.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(seller_id)], ReadSchema: struct<seller_id:string,seller_name:string>

 

[ 물리 실행 계획 확인 해보기 ]

  • Project: 최종 선택된 컬럼 ( seller_id, item_id, price, seller_name )
  • BroadcastHashJoin: small table을 드라이버가 보내고, 각 익스큐터 메모리에 캐시 형태로 복사한 뒤 → 로컬 join 하는 방법 ( 셔플 디스크 I/O + 네트워크 병목 제거 )
    • BroadcastHashJoin [seller_id#1], [seller_id#20], Inner, BuildRight, false → seller_id로 조인하는데, 오른쪽 테이블을 브로드캐스트 해서 Hash Join으로 Inner Join 한다는 뜻
    • Filter isnotnull → 조인전에 null 체크
  • BroadcastExchange: 아래쪽이 작은 테이블 → 드라이버가 복사 후, 각 익스큐터 메모리에 복사 ( 로컬 Join )
    • Filter isnotnull → 조인전에 null 체크
    • 여기서는 maple_sellers.parquet 테이블을 브로드캐스트함
filter가 어느 순서로 실행되느냐에 따라 성능 차이가 있을 수 있느니 튜닝시 참고할것

 

[ 브로드캐스트 조인 장단점 ]

  • 장점
    • 분산 처리 비용 최적화 ( 네트워크 병목 + 디스크 I/O 없음 )
    • 작은 데이터 + 큰 데이터 조합에 최적
    • Spark 옵티마이저 자동 적용 가능 ( spark.sql.autoBroadcastJoinThreshold ) → 보통 10MB ~  100MB 수준
  • 단점
    • 메모리 압박 발생 → 작은 테이블 모든 익스큐터에 복제 →  익스큐터 갯수 많을수록 메모리 사용 증가 → 메모리 터짐 발생 ( OOM )

 

[ 브로드캐스트 조인 핵심 CheckPoint ]

  • 작은 테이블 크기 확인 → 디스크 크기 말고 실제 실행 시 메모리 크기 + 압축 풀린 상태 기준
    • sellers_df.cache() + sellers_df.count → http://192.168.56.60:4040 → 스토리지탭 → size in memory 확인
  • 익스큐터 메모리 대비 비율 → 브로드캐스트 테이블 * 익스큐터 개수 = 전체 메모리 부담 확인

 

1. spark-submit: 스파크 작업을 실행하는 명령어
2. 파티션
   - 병렬 처리 단위
   - 많으면 좋지만 "적당히"가 중요
3. 셔플: 데이터 이동, 분산 작업 성능 저하의 주범 ( 디스크 I/O + 네트워크 병목 )
Spark 성능 최적화의 핵심은
셔플 최소화 + 파티션 튜닝 = 성능