본문 바로가기
데이터 엔지니어( 이론 공부 )/spark

Apache Spark ( 데이터프레임 )

by 세용융용융용 2026. 4. 17.

1. Spark 데이터프레임 이란?

컬럼이 있는 테이블 형태 + SQL 처럼 다룰 수 있는 스파크 데이터 구조

 

[ 데이터프레임 특징 ]

  • Spark 자동 최적화 사용 가능 ( 카탈리스트 옵티마이저 )
  • 스키마 존재
  • SQL 처럼 사용 가능 ( select, filter, groupBy )
  • 컬럼 와이즈 함수 사용 가능

 

[ 컬럼 와이즈 함수 ]

  • 행 단위가 아니라 컬럼 전체를 한 번에 처리하는 함수 방식
  • 스파크는 컬럼 단위 연산 최적화됨 → 최적화 + 병렬처리 가능

1-1. 스파크 데이터 프레임 만드는 방법

1. Row 생성
from pyspark.sql import Row

data = [Row(1), Row(2), Row(3)]
df = spark.createDataFrame(data)
df.show()
---------------------------------------------------

2. 컬럼명 지정
df = spark.createDataFrame(data, ["id"])
---------------------------------------------------

3. Schema 직접 지정
from pyspark.sql.types import *

schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
→ 타입 안정성
---------------------------------------------------

4. 파일 읽기
df = spark.read.parquet("/path/maple.parquet")
---------------------------------------------------

 

2. Spark 데이터프레임 기본 명령어

1. schema 확인
df.printSchema()

2. 컬럼 확인
df.columns

3. 타입 확인
df.dtypes

4. 일부 행만 져오기
df.limit(5).show()

4. 데이터 확인
df.show()
df.head()
df.take(3)
df.collect()

 

3. Spark 데이터프레임 Select / Column 사용


3-1. 기본 select

df.select("item_id", "price").show()

+-------+-----+
|item_id|price|
+-------+-----+
|      1|93933|
|      2|61517|
|      3|26880|
|      4|45907|
|      5|48963|
|      6|47765|
|      7|17408|
|      8|22468|
|      9|33540|
|     10|91708|
+-------+-----+


## col ( 컬럼 와이즈 펑션 ) 파라미터로 컬럼 넣기 가능
from pyspark.sql.functions import col
c1 = col("item_id")
c2 = col("price")

cols = [c1, c2]
df.select(*cols).show()

 


3-2. selectExpr → select를 쿼리하는 식으로 사용 가능 ( 간단한 전처리 가능 )

df.selectExpr("upper(item_name) as upperName").show()

+---------+
|upperName|
+---------+
|   ITEM_1|
|   ITEM_2|
|   ITEM_3|
|   ITEM_4|
|   ITEM_5|
|   ITEM_6|
|   ITEM_7|
|   ITEM_8|
|   ITEM_9|
|  ITEM_10|
+---------+

3-3. pyspark.sql.functions as F → 대부분의 함수 사용 가능

import pyspark.sql.functions as F
df = df.withColumn("newColumn", F.trim(F.col("item_id")))
df.show()

+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+---------+
|item_id|item_name|price|quantity| seller| world|category|upgrade_level|star_force|          timestamp|newColumn|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+---------+
|      1|   item_1|93933|      39| user_1|scania|  weapon|            1|        20|2026-01-01 00:00:00|        1|
|      2|   item_2|61517|      62| user_2|scania|   armor|            4|        19|2026-01-01 01:00:00|        2|
|      3|   item_3|26880|      76| user_3|scania|     etc|            1|         0|2026-01-01 02:00:00|        3|
|      4|   item_4|45907|      16| user_4|scania|     etc|           11|         3|2026-01-01 03:00:00|        4|
|      5|   item_5|48963|      51| user_5|scania|  weapon|           10|        19|2026-01-01 04:00:00|        5|
|      6|   item_6|47765|      61| user_6|scania|   armor|            4|         2|2026-01-01 05:00:00|        6|
|      7|   item_7|17408|      44| user_7|scania|     etc|            9|         3|2026-01-01 06:00:00|        7|
|      8|   item_8|22468|      64| user_8|scania|  weapon|           12|        21|2026-01-01 07:00:00|        8|
|      9|   item_9|33540|      65| user_9|scania|   armor|           14|        21|2026-01-01 08:00:00|        9|
|     10|  item_10|91708|      76|user_10|scania|     etc|           10|        19|2026-01-01 09:00:00|       10|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+---------+

#### 컬럼 삭제도 가능 ####
df = df.drop("newColumn")

 

4. Spark 데이터프레임 filter / where

filter / where 둘은 똑같은 기능
df.filter("quantity >= 70").show()
df.where("quantity >= 70").show()

+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+
|item_id|item_name|price|quantity| seller| world|category|upgrade_level|star_force|          timestamp|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+
|      3|   item_3|26880|      76| user_3|scania|     etc|            1|         0|2026-01-01 02:00:00|
|     10|  item_10|91708|      76|user_10|scania|     etc|           10|        19|2026-01-01 09:00:00|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+

 

5. Spark 데이터프레임 orderBy / sort

orderBy / sort 둘은 똑같은 기능
df.orderBy("price")
df.sort("price")

#### 내림차순 ####
df.orderBy(F.col("price").desc())

 

6. UDF ( 사용자 정의 함수 )

Spark 기본 함수로 못하는 것을 직접 만드는 함수

[ 주의할 점 ]
느림 ( 최적화 꺠짐 )

[ 왜 UDF가 느림? ]
-------------------------------------------
Spark JVM

Python으로 데이터 넘김 (직렬화)

Python 함수 실행 (lambda)

결과 다시 JVM으로 전달
-------------------------------------------
즉, 해당 왕복 비용 발생..
→ 가능한 상황에서는 빌트인 함수 사용 ( 스파크 엔진 함수 )

 

1. 100 더하는 사용자 정의 함수
import pyspark.sql.functions as F
up_100_udf = F.udf(lambda v: v + 100)
df.withColumn("up_price", up_100_udf(F.col("price"))).show()

+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+--------+
|item_id|item_name|price|quantity| seller| world|category|upgrade_level|star_force|          timestamp|up_price|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+--------+
|      1|   item_1|93933|      39| user_1|scania|  weapon|            1|        20|2026-01-01 00:00:00|   94033|
|      2|   item_2|61517|      62| user_2|scania|   armor|            4|        19|2026-01-01 01:00:00|   61617|
|      3|   item_3|26880|      76| user_3|scania|     etc|            1|         0|2026-01-01 02:00:00|   26980|
|      4|   item_4|45907|      16| user_4|scania|     etc|           11|         3|2026-01-01 03:00:00|   46007|
|      5|   item_5|48963|      51| user_5|scania|  weapon|           10|        19|2026-01-01 04:00:00|   49063|
|      6|   item_6|47765|      61| user_6|scania|   armor|            4|         2|2026-01-01 05:00:00|   47865|
|      7|   item_7|17408|      44| user_7|scania|     etc|            9|         3|2026-01-01 06:00:00|   17508|
|      8|   item_8|22468|      64| user_8|scania|  weapon|           12|        21|2026-01-01 07:00:00|   22568|
|      9|   item_9|33540|      65| user_9|scania|   armor|           14|        21|2026-01-01 08:00:00|   33640|
|     10|  item_10|91708|      76|user_10|scania|     etc|           10|        19|2026-01-01 09:00:00|   91808|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+--------+

 

7. GroupBy + Aggregation

그룹바이 = 묶기
agg = 집계 연산
df.groupBy("category").agg(
    (F.sum("price") * F.sum("quantity")).alias("total")
).orderBy(F.col("total").desc())

+--------+--------+
|category|   total|
+--------+--------+
|     etc|38563436|
|   armor|26850536|
|  weapon|25466056|
+--------+--------+
→ 카테고리고 그룹화 + 총 판매 가격 + 내림차순 정렬



#### max 사용 가능 ####
df.groupby("category").agg(
    F.max("price").alias("max_price")
).show()

→ 이외에도 max, avg 등.. 집계 연산 사용 가능

 

8. join

Spark에서 Join시 키 단위로 각 익스큐터에 다시 할당되야함
즉, 해당 과정에서 분산 서버간 네트워크 병목 + 디스크 I/O 발생 → Spark에서 Join은 무거운 작업..
1. 실습 item 데이터
+-------+---------+-----+
|item_id|seller_id|price|
+-------+---------+-----+
|      1|   user_1| 5888|
|      2|   user_2|37808|
|      3|   user_3|55158|
|      4|   user_4|99189|
...

items.printSchema()
root
 |-- item_id: long (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- price: long (nullable = true)


2. 실습 sellers 데이터
+---------+-----------+
|seller_id|seller_name|
+---------+-----------+
|   user_1|   seller_1|
|   user_2|   seller_2|
|   user_3|   seller_3|
|   user_4|   seller_4
...

sellers.printSchema()
root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
1. seller_id 기준 inner join
items.join(sellers, ["seller_id"], "inner").show()

+---------+-------+-----+-----------+
|seller_id|item_id|price|seller_name|
+---------+-------+-----+-----------+
|   user_1|      1| 5888|   seller_1|
|   user_2|      2|37808|   seller_2|
|   user_3|      3|55158|   seller_3|
|   user_4|      4|99189|   seller_4|
|   user_5|      5|55305|   seller_5|
|   user_6|      6|13533|   seller_6|
|   user_7|      7|23688|   seller_7|
|   user_8|      8|60822|   seller_8|
|   user_9|      9|11351|   seller_9|
|  user_10|     10|49051|  seller_10|
+---------+-------+-----+-----------+
join, left join, full join, cross join 사용 가능

 

9. 윈도우 함수

그룹 + 행 단위 분석
즉, Spark에서 윈도우 기능도 사용가능 → 특정 컬럼으로 파티션해서 통계나 추가 정보 처리 시
from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("seller_id").orderBy(F.desc("price"))

df.withColumn("rank", F.row_number().over(window))

 

[ 활용 ]

  • Top 1 상품
  • 순위
  • 누적합
  • 그룹 내 정렬