https://www.youtube.com/watch?v=geMtm17ofPY&list=PLwouWTPuIjUgr29uSrSkVo8PRmem6HRDE&index=2
프로듀서
- 토픽에 메시지 전송
- 토픽, 키, 값
프로듀서를 사용해서 메시지를 보내는 코드
// Properties를 통해서 ack, batch사이즈 등의 설정 함
Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092,kafka01:9092,kafka01:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Properties를 이용해서 KafkaProducer 객체를 생성 함
KafkaProducer<Integer, String> producer = new KafkaProducer<>(prop);
// KafkaProducer 객체는 send 메서드를 제공함
// send 메서드에 ProducerRecord를 전달을 함
// ProducerRecord가 Kafka Broker에 전송할 메시지가 됨
// ProducerRecord는 메시지를 크게 두가지 방법으로 생성할 수 있음
// 토픽이름과 key, value 를 사용해서 생성
producer.send(new ProducerRecord<>("topicname", "key", "value"));
// 토픽이름과 value 를 사용해서 생성
producer.send(new ProducerRecord<>("topicname", "value"));
// 사용 완료 후 close 메서드로 닫아줌
producer.close();
프로듀서의 기본 흐름
Sender 기본 동작
Sender가 배치를 Broker에 보내는 동안 send()
메서드를 통해서 들어온 record는 계속 배치에 누적해서 쌓이게 됨
Sender는 Sender대로 배치를 꺼내서 broker에 보내고send()
메서드는 send()
메서드대로 계속해서 메시지를 배치에 누적하게 됨
두개가 계속해서 별도의 쓰레드로 동작하기 때문에 쌓이는 동안에 Sender가 메시지를 보내지 않는 일은 벌어지지 않음
Sender는 배치에 메시지가 한개가 있던 여러개가 있던 다 차지 않아도 Sender가 배치를 보낼 수만 있다면 바로 보냄
처리량 관련 주요 속성
batch.size
가 너무 작으면 한번에 보낼 수 있는 메시지의 개수가 줄고 전송 횟수가 많아지기 때문에 처리량이 줄어들게됨
linger.ms
에 대기시간을 주면 그 시간만큼 기다리니까 기다리는 시간동안 배치에 다른 메시지가 쌓이게 되고
한번에 더 많은 메시지를 보낼 수 있는 여지가 생기므로 전반적인 처리량이 높아지는 효과를 볼 수 있게됨
전송 결과 확인 안함
producer.send(new ProducerRecord<>("simple","value));
- 전송 실패를 알 수 없음
- 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용
전송 결과 확인함: Future 사용
Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic","value"));
try {
RecordMetadata meta = f.get(); // 블로킹
} catch (ExecutionException ex) {
}
배치에 메시지가 쌓이지 않고 한개씩만 들어감
건별로 결과를 확실하게 알 수있게 됨
- 배치 효과 떨어짐 -> 처리량 저하
- 처리량이 낮아도 되는 경우에만 사용
전송 결과 확인함: Callback 사용
producer.send(new ProducerRecord<>("simple","value"),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
}
}
});
send()
메서드에 Callback 겍체를 전달
Callback 객체는 전송이 완료되면 전송 결과를 onCompletion 메서드로 받게됨
이때 Exception 객체를 받게되면 전송이 실패한 것
성공, 실패에 따른 후처리를 할 수 있음
- 블로킹 방식이 아니므로 배치가 쌓이는데 문제가 없고 처리량 저하 없음
전송 보장과 ack
ack = 0
- 서버 응답을 기다리지 않음
- 전송 보장도 zero
- 처리량은 많이 높아지겠지만 메시지가 유실되더라도 그 실패여부를 알 수 없기 떄문에 메시지 전송여부가 중요한 경우에는 사용하면 안됨
ack = 1
- 파티션의 리더에 저장되면 응답 받음
- 리더 장애시 메시지 유실 가능
- 리더에 성공적으로 저장이되서 성공응답을 받았는데 팔로워에는 아직 복제되지 않았는데 리더에서 장애가 나면 리더에 저장된 메시지가
아직 팔로워에 복제되지 않은 상태에서 팔로워 중에 하나가 리더가 되면 기존 리더에 저장된 메시지가 유실됨
ack = all
(또는 -1)- 모든 리플리카에 저장되면 응답 받음
- 브로커
min.insync.replicas
설정에 따라 달라짐
- 브로커
- 엄격하게 전송을 보장해야하는 상황에서 사용
- 모든 리플리카에 저장되면 응답 받음
ack + min.insync.replicas
min.insync.replicas
(브로커 옵션)- 프로듀서
ack
옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
- 프로듀서
- 예1
- 리플리카 개수 3,
ack = all
,min,insync.replicas
= 2 - 리더에 저장하고 팔로워 중 한 개에 저장하면 최소 개수를 충족하기 때문에 성공 응답
- 리플리카 개수 3,
- 예2
- 리플리카 개수 3,
ack = all
,min,insync.replicas
= 1 - 리더에 저장되면 성공 응답
ack = 1
과 동일 (리더 장애시 메시지 유실 가능)
- 리플리카 개수 3,
- 예3
- 리플리카 개수 3,
ack = all
,min,insync.replicas
= 3 - 리더와 팔로워 2개에 저장되면 성공 응답
- 팔로워 중 한 개라도 장애가 나면 리플리카 부족으로 저장에 실패함
min,insync.replicas
옵션은 리플리카 개수와 동일하게 지정하면 안됨
- 리플리카 개수 3,
에러 유형
- 전송 과정에서 실패
- 전송 타임 아웃(일시적인 네트워크 오류 등)
- 리더 다운에 의한 새 리더 선출 진행 중
- 브로커 설정 메시지 크기 한도 초과
- 등등
- 전송 전에 실패
- 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과
- 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과
- 등등
실패 대응 1 : 재시도
- 재시도
- 재시도 가능한 에러는 재시도 처리
- 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등
- 재시도 가능한 에러는 재시도 처리
- 재시도 위치
- 프로듀서는 기본적으로 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
retries
속성
send()
메서드에서 Exception 발생시 Exception 타입에 따라send()
재호출- 콜백 메서드에서 Exception 받으면 타입에 따라
send()
재호출
- 프로듀서는 기본적으로 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
- 아주 아주 특별한 이유가 없다면 무한 재시도 X
- 다음 보내야할 메시지가 밀리는 것임
- 재시도를 일정시간이나 일정횟수로 제한을 해서 전체적인 메시지가 밀리지 않도록 주의해야함
실패 대응 2 : 기록
- 추후 재처리 위해 기록
- 별도 파일, DB 등을 이용해서 실패한 메시지 기록
- 추후에 수동(또는 자동) 보정 작업 진행
- 기록 위치
send()
메서드에서 Exception 발생시send()
메서드에 전달한 콜백에서 Exception 받는 경우send()
메서드가 리턴한Future
의get()
메서드에서 Exception 발생시
재시도와 메시지 중복 전송 가능성
- 브로커 응답이 늦게 와서 재시도할 경우 중복 발송 가능
- *참고:
enable.idempotence
속성을 사용하면 중복 전송될 가능성을 줄일 수 있음
재시도와 순서
max.in.flight.requests.per.connection
- 블로킹없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수
- 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음
- 전송 순서가 중요하면 이 값을 1로 지정
정리
처리량 관련
batch.size
: 배치 크기, 배치가 다 차면 바로 전송linger.ms
: 전송 대기 시간(기본값 0)
전송 신뢰성
ack = all
: 모든 리플리카에 저장되면 응답 받음- replication factor = 3 리플리카 개수가 3개이면
min.insync.replicas
= 2, 2까지만 지정해야 함
재시도 주의 사항
- 중복 전송
- 순서 바뀜
'개발강의정리 > DevOps' 카테고리의 다른 글
[데브옵스를 위한 쿠버네티스 마스터] VisualStudio Code 설치와 활용 (0) | 2021.01.19 |
---|---|
[Apache kafka 조금 아는 척하기] 카프카 컨슈머 (0) | 2021.01.05 |
[Apache kafka 조금 아는 척하기] 카프카란? (0) | 2021.01.05 |
[아파치 카프카 입문과 활용] 5. Apache kafka 파이프라인 실습 (0) | 2021.01.04 |
[아파치 카프카 입문과 활용] 4. Apache kafka 컨슈머 애플리케이션 개발, 실습 (0) | 2021.01.04 |
댓글