1. Spark 데이터프레임 이란?
컬럼이 있는 테이블 형태 + SQL 처럼 다룰 수 있는 스파크 데이터 구조
[ 데이터프레임 특징 ]
- Spark 자동 최적화 사용 가능 ( 카탈리스트 옵티마이저 )
- 스키마 존재
- SQL 처럼 사용 가능 ( select, filter, groupBy )
- 컬럼 와이즈 함수 사용 가능
[ 컬럼 와이즈 함수 ]
- 행 단위가 아니라 컬럼 전체를 한 번에 처리하는 함수 방식
- 스파크는 컬럼 단위 연산 최적화됨 → 최적화 + 병렬처리 가능
1-1. 스파크 데이터 프레임 만드는 방법
1. Row 생성
from pyspark.sql import Row
data = [Row(1), Row(2), Row(3)]
df = spark.createDataFrame(data)
df.show()
---------------------------------------------------
2. 컬럼명 지정
df = spark.createDataFrame(data, ["id"])
---------------------------------------------------
3. Schema 직접 지정
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType()),
StructField("age", IntegerType())
])
→ 타입 안정성
---------------------------------------------------
4. 파일 읽기
df = spark.read.parquet("/path/maple.parquet")
---------------------------------------------------
2. Spark 데이터프레임 기본 명령어
1. schema 확인
df.printSchema()
2. 컬럼 확인
df.columns
3. 타입 확인
df.dtypes
4. 일부 행만 져오기
df.limit(5).show()
4. 데이터 확인
df.show()
df.head()
df.take(3)
df.collect()
3. Spark 데이터프레임 Select / Column 사용
3-1. 기본 select
df.select("item_id", "price").show()
+-------+-----+
|item_id|price|
+-------+-----+
| 1|93933|
| 2|61517|
| 3|26880|
| 4|45907|
| 5|48963|
| 6|47765|
| 7|17408|
| 8|22468|
| 9|33540|
| 10|91708|
+-------+-----+
## col ( 컬럼 와이즈 펑션 ) 파라미터로 컬럼 넣기 가능
from pyspark.sql.functions import col
c1 = col("item_id")
c2 = col("price")
cols = [c1, c2]
df.select(*cols).show()
3-2. selectExpr → select를 쿼리하는 식으로 사용 가능 ( 간단한 전처리 가능 )
df.selectExpr("upper(item_name) as upperName").show()
+---------+
|upperName|
+---------+
| ITEM_1|
| ITEM_2|
| ITEM_3|
| ITEM_4|
| ITEM_5|
| ITEM_6|
| ITEM_7|
| ITEM_8|
| ITEM_9|
| ITEM_10|
+---------+
3-3. pyspark.sql.functions as F → 대부분의 함수 사용 가능
import pyspark.sql.functions as F
df = df.withColumn("newColumn", F.trim(F.col("item_id")))
df.show()
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+---------+
|item_id|item_name|price|quantity| seller| world|category|upgrade_level|star_force| timestamp|newColumn|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+---------+
| 1| item_1|93933| 39| user_1|scania| weapon| 1| 20|2026-01-01 00:00:00| 1|
| 2| item_2|61517| 62| user_2|scania| armor| 4| 19|2026-01-01 01:00:00| 2|
| 3| item_3|26880| 76| user_3|scania| etc| 1| 0|2026-01-01 02:00:00| 3|
| 4| item_4|45907| 16| user_4|scania| etc| 11| 3|2026-01-01 03:00:00| 4|
| 5| item_5|48963| 51| user_5|scania| weapon| 10| 19|2026-01-01 04:00:00| 5|
| 6| item_6|47765| 61| user_6|scania| armor| 4| 2|2026-01-01 05:00:00| 6|
| 7| item_7|17408| 44| user_7|scania| etc| 9| 3|2026-01-01 06:00:00| 7|
| 8| item_8|22468| 64| user_8|scania| weapon| 12| 21|2026-01-01 07:00:00| 8|
| 9| item_9|33540| 65| user_9|scania| armor| 14| 21|2026-01-01 08:00:00| 9|
| 10| item_10|91708| 76|user_10|scania| etc| 10| 19|2026-01-01 09:00:00| 10|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+---------+
#### 컬럼 삭제도 가능 ####
df = df.drop("newColumn")
4. Spark 데이터프레임 filter / where
filter / where 둘은 똑같은 기능
df.filter("quantity >= 70").show()
df.where("quantity >= 70").show()
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+
|item_id|item_name|price|quantity| seller| world|category|upgrade_level|star_force| timestamp|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+
| 3| item_3|26880| 76| user_3|scania| etc| 1| 0|2026-01-01 02:00:00|
| 10| item_10|91708| 76|user_10|scania| etc| 10| 19|2026-01-01 09:00:00|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+
5. Spark 데이터프레임 orderBy / sort
orderBy / sort 둘은 똑같은 기능
df.orderBy("price")
df.sort("price")
#### 내림차순 ####
df.orderBy(F.col("price").desc())
6. UDF ( 사용자 정의 함수 )
Spark 기본 함수로 못하는 것을 직접 만드는 함수
[ 주의할 점 ]
느림 ( 최적화 꺠짐 )
[ 왜 UDF가 느림? ]
-------------------------------------------
Spark JVM
↓
Python으로 데이터 넘김 (직렬화)
↓
Python 함수 실행 (lambda)
↓
결과 다시 JVM으로 전달
-------------------------------------------
즉, 해당 왕복 비용 발생..
→ 가능한 상황에서는 빌트인 함수 사용 ( 스파크 엔진 함수 )
1. 100 더하는 사용자 정의 함수
import pyspark.sql.functions as F
up_100_udf = F.udf(lambda v: v + 100)
df.withColumn("up_price", up_100_udf(F.col("price"))).show()
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+--------+
|item_id|item_name|price|quantity| seller| world|category|upgrade_level|star_force| timestamp|up_price|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+--------+
| 1| item_1|93933| 39| user_1|scania| weapon| 1| 20|2026-01-01 00:00:00| 94033|
| 2| item_2|61517| 62| user_2|scania| armor| 4| 19|2026-01-01 01:00:00| 61617|
| 3| item_3|26880| 76| user_3|scania| etc| 1| 0|2026-01-01 02:00:00| 26980|
| 4| item_4|45907| 16| user_4|scania| etc| 11| 3|2026-01-01 03:00:00| 46007|
| 5| item_5|48963| 51| user_5|scania| weapon| 10| 19|2026-01-01 04:00:00| 49063|
| 6| item_6|47765| 61| user_6|scania| armor| 4| 2|2026-01-01 05:00:00| 47865|
| 7| item_7|17408| 44| user_7|scania| etc| 9| 3|2026-01-01 06:00:00| 17508|
| 8| item_8|22468| 64| user_8|scania| weapon| 12| 21|2026-01-01 07:00:00| 22568|
| 9| item_9|33540| 65| user_9|scania| armor| 14| 21|2026-01-01 08:00:00| 33640|
| 10| item_10|91708| 76|user_10|scania| etc| 10| 19|2026-01-01 09:00:00| 91808|
+-------+---------+-----+--------+-------+------+--------+-------------+----------+-------------------+--------+
7. GroupBy + Aggregation
그룹바이 = 묶기
agg = 집계 연산
df.groupBy("category").agg(
(F.sum("price") * F.sum("quantity")).alias("total")
).orderBy(F.col("total").desc())
+--------+--------+
|category| total|
+--------+--------+
| etc|38563436|
| armor|26850536|
| weapon|25466056|
+--------+--------+
→ 카테고리고 그룹화 + 총 판매 가격 + 내림차순 정렬
#### max 사용 가능 ####
df.groupby("category").agg(
F.max("price").alias("max_price")
).show()
→ 이외에도 max, avg 등.. 집계 연산 사용 가능
8. join
Spark에서 Join시 키 단위로 각 익스큐터에 다시 할당되야함
즉, 해당 과정에서 분산 서버간 네트워크 병목 + 디스크 I/O 발생 → Spark에서 Join은 무거운 작업..
1. 실습 item 데이터
+-------+---------+-----+
|item_id|seller_id|price|
+-------+---------+-----+
| 1| user_1| 5888|
| 2| user_2|37808|
| 3| user_3|55158|
| 4| user_4|99189|
...
items.printSchema()
root
|-- item_id: long (nullable = true)
|-- seller_id: string (nullable = true)
|-- price: long (nullable = true)
2. 실습 sellers 데이터
+---------+-----------+
|seller_id|seller_name|
+---------+-----------+
| user_1| seller_1|
| user_2| seller_2|
| user_3| seller_3|
| user_4| seller_4
...
sellers.printSchema()
root
|-- seller_id: string (nullable = true)
|-- seller_name: string (nullable = true)
1. seller_id 기준 inner join
items.join(sellers, ["seller_id"], "inner").show()
+---------+-------+-----+-----------+
|seller_id|item_id|price|seller_name|
+---------+-------+-----+-----------+
| user_1| 1| 5888| seller_1|
| user_2| 2|37808| seller_2|
| user_3| 3|55158| seller_3|
| user_4| 4|99189| seller_4|
| user_5| 5|55305| seller_5|
| user_6| 6|13533| seller_6|
| user_7| 7|23688| seller_7|
| user_8| 8|60822| seller_8|
| user_9| 9|11351| seller_9|
| user_10| 10|49051| seller_10|
+---------+-------+-----+-----------+
join, left join, full join, cross join 사용 가능
9. 윈도우 함수
그룹 + 행 단위 분석
즉, Spark에서 윈도우 기능도 사용가능 → 특정 컬럼으로 파티션해서 통계나 추가 정보 처리 시
from pyspark.sql.window import Window
import pyspark.sql.functions as F
window = Window.partitionBy("seller_id").orderBy(F.desc("price"))
df.withColumn("rank", F.row_number().over(window))
[ 활용 ]
- Top 1 상품
- 순위
- 누적합
- 그룹 내 정렬
'데이터 엔지니어( 이론 공부 ) > spark' 카테고리의 다른 글
| Apache Spark ( Spark 최적화 하기 ) (0) | 2026.04.28 |
|---|---|
| Apache Spark ( Spark submit & partition & shuffle ) (0) | 2026.04.25 |
| Apache Spark ( 핵심 개념 ) (1) | 2026.04.14 |
| 2. 스파크 간단히 살펴보기 ( 빅데이터와 스파크 살펴보기 ) (0) | 2026.03.25 |
| 1. 아파치 스파크란? ( 빅데이터와 스파크 살펴보기 ) (0) | 2026.03.24 |