728x90
https://www.youtube.com/watch?v=xqrIDHbGjOY&list=PLwouWTPuIjUgr29uSrSkVo8PRmem6HRDE&index=1
컨슈머
- 토픽 파티션에서 레코드 조회
Properties prop = new Properties();
prop.put("bootstrap.servers","localhost:9092")
prop.put("group.id","group1"); // group.id 지정
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Properties를 이용해서 KafkaConsumer 객체를 생성 함
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // 토픽 구독
while(조건) {
// 일정 시간 대기하면서 브로커로부터 컨슈머 레코드 목록을 읽어 온다
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 읽어온 컨슈머 레코드 목록을 순회하며 필요한 처리를 진행
for (ConsumerRecord<String, String> records : records) {
System.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
}
}
// 사용 완료 후 close 메서드로 닫아줌
consumer.close();
토픽 파티션은 그룹 단위 할당
- 컨슈머 그룹 단위(
group.id
)로 파티션 할당 - 파티션 개수와 컨슈머 개수는 밀접하게 관련이 되어 있음
- 파티션 개수 < 컨슈머 그룹: 컨슈머는 놀게 됨
- 처리량이 떨어져서 컨슈머 개수를 늘려야 한다면 파티션의 개수도 늘려야 함
커밋과 오프셋
- 컨슈머의
poll()
는 이전에 커밋한 offset이 있으면 그 offset 이후의 record를 읽어옴 - 읽어온 다음 마지막 읽어온 record의 offset을 커밋함
커밋된 오프셋이 없는 경우
poll()
로 record를 읽어오기 위해 접근을 하는데 처음 접근이거나 커밋한 오프셋이 없는 경우auto.offset.reset
설정 사용earliest
: 맨 처음 오프셋 사용latest
: 가장 마지막 오프셋 사용 (기본값)none
: 컨슈머 그룹에 대한 이전 커밋이 없으면 Exception 발생
- 주로
earliest
,latest
두 옵션을 사용하는 편
컨슈머 설정
- 조회에 영향을 주는 주요 설정
fetch.min.bytes
: 조회시 브로커가 전송할 최소 데이터 크기- 기본값: 1
- 이 값이 크면 대기 시간은 늘지만 처리량이 증가
fetch.max.wait.ms
: 데이터가 최소 크기가 될 때까지 기다릴 시간- 기본값: 500
- 브로커가 리턴할 때까지 대기하는 시간으로
poll()
메서드의 대기 시간과 다름
max.partition.fetch.bytes
: 파티션 당 서버가 리턴할 수 있는 최대 크기- 기본값: 10948576 (1MB)
자동 커밋/수동 커밋
enable.auto.commit
설정true
: 일정 주기로 컨슈머가 읽은 오프셋을 커밋 (기본값)false
: 수동으로 커밋 실행
auto.commit.interval.ms
: 자동 커밋 주기- 기본값: 5000 (5초)
poll()
,close()
메서드 호출시 자동 커밋 실행
수동 커밋 : 동기/비동기 커밋
동기 커밋
- 커밋에 성공하면 Exception 이 발생하지 않고 실패하면 Exception 발생
- 커밋에 실패했을 때는 Exception 을 catch 해서 알맞은 처리가 필요함
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(1));
for (ConsumerRecord<String, String> record : records) {
... 처리
}
try {
consumer.commitSync();
} catch(Exception ex) {
// 커밋 실패시 에러 발생
}
비동기 커밋
- 코드 자체에서 바로 실패여부를 알 수 없음
- 성공 실패 여부를 알고 싶다면 callback을 받아서 처리
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(1));
for (ConsumerRecord<String, String> record : records) {
... 처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)
재처리와 순서
- 동일 메시지 조회(수신) 가능성
- 일시적 커밋 실패, 리밸런스 등에 의해 발생
- 컨슈머는 멱등성(idempotence)을 고려해야 함
- 예: 아래 메시지를 재처리 할 경우
- 조회수 1 증가 -> 좋아요 1증가 -> 조회수 1증가
- 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음
- 예: 아래 메시지를 재처리 할 경우
- 데이터 특성에 따라 타임스탬프(timestamp), 일련 번호(serial number) 등을 활용
세션 타임아웃(session.timeout), 하트비트(heartbeat), 최대 poll 간격
- 컨슈머는 하트비트(heartbeat)를 전송해서 연결 유지
- 브로커는 일정 시간 컨슈머로부터 하트비트(heartbeat)가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행
- 관련 설정
session.timeout.ms
: 세션 타임 아웃 시간 (기본값 10초)heartbeat.interval.ms
: 하트비트 전송 주기 (기본값 3초)session.timeout.ms
의 1/3 이하 추천
max.poll.interval.ms
:poll()
메서드의 최대 호출 간격- 이 시간이 지나도록
poll()
하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행
- 이 시간이 지나도록
종료 처리
- 다른 쓰레드에서
wakeup()
메서드 호출poll()
메서드가 WakeupException 발생 ->close()
메서드로 종료 처리
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple"));
try {
while(조건) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // wakeup() 호출시 Exception 발생
... records 처리
try {
consumer.commitSync();
} catch(Exception ex) {
e.printStackTrace();
}
}
} catch (Exception ex) {
...
} finally {
consumer.close();
}
주의: 쓰레드 안전하지 않음
- KafkaConsumer는 쓰레드에 안전하지 않음
- 여러 쓰레드에서 동시에 사용하지 말 것!
wakeup()
메서드는 예외
728x90
'개발강의정리 > DevOps' 카테고리의 다른 글
[데브옵스를 위한 쿠버네티스 마스터] 애플리케이션 스케줄링과 라이프사이클 관리 - 애플리케이션 변수 관리 (0) | 2021.01.19 |
---|---|
[데브옵스를 위한 쿠버네티스 마스터] VisualStudio Code 설치와 활용 (0) | 2021.01.19 |
[Apache kafka 조금 아는 척하기] 카프카 프로듀서 (0) | 2021.01.05 |
[Apache kafka 조금 아는 척하기] 카프카란? (0) | 2021.01.05 |
[아파치 카프카 입문과 활용] 5. Apache kafka 파이프라인 실습 (0) | 2021.01.04 |
댓글