개발강의정리/DevOps

[Apache kafka 조금 아는 척하기] 카프카 프로듀서

nineDeveloper 2021. 1. 5.
728x90

https://www.youtube.com/watch?v=geMtm17ofPY&list=PLwouWTPuIjUgr29uSrSkVo8PRmem6HRDE&index=2

 

프로듀서

  • 토픽에 메시지 전송
    • 토픽, 키, 값

프로듀서를 사용해서 메시지를 보내는 코드

// Properties를 통해서 ack, batch사이즈 등의 설정 함
Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092,kafka01:9092,kafka01:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Properties를 이용해서 KafkaProducer 객체를 생성 함
KafkaProducer<Integer, String> producer = new KafkaProducer<>(prop);

// KafkaProducer 객체는 send 메서드를 제공함
// send 메서드에 ProducerRecord를 전달을 함
// ProducerRecord가 Kafka Broker에 전송할 메시지가 됨

// ProducerRecord는 메시지를 크게 두가지 방법으로 생성할 수 있음
// 토픽이름과 key, value 를 사용해서 생성
producer.send(new ProducerRecord<>("topicname", "key", "value"));
// 토픽이름과 value 를 사용해서 생성
producer.send(new ProducerRecord<>("topicname", "value"));

// 사용 완료 후 close 메서드로 닫아줌
producer.close();

프로듀서의 기본 흐름


Sender 기본 동작

Sender가 배치를 Broker에 보내는 동안 send() 메서드를 통해서 들어온 record는 계속 배치에 누적해서 쌓이게 됨
SenderSender대로 배치를 꺼내서 broker에 보내고
send() 메서드는 send() 메서드대로 계속해서 메시지를 배치에 누적하게 됨

두개가 계속해서 별도의 쓰레드로 동작하기 때문에 쌓이는 동안에 Sender가 메시지를 보내지 않는 일은 벌어지지 않음

Sender는 배치에 메시지가 한개가 있던 여러개가 있던 다 차지 않아도 Sender가 배치를 보낼 수만 있다면 바로 보냄


처리량 관련 주요 속성

batch.size가 너무 작으면 한번에 보낼 수 있는 메시지의 개수가 줄고 전송 횟수가 많아지기 때문에 처리량이 줄어들게됨

linger.ms에 대기시간을 주면 그 시간만큼 기다리니까 기다리는 시간동안 배치에 다른 메시지가 쌓이게 되고
한번에 더 많은 메시지를 보낼 수 있는 여지가 생기므로 전반적인 처리량이 높아지는 효과를 볼 수 있게됨


전송 결과 확인 안함

producer.send(new ProducerRecord<>("simple","value));

  • 전송 실패를 알 수 없음
  • 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용

전송 결과 확인함: Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic","value"));
try {
  RecordMetadata meta = f.get(); // 블로킹
} catch (ExecutionException ex) {

}

배치에 메시지가 쌓이지 않고 한개씩만 들어감
건별로 결과를 확실하게 알 수있게 됨

  • 배치 효과 떨어짐 -> 처리량 저하
  • 처리량이 낮아도 되는 경우에만 사용

전송 결과 확인함: Callback 사용

producer.send(new ProducerRecord<>("simple","value"),
  new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception ex) {

    }
  }
});

send() 메서드에 Callback 겍체를 전달
Callback 객체는 전송이 완료되면 전송 결과를 onCompletion 메서드로 받게됨
이때 Exception 객체를 받게되면 전송이 실패한 것
성공, 실패에 따른 후처리를 할 수 있음

  • 블로킹 방식이 아니므로 배치가 쌓이는데 문제가 없고 처리량 저하 없음

전송 보장과 ack

  • ack = 0
    • 서버 응답을 기다리지 않음
    • 전송 보장도 zero
    • 처리량은 많이 높아지겠지만 메시지가 유실되더라도 그 실패여부를 알 수 없기 떄문에 메시지 전송여부가 중요한 경우에는 사용하면 안됨
  • ack = 1
    • 파티션의 리더에 저장되면 응답 받음
    • 리더 장애시 메시지 유실 가능
    • 리더에 성공적으로 저장이되서 성공응답을 받았는데 팔로워에는 아직 복제되지 않았는데 리더에서 장애가 나면 리더에 저장된 메시지가
      아직 팔로워에 복제되지 않은 상태에서 팔로워 중에 하나가 리더가 되면 기존 리더에 저장된 메시지가 유실됨
  • ack = all (또는 -1)
    • 모든 리플리카에 저장되면 응답 받음
      • 브로커 min.insync.replicas 설정에 따라 달라짐
    • 엄격하게 전송을 보장해야하는 상황에서 사용


ack + min.insync.replicas

  • min.insync.replicas (브로커 옵션)
    • 프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
  • 예1
    • 리플리카 개수 3, ack = all, min,insync.replicas = 2
    • 리더에 저장하고 팔로워 중 한 개에 저장하면 최소 개수를 충족하기 때문에 성공 응답
  • 예2
    • 리플리카 개수 3, ack = all, min,insync.replicas = 1
    • 리더에 저장되면 성공 응답
    • ack = 1과 동일 (리더 장애시 메시지 유실 가능)
  • 예3
    • 리플리카 개수 3, ack = all, min,insync.replicas = 3
    • 리더와 팔로워 2개에 저장되면 성공 응답
    • 팔로워 중 한 개라도 장애가 나면 리플리카 부족으로 저장에 실패함
    • min,insync.replicas 옵션은 리플리카 개수와 동일하게 지정하면 안됨


에러 유형

  • 전송 과정에서 실패
    • 전송 타임 아웃(일시적인 네트워크 오류 등)
    • 리더 다운에 의한 새 리더 선출 진행 중
    • 브로커 설정 메시지 크기 한도 초과
    • 등등
  • 전송 전에 실패
    • 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과
    • 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과
    • 등등

실패 대응 1 : 재시도

  • 재시도
    • 재시도 가능한 에러는 재시도 처리
      • 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등
  • 재시도 위치
    • 프로듀서는 기본적으로 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
      • retries 속성
    • send() 메서드에서 Exception 발생시 Exception 타입에 따라 send() 재호출
    • 콜백 메서드에서 Exception 받으면 타입에 따라 send() 재호출
  • 아주 아주 특별한 이유가 없다면 무한 재시도 X
    • 다음 보내야할 메시지가 밀리는 것임
    • 재시도를 일정시간이나 일정횟수로 제한을 해서 전체적인 메시지가 밀리지 않도록 주의해야함

실패 대응 2 : 기록

  • 추후 재처리 위해 기록
    • 별도 파일, DB 등을 이용해서 실패한 메시지 기록
    • 추후에 수동(또는 자동) 보정 작업 진행
  • 기록 위치
    • send() 메서드에서 Exception 발생시
    • send() 메서드에 전달한 콜백에서 Exception 받는 경우
    • send() 메서드가 리턴한 Futureget() 메서드에서 Exception 발생시

재시도와 메시지 중복 전송 가능성

  • 브로커 응답이 늦게 와서 재시도할 경우 중복 발송 가능
  • *참고: enable.idempotence 속성을 사용하면 중복 전송될 가능성을 줄일 수 있음


재시도와 순서

  • max.in.flight.requests.per.connection
    • 블로킹없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수
    • 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음
      • 전송 순서가 중요하면 이 값을 1로 지정


정리

처리량 관련

  • batch.size: 배치 크기, 배치가 다 차면 바로 전송
  • linger.ms: 전송 대기 시간(기본값 0)

전송 신뢰성

  • ack = all: 모든 리플리카에 저장되면 응답 받음
  • replication factor = 3 리플리카 개수가 3개이면
  • min.insync.replicas = 2, 2까지만 지정해야 함

재시도 주의 사항

  • 중복 전송
  • 순서 바뀜
728x90

댓글

💲 추천 글