본문 바로가기
데이터 엔지니어( 실습 정리 )/kafka

(3) - kafka 컨슈머

by 세용용용용 2024. 7. 13.

1. kafka consumer : 토픽의 파티션에 저장된 데이터를 가져옴

- 데이터를 가져오는 것을 "폴링" 이라함

- 컨슈머도 프로듀서와 똑같이 카프카 클라이언트 이기 떄문에 kafka server 와 버전 호환성을 맞춰야됨!!!!

 

 

2. kafka consumer 주요 개념

폴링 (Polling)
  • 카프카는 컨슈머가 브로커에 데이터를 요청(폴링)
  • 컨슈머는 자신이 원하는 만큼 브로커에게 데이터 폴링

 

오프셋 (Offset)
  • 컨슈머가 데이터를 읽을떄 데이터 위치를 오프셋으로 관리!!!
  • 읽은 데이터는 __consumer_offset 토픽에 저장
  • 컨슈머는 자신이 읽은 데이터 위치를 저장하고 다음 작업 이어서 처리
  • 즉, 컨슈머 실행 중지후 재시작시 __consumer_offset을 통해 이전 데이터 처리를 확인하고 중지 시점부터 다시 처리 가능( 고가용성 )
  • 오프셋은 자동커밋, 수동커밋이 존재 ( enable.auto.commit )

자동 커밋 : 설정 주기로 자동 오프셋 커밋

수동 커밋 : 컨슈머가 처리후 직접 커밋, 대부분 수동 커밋 사용 ( 메시지 처리 완료를 직접 관리 )

 

컨슈머 그룹(Consumer Group)

  • 여러 컨슈머가 하나의 그룹을 이룰수 있다.
  • 같은 그룹내 컨슈머는 서로 협력하며 데이터 병렬처리
  • 컨슈머가 하나고 처리할 파티션이 3개면 혼자서 3개처리... ㅠㅠ, 컨슈머가 3개고 처리할 파티션이 3개면 각각 한 개씩 맡아서 처리!!!
  • 즉, 컨슈머 그룹내의 각 컨슈머는 처리할 파티션 소유권을 가지며 처리, 여기서 컨슈머가 추가 및 삭제 시 각 컨슈머의 파티션 소유권은 재조정 ( 리밸런싱 )
  • 컨슈머 개수는 파티션 개수보다 적거나 같아야됨!!!
  • 각기 다른 컨슈머 그룹은 다른 컨슈머에 영향을 미치지 않음 왜??, __consumer_offset 토픽에 컨슈머 그룹별, 토픽별 offset을 나누어 저장하기에
  • 즉, 각각의 컨슈머그룹은 각자가 원하는 데이터를 처리할수 있다!!

컨슈머 그룹1 >>> 엘라스틱서치

컨슈머 그룹2 >>> postgresql

컨슈머 그룹3 >>> hadoop hdfs

 

3. kafka consumer 예제 코드

from confluent_kafka import Consumer, KafkaException

# Consumer 설정
conf = {
    'bootstrap.servers': 'brocker1:9092','brocker2:9092','brocker3:9092'
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest'
}

# Consumer 인스턴스 생성
consumer = Consumer(conf)

# 토픽 구독
consumer.subscribe(['test-topic'])

try:
    while True:
        # 메시지 폴링 ( 1초 동안 기다리며 주기적으로 브로커에 메시지 요청 )
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # 파티션의 끝에 도달했음을 알리는 메시지
                print('%% %s [%d] reached end at offset %d\n' %
                      (msg.topic(), msg.partition(), msg.offset()))
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # 메시지 처리
            print('Received message: %s' % msg.value().decode('utf-8'))

except KeyboardInterrupt:
    pass
finally:
    # Consumer 종료
    consumer.close()

 

bootstrap.servers : 카프카 클러스터 주소 ( 고가용성 위해 여러개 추천 )
group.id : 카프카 컨슈머 그룹을 식별하는 id
  • 동일한 그룹id를 가진 컨슈머들은 협력하여 토픽의 파티션 데이터를 병렬처리
auto.offset.reset : 컨슈머가 offset을 찾을수 없을떄...(처음 구독하거나, 이전 오프셋이 없는 경우), 데이터를 어디서 부터 처리할지 설정
  • earliest : 그냥 가장 처음부터 처리~
  • latest : 가장 마지막 오프셋 메시지 읽음, 새로 도착하는 메시지만 (아 몰라~ 그냥 이제부터 읽을겨~)
  • none : 이전에 커밋된 오프셋이 없을시 오류 발생!!!(아니 없자나 뺴애애애애애애엑!!!!)

'데이터 엔지니어( 실습 정리 ) > kafka' 카테고리의 다른 글

kafka 간단 실습  (0) 2024.09.02
(4) - kafka consumer Lag, Burrow  (0) 2024.07.16
(2) - kafka 브로커, 레플리케이션  (0) 2024.07.12
(1) - kafka 개요  (0) 2024.07.12