
[ "스파크 완벽 가이드" ] 공부한 후 정리한 내용 입니다
2. 스파크 간단히 살펴보기
스파크는 대규모 데이터를 빠르게 처리할 수 있는 분산 컴퓨팅 엔진입니다.
여기서는 DataFrame과 SQL을 사용해 스파크의 구조와 기본 사용법을 간단히 소개합니다.
1) 스파크 기본 아키텍처
스파크는 클러스터 환경에서 동작하며
사용자가 작성한 애플리케이션을 실행하기 위해 클러스터 매니저를 사용합니다.
1.1 스파크 애플리케이션 구조
스파크 애플리케이션은 드라이버 + 익스큐터로 구성
- 드라이버( Driver )
- 클러스터 노드 중 하나에서 실행
- 작업 분석, DAG 작성, 익스큐터에 작업 배포
- 애플리케이션 모든 정보 관리
- 익스큐터( Executor )
- 실제 데이터 처리 담당
- 드라이버가 할당한 작업 수행 후 결과 보고
## 하둡 YARN 환경 예시 ##
┌───────────┐
│ 사용자 │
└───────────┘
│
spark-submit
│
┌───────────┐
│ YARN │
│ 클러스터 │
│ (자원 관리)│
└───────────┘
│
┌────────────┐
│ Driver │
│ - 작업 분석 │
│ - Task 생성 │
│ - 스케줄링 │
└────────────┘
┌──────┬─────┬─────┐
│Exec │Exec │Exec │
│데이터│데이터│데이터│
│처리 │처리 │처리 │
└──────┴─────┴─────┘
│
결과 반환
│
┌───────────┐
│ 사용자 │
└───────────┘
스파크는 클러스터 매니저에게 자원을 보고함 → 드라이버 + 익스큐터 컨테이너 할당 받음
드라이버는 익스큐터에 작업을 나눠주고 결과를 취합
2) 스파크 API 언어
스파크는 여러 언어에서 사용할 수 있으며, 핵심 로직은 동일
| 언어 | 특징 |
| Scala | 스파크 기본 개발 언어 |
| Java | 스파크 구조 활용 가능 |
| Python | PySpark, 구조적 API 지원 |
| SQL | SQL 쿼리로 스파크 데이터 처리 가능 |
SparkSession 객체가 모든 언어에서 진입점 역할을 합니다.
Python → SparkSession(JVM) → Executor로 변환되어 실행
3) 구조적 API vs 비구조적 API
- 스파크는 두 가지 API 제공 ⤵
- DataFrame/SQL → 구조적 API (고수준, 최적화 가능)
- RDD → 저수준 API (직접 제어 가능)
4) 스파크 시작하기
4.1 스파크 세션 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.getOrCreate()
SparkSession 객체는 사용자의 명령을 Driver에 전달하고, Driver는 이를 클러스터의 Executor에서 실행하도록 합니다.
하나의 SparkSession = 하나의 스파크 애플리케이션
4.2 간단한 숫자 범위 생성
myRange = spark.range(1000).toDF("number")
myRange.explain()
myRange.show(10)
== Physical Plan ==
*(1) Project [id#0L AS number#2L]
+- *(1) Range (0, 1000, step=1, splits=6)
> 기본 컬럼을 number로 변경
> 6개의 파티션으로 나눔 ( 익스큐터들이 나눠서 처리함 )
> *(1) 의미 → 같은 스테이지에서 실행
- 숫자 0~999를 6개 파티션으로 나눠 처리
- 각 파티션은 별도의 익스큐터에서 병렬 실행
5) DataFrame
- 데이터프레임 = 테이블 형태의 데이터 구조 ( 행 + 열 )
- 스파크가 제공하는 고수준 자료구조
- 여러 노드에 분산되어 저장
- 파티션 단위로 데이터를 쪼개 병렬 처리
| DataFrame | RDD |
| 자동 최적화 | 직접 제어 필요 |
| 사용 편리 | 복잡하지만 세밀한 제어 가능 |
- 장점: 사용 편의성 + 최적화
- 사용자는 코드를 작성하면, 스파크가 자동으로 파티션과 병렬 처리 관리
- 단점: 성능 미세조정 한계
- 자동 최적화 되지만 메모리 관리 + 캐싱 전략 등을 세밀하게 제어하기 어려움
6) 트랜스포메이션 & 액션
6.1 트랜스포메이션
- 데이터 프레임은 불변성
- 데이터를 직접 수정할 수 없고 → 트랜스포메이션으로 연산 계획 작성
# 트랜스포메이션 예시
sy0218 = myRange.where("number % 2 = 0") # 짝수만 선택
- 좁은 의존성: 입력 파티션 → 출력 파티션 1개 (예: where)
- 넓은 의존성: 입력 파티션 → 여러 출력 파티션 (예: shuffle 발생)
6.2 지연 연산
- 트랜스포메이션은 즉시 실행되지 않고, 실행 계획만 생성
- 마지막에 액션 호출 시 실제 연산 수행
sy0218.count() # 액션 → 결과 반환
7) 스파크 UI
- 웹 UI를 통해 스파크 잡 상태 + 클러스터 자원 사용량 확인 가능
- 튜닝 및 디버깅에 필수
8) 종합 예제: 항공 운항 데이터 분석
책에서 제공하는 예제를 통해 간단한 실습을 해볼 예정입니다.
[ 데이터 읽기 ]
wget https://raw.githubusercontent.com/FVBros/Spark-The-Definitive-Guide/master/data/flight-data/csv/2015-summary.csv
하둡 기반 환경이라면 데이터를 HDFS에 업로드 ⤵
hdfs dfs -mkdir -p /sy0218/spark/
hdfs dfs -put ./* /sy0218/spark/
8.1 스파크 실습
## 스파크 세션의 DataFrameReader를 사용하여 CSV 파일을 읽습니다. ##
flight = spark.read \
.option("inferSchema", "true") \
.option("header", "true") \
.csv("/sy0218/spark/2015-summary.csv")
header → 첫 로우를 컬럼명으로 사용
inferSchema → 데이터 타입 자동 추론 ( 편리하지만 운영 환경에서는 데이터 타입을 명시하는 것이 안전 )
csv → CSV 파일 읽기
--------------------------------------------------
## DataFrame의 기본 ##
take(n) : 상위 n개의 로우를 배열 형태로 반환
flight.take(3)
# 결과 예시
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]
--------------------------------------------------
## 정렬 예제 ##
sort는 트랜스포메이션 → 아직 계산 안됨
트랜스포메이션은 실행 계획만 만들고 데이터는 실제로 처리하지 않음
실제 연산은 take, show 같은 액션에서 수행
# 파티션 수를 줄이면 작은 테스트에서 속도 개선 가능
# 파티션 적으면
# 장점 → 스케줄링 오버헤드 줄어듦
# 단점 → 병렬성 저하
spark.conf.set("spark.sql.shuffle.partitions", "5")
flight.sort("count").take(3)
>>> flight.rdd.getNumPartitions() # 데이터 읽을때 파티션
1
>>> spark.conf.get("spark.sql.files.maxPartitionBytes")
'134217728b'
하둡은 블록단위 데이터 저장
스파크 읽을 파일 크기를 맞추면
데이터 지역성 최적화 가능
( 블록이 걸치면 다른 노드에서 데이터를 가져와야하는 네트워크 병목 발생 )
즉, Drive가 하둡 네임노드에서 블록위치 정보를 받아서 → 데이터 지역성을 고려해 Task를 스케줄링한다.
8.2 데이터프레임과 SQL 활용
## DataFrame을 SQL 테이블로 등록 ##
flight.createOrReplaceTempView("flight")
--------------------------------------------------
## SQL 방식 ##
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight
GROUP BY DEST_COUNTRY_NAME
""")
--------------------------------------------------
## DataFrame API 방식 ##
dataFrameWay = flight.groupBy("DEST_COUNTRY_NAME").count()
--------------------------------------------------
## 실행 계획 확인 ##
→ explain()
sqlWay.explain()
dataFrameWay.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=36]
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
+- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://job-cluster/sy0218/spark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
1. FileScan → CSV 파일 전체 읽기
2. 부분 집계 → 파티션별 count 계산
3. Exchange (셔플) → 동일 키(나라) 데이터 모으기
4. 최종 집계 (HashAggregate) → 전체 count 계산
5. AdaptiveSparkPlan → 실행 중 최적화 (파티션 수, skew 자동 조정)
8.3 최대 비행 횟수( 집계 예제 )
# SQL 방식
spark.sql("SELECT max(count) FROM flight").take(1)
# DataFrame 방식
from pyspark.sql.functions import max
flight.select(max("count")).take(1)
[ 결과 ]
[Row(max(count)=370002)]
8.4 상위 5개 도착 국가 찾기
## SQL 방식 ##
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, SUM(count) AS destination_total
FROM flight
GROUP BY DEST_COUNTRY_NAME
ORDER BY destination_total DESC
LIMIT 5
""")
maxSql.show()
-------------------------------------------------------
## DataFrame API 방식 ##
from pyspark.sql.functions import desc
flight.groupBy("DEST_COUNTRY_NAME") \
.sum("count") \
.withColumnRenamed("sum(count)", "destination_total") \
.sort(desc("destination_total")) \
.limit(5) \
.show()
1. 읽기 단계 → CSV 파일 로드 (액션 전까지 실행 안됨)
2. 그룹화 → groupBy + 집계할 키 지정
3. 집계(sum) → 각 그룹별 합계 계산
4. 컬럼명 변경 → 이해하기 쉽게 이름 변경
5. 정렬(desc) → 내림차순 정렬
6. limit → 상위 n개만 추출
7. 액션 호출 → 실제 연산 시작 (show())
-------------------------------------------------------
## 실행 계획 ##
explain 메서드가 출력하는 실제 실행 계획은 물리적인 실행 시점에 수행하는 최적화
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#75L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#75L])
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=219]
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(count#19)])
+- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://job-cluster/sy0218/spark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
> FileScan → 부분 집계 → 셔플 → 최종 집계 → Top 5 추출
[ 왜?? Order By시 셔플 발생 안함?? ]
- 일반 Sort ( Order By )
- 전역 정렬 필요
- 셔플 발생 → 마지막에 1개 파티션으로 모일 수 있음
- 지금 케이스 ( Order By + Limit 5 )
- 전체 정렬 안 함
- 각 파티션 → top 5
- 최종 합침 = 즉 셔플 발생 안함

1. 스파크는 클러스터 환경에서 드라이버와 익스큐터를 통해 분산 처리하는 엔진이다.
2. 드라이버는 작업 계획과 DAG 작성, 익스큐터에 작업 배포, 결과 취합을 담당
3. 스파크는 다양한 언어를 지원하며 SparkSession이 진입점이다.
4. 데이터프레임은 구조적 API로 자동 최적화 가능, RDD는 저수준 API로 직접 제어 가능하다.
5. 데이터 프레임은 분별성을 가지며, 트랜스포메이션은 지연 연산으로 액션 호출 시 실제 처리
6. 파티션 단위로 데이터를 나눠 병렬 처리
7. explain()으로 실행 계획 확인'데이터 엔지니어( 이론 공부 ) > spark' 카테고리의 다른 글
| Apache Spark ( Spark 최적화 하기 ) (0) | 2026.04.28 |
|---|---|
| Apache Spark ( Spark submit & partition & shuffle ) (0) | 2026.04.25 |
| Apache Spark ( 데이터프레임 ) (0) | 2026.04.17 |
| Apache Spark ( 핵심 개념 ) (1) | 2026.04.14 |
| 1. 아파치 스파크란? ( 빅데이터와 스파크 살펴보기 ) (0) | 2026.03.24 |