1. kafka consumer : 토픽의 파티션에 저장된 데이터를 가져옴
- 데이터를 가져오는 것을 "폴링" 이라함
- 컨슈머도 프로듀서와 똑같이 카프카 클라이언트 이기 떄문에 kafka server 와 버전 호환성을 맞춰야됨!!!!
2. kafka consumer 주요 개념
폴링 (Polling)
- 카프카는 컨슈머가 브로커에 데이터를 요청(폴링)
- 컨슈머는 자신이 원하는 만큼 브로커에게 데이터 폴링
오프셋 (Offset)
- 컨슈머가 데이터를 읽을떄 데이터 위치를 오프셋으로 관리!!!
- 읽은 데이터는 __consumer_offset 토픽에 저장
- 컨슈머는 자신이 읽은 데이터 위치를 저장하고 다음 작업 이어서 처리
- 즉, 컨슈머 실행 중지후 재시작시 __consumer_offset을 통해 이전 데이터 처리를 확인하고 중지 시점부터 다시 처리 가능( 고가용성 )
- 오프셋은 자동커밋, 수동커밋이 존재 ( enable.auto.commit )
자동 커밋 : 설정 주기로 자동 오프셋 커밋
수동 커밋 : 컨슈머가 처리후 직접 커밋, 대부분 수동 커밋 사용 ( 메시지 처리 완료를 직접 관리 )
컨슈머 그룹(Consumer Group)

- 여러 컨슈머가 하나의 그룹을 이룰수 있다.
- 같은 그룹내 컨슈머는 서로 협력하며 데이터 병렬처리
- 컨슈머가 하나고 처리할 파티션이 3개면 혼자서 3개처리... ㅠㅠ, 컨슈머가 3개고 처리할 파티션이 3개면 각각 한 개씩 맡아서 처리!!!
- 즉, 컨슈머 그룹내의 각 컨슈머는 처리할 파티션 소유권을 가지며 처리, 여기서 컨슈머가 추가 및 삭제 시 각 컨슈머의 파티션 소유권은 재조정 ( 리밸런싱 )
- 컨슈머 개수는 파티션 개수보다 적거나 같아야됨!!!
- 각기 다른 컨슈머 그룹은 다른 컨슈머에 영향을 미치지 않음 왜??, __consumer_offset 토픽에 컨슈머 그룹별, 토픽별 offset을 나누어 저장하기에
- 즉, 각각의 컨슈머그룹은 각자가 원하는 데이터를 처리할수 있다!!
컨슈머 그룹1 >>> 엘라스틱서치
컨슈머 그룹2 >>> postgresql
컨슈머 그룹3 >>> hadoop hdfs
3. kafka consumer 예제 코드
from confluent_kafka import Consumer, KafkaException
# Consumer 설정
conf = {
'bootstrap.servers': 'brocker1:9092','brocker2:9092','brocker3:9092'
'group.id': 'test-group',
'auto.offset.reset': 'earliest'
}
# Consumer 인스턴스 생성
consumer = Consumer(conf)
# 토픽 구독
consumer.subscribe(['test-topic'])
try:
while True:
# 메시지 폴링 ( 1초 동안 기다리며 주기적으로 브로커에 메시지 요청 )
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# 파티션의 끝에 도달했음을 알리는 메시지
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
# 메시지 처리
print('Received message: %s' % msg.value().decode('utf-8'))
except KeyboardInterrupt:
pass
finally:
# Consumer 종료
consumer.close()
bootstrap.servers : 카프카 클러스터 주소 ( 고가용성 위해 여러개 추천 )
group.id : 카프카 컨슈머 그룹을 식별하는 id
- 동일한 그룹id를 가진 컨슈머들은 협력하여 토픽의 파티션 데이터를 병렬처리
auto.offset.reset : 컨슈머가 offset을 찾을수 없을떄...(처음 구독하거나, 이전 오프셋이 없는 경우), 데이터를 어디서 부터 처리할지 설정
- earliest : 그냥 가장 처음부터 처리~
- latest : 가장 마지막 오프셋 메시지 읽음, 새로 도착하는 메시지만 (아 몰라~ 그냥 이제부터 읽을겨~)
- none : 이전에 커밋된 오프셋이 없을시 오류 발생!!!(아니 없자나 뺴애애애애애애엑!!!!)
'데이터 엔지니어( 실습 정리 ) > kafka' 카테고리의 다른 글
| kafka 간단 실습 (0) | 2024.09.02 |
|---|---|
| (4) - kafka consumer Lag, Burrow (0) | 2024.07.16 |
| (2) - kafka 브로커, 레플리케이션 (0) | 2024.07.12 |
| (1) - kafka 개요 (0) | 2024.07.12 |