개발강의정리/DevOps

[아파치 카프카 입문과 활용] 3. Apache kafka 프로듀서 애플리케이션 개발, 실습

nineDeveloper 2021. 1. 4.
728x90

www.youtube.com/watch?v=4BbKCsKSq_I&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j&index=18

 

Kafka Producer application

 

PRACTICE - Import project from github

https://github.com/freeserver1191/tacademy-kafka

git clone

$ git clone https://github.com/freeserver1191/tacademy-kafka.git
Cloning into 'tacademy-kafka'...
remote: Enumerating objects: 286, done.
remote: Counting objects: 100% (286/286), done.
remote: Compressing objects: 100% (130/130), done.
remote: Total 286 (delta 60), reused 260 (delta 44), pack-reused 0
Receiving objects: 100% (286/286), 2.38 MiB | 380.00 KiB/s, done.
Resolving deltas: 100% (60/60), done.
$ ls
tacademy-kafka

 

Producer

  • 카프카로 데이터(key, value) 전송
  • ProducerRecord 객체를 생성
  • Java Kafka-client 제공
  • 어떤 데이터를 보내나요?
    • Web, Application 클릭로그
    • 공유 자전거/자동차의 위치(GPS)정보
    • 스마트팩토리 공장의 머신 센서정보
    • 상호 통신하기 위한 application
    • 기타 ...

 

Producer code

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

  • 일부 카프카 브로커와 클라이언트 버전의 호환에 이슈가 있음

 

PRACTICE - Simple producer code

public class Main {
  private static String TOPIC_NAME = "test"; //토픽명
  private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092"; //카프카 클러스터 주소

  public static void main(String[] args) {
    Properties configs = new Properties();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

    for (int index = 0; index < 10; index++) {
      String data = "This is record " + index;
      ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
      try {
        producer.send(record);
        System.out.println("Send to "+ TOPIC_NAME +" | data : "+data);
        Thread.sleep(1000);
      } catch (Exception e) { 
        System.out.println(e);
      } 
    }
  } 
}

 

실습

  1. SimpleProducer BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
  2. 소스코드 RUN
  3. 17번 커맨드 Consumer Polling Test 스크립트로 데이터 consuming
    ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning

 

Producer options

  • 필수옵션 - 반드시 입력
    • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
    • key.serializer : 메시지 키 직렬화에 사용되는 클래스
    • value.serializer : 메시지 값을 직렬화 하는데 사용되는 클래스
  • 선택옵션 - default값 존재
    • acks : 레코드 전송 신뢰도 조절(리플리카)
    • comression.type : snappy, gzip, lz4 중 하나로 압축하여 전송
    • retries : 클러스터 장애에 대응하여 메시지 전송을 재시도하는 회수
    • buffer.memory : 브로커에 전송될 메시지의 버퍼로 사용 될 메모리 양
    • batch.size : 여러 데이터를 함께 보내기 위한 레코드 크기
    • linger.ms : 현재의 배치를 전송하기 전까지 기다리는 시간
    • client.id : 어떤 클라이언트인지 구분하는 식별자

 

Producer

https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/

 

PRACTICE - Produce with key/value records

public class Main {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            // Key - Integer.toString(index), Value - data
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

$ ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"

 

실습

  1. ProducerWithKeyValue BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
  2. 24번 커맨드로 데이터 consuming
    ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"

 

 

 

Record key

레코드 키

  • 역할 : 메시지를 구분하는 구분자 역할

  • 특징

    • 동일 키, 동일 파티션 적재 by Default partitioner
      👉 순서를 보장하므로, 상태머신(state machine)으로 사용 가능

    👉 역할에 따른 컨슈머 할당 적용 가능

  • 레코드 값(value)을 정의 하는 구분자
    👉 키에 레코드 값 해쉬값을 넣음으로서 중복처리 방지 가능

 

Record value

레코드 값

  • 역할 : 실질적으로 전달하고 싶은 데이터
    • 어떤 형(type)을 보낼 수 있나요?
      • String, ByteArray, Int 등 사실상 제한 없음
    • 어떤 데이터 포맷이 좋나요?
      • CSV, TSV, JSON, Object 등 서비스의 특징에 맞게 사용 권장
        • JSON 사용시
          👉 key/value형태로서 확장성이 뛰어남. 컬럼 정보(key) 포함
        • CSV 사용시
          👉 콤마(,)기준으로 데이터 구분. 용량 이득
  • 포멧을 관리하는 다른 방법?
    • 컨플루언트 스키마 레지스트리 👉 confluentinc/schema-registry

 

PRACTICE - Produce exact partition

public class ProducerExactParition {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
    private static int PARTITION_NUMBER = 1;

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

$ ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"

 

Producer acks

  • acks = 0
    • 가장 속도가 빠름, 유실 가능성이 높음
  • acks = 1(default)
    • 속도 보통, 유실 가능성이 있음
  • acks = all 또는 -1
    • 속도 가장 느림. 메시지 전달 손실 가능성 없음

acks = 0

  • 프로듀서가 브로커와 소켓연결을 맺어 보낸 즉시 성공으로 간주
  • 브로커가 정상적으로 받아서 리더 파티션에 저장했는지 알 수 없음
  • 팔로워 파티션에도 저장됬는지 알 수 없음
    👉 전송 속도가 중요하고 일부 유실되어도 무관한 데이터에 사용

acks = 1

  • 프로듀서가 보낸 메시지가 리더 파티션에 정상 저장되었는지 확인
  • 팔로워 파티션에 저장됬는지는 모름
  • 즉, 리더 파티션에 저장되고 해당 브로커가 죽으면 데이터 유실
    👉 acks=0에 비해 신뢰도가 높지만 아직 유실 가능성은 있음

acks = all 또는 -1

  • 프로듀서가 보낸 메시지가 리더, 팔로워 파티션에 정상 저장되었는지 확인
  • 리더 파티션의 데이터가 팔로워 파티션까지 복제될때 까지 기다림
  • 복제가 완료되기 까지 기다림으로 인해 속도가 느림
    👉 유실 가능성이 없지만, 속도가 느림

728x90

댓글

💲 추천 글