개발강의정리/DevOps

[아파치 카프카 입문과 활용] 4. Apache kafka 컨슈머 애플리케이션 개발, 실습

nineDeveloper 2021. 1. 4.
728x90

www.youtube.com/watch?v=5FEE5wVi8uY&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j&index=19

 

Kafka Consumer application

Consumer

  • 데이터를 가져가는(polling) 주체
  • commit을 통해 읽은 consumer offset을 카프카에 기록
  • Java Kafka-client 제공
  • 어디에 데이터를 저장하나요?
    • FileSystem(.csv .log .tsv)
    • Object Storage(S3, Minio)
    • Hadoop(Hdfs, Hive)
    • RDBMS(Oracle, Mysql)
    • NoSql(MongoDB, CouchDB)
    • 기타 다양한 저장소들(Elasticsearch, influxDB)

 

Consumer code

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

  • 일부 카프카 브로커와 클라이언트 버젼의 호환에 이슈가 있음

 

PRACTICE - Simple consumer code

public class SimpleConsumer {
    private static String TOPIC_NAME = "test"; // 토픽명
    private static String GROUP_ID = "testgroup"; // 컨슈머그룹
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092"; // 카프카 클러스터 주소

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 무한루프
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); //polling timeout 가격
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

 

실습

  1. SimpleConsumer BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
  2. 16번 Producing Test 스크립트로 테스트
    ./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
  3. 소스코드 RUN 해서 consuming 테스트

 

Consumer subscribes

  • 1개 토픽만 구독
consumer.subscribe(Arrays.asList("click_log"));
  • n개 토픽 구독
    • 정규 표현식(regex)를 통한 토픽 구독
consumer.subscribe(Pattern.compile("dev.*"));
  • 특정 토픽의 파티션 구독
    • Key를 포함한 Record를 consume할 때, 특정 파티션을 할당하고 싶다면
consumer.assign(Collections.singleton(new TopicPartition("web_log", 1)));

 

Consumer options

  • 필수옵션 - 반드시 입력
    • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
    • group.id : 컨슈머 그룹 id
    • key.deserializer : 메시지 키 역직렬화에 사용되는 클래스
    • value.deserializer : 메시지 값을 역직렬화 하는데 사용되는 클래스
  • 선택옵션 - default값 존재
    • enable.auto.commit : 자동 오프셋 커밋 여부
    • auto.commit.interval.ms : 자동 오프셋 커밋일 때 interval 시간
    • auto.offset.reset : 신규 컨슈머그룹일 때 읽을 파티션의 오프셋 위치
    • client.id : 클라이언트 식별값
    • max.poll.records : poll() 메서드 호출로 반환되는 레코드의 최대 개수
    • session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 시간

 

Consumer commit

enable.auto.commit=true

  • 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit
  • commit 관련 코드를 작성할 필요없음. 편리함
  • 속도가 가장 빠름
  • 중복 또는 유실이 발생할 수 있음
    👉 중복/유실을 허용하지 않는 곳(은행, 카드 등)에서는 사용하면 안됨!!
    👉 일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}
  • 중복 처리 되는 메시지들

  • 유실되는 메시지들

 

PRACTICE - Auto commit test

enable.auto.commit=true 중복/유실 테스트

Run application in Intellij

public class Main {
 private static String TOPIC_NAME = "test";
 private static String GROUP_ID = "testgroup";
 private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

 public static void main(String[] args) {
     Properties configs = new Properties();
     configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
     configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
     configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
     configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
     consumer.subscribe(Arrays.asList(TOPIC_NAME));

     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<String, String> record : records) {
             System.out.println(record.value());
         }
     }
 }
}

 

  1. Input data in kafka-console-producer

$ ./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test >test1
>test2
>test3
>test4
>test5
  1. Intellij println 내용 확인

  1. Intellij stop application

  1. Intellij println 내용 확인

 

실습

  1. ConsumerWithAutoCommit BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
  2. 소스코드 RUN 해서 consuming 준비
  3. 16번 Producing Test 스크립트로 producing
    ./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
  4. 1분이 안된시점에 consuming 받고 종료
  5. 다시 소스코드 RUN 하면 이미 consuming 된 데이터를 또 받아옴

 

Consumer auto commit

데이터 중복처리? 무슨 문제지?

  • 커머스의 장바구니 시스템에서 중복 이슈 발생
    👉 나는 장바구니에 1개 상품을 담았는데 2개 상품이 담김
  • 카드사의 결제 시스템에서 중복 이슈 발생
    👉 편의점에서 아이스크림 결제를 했는데 2번 결재됨
  • 택배사 SMS 발송 시스템에서 중복 이슈 발생
    👉 집에 도착 했다는 SMS가 2번 발송

데이터 중복을 막을 수 있는 방법

  • 오토 커밋을 사용하되, 컨슈머가 죽지 않도록 잘 돌봐준다
    👉 불가능. 서버/애플리케이션은 언젠가 죽을 수 있다. ex. 배포
  • 오토 커밋을 사용하지 않는다
    👉 Kafka consumer의 commitSync(), commitAsync() 사용

enable.auto.commit=false

1. commitSync() : 동기 커밋

  • ConsumerRecord 처리 순서를 보장함
  • 가장 느림(커밋이 완료될 때 까지 block)
  • poll() 메서드로 반환된 ConsumerRecord의 마지막 offset을 커밋
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value()); 
  }
  try {
    consumer.commitSync(); // 동기 커밋
  } catch (CommitFailedException e) {
    System.err.println("commit failed");
  }
}
  • Map<TopicPartition, OffsetAndMetadata> 을 통해 오프셋 지정 커밋 가능
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(new TopicPartition(record.topic(), record.partition()), null);
try {
  consumer.commitSync(offset); // offset 지정 커밋
} catch (CommitFailedException e) {
  System.err.println("commit failed"); 
}

2. commitAsync() : 비동기 커밋

  • 동기 커밋보다 빠름
  • 중복이 발생할 수 있음
    👉 일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때
  • ConsumerRecord 처리 순서를 보장하지 못함
    👉 처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
  }
  consumer.commitASync();
}
  • commitAsync() + commitSync() : 비동기, 동기 커밋 같이 쓰기
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println(record.value());
    }
    consumer.commitAsync();
  }
} catch (CommitFailedException e) {
  System.err.println("commit failed");
} finally {
  consumer.commitSync();
}

 

PRACTICE - Sync commit test

public class Main {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // sync commit을 위한 설정

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
              System.out.println(record.value());
            }
            consumer.commitSync(); // sync commit을 위한 설정
        }
    }
}

 

실습

  1. ConsumerWithSyncCommit BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
  2. 소스코드 RUN 해서 consuming
  3. 소스코드 종료 후 추가 데이터 producing
  4. 소스코드 RUN 해서 추가된 데이터만 consuming 하는지 확인

 

 

Consumer rebalance

  • 리밸런스 - 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상
    • 리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 없음
    • 리밸런스 발생시 데이터 유실/중복 발생 가능성 있음
      👉 commitSync() 또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지
    • 언제 리밸런스 발생?
      👉 consumer.close() 호출시 또는 consumer의 세션이 끊어졌을 때

  1. 3초마다 그룹 코디네이터(브로커 중 1대)에게 Heartbeat 전송
  2. 10초의 세션 타임아웃 시간 안에 Heartbeat가 왔는지 확인
  3. 만약 Heartbeat가 없으면 해당 컨슈머는 죽은것으로 마킹
  4. 리밸런스 시작
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);

 

Consumer rebalance listener

...
  consumer.subscribe(Arrays.asList("test"), new RebalanceListener()); // 리밸런스 리스너 사용
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println(record.value());
    }
  }
}
static class RebalanceListener implements ConsumerRebalanceListener {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    System.out.println("Lost partitions.");
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    System.out.println("Assigned partitions.");
  }
}
  • 리밸런스 리스너는 언제 사용할까?
    • 리밸런스 발생에 따른 offset commit
    • 리밸런스 시간 측정을 통한 컨슈머 모니터링

 

Consumer wakeup

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 1
  for (ConsumerRecord<String, String> record : records) { // 2
    System.out.println(record.value()); // 3
  }
  consumer.commitSync(); // 4
}

정상동작 예시

  1. poll() 호출
    • records 100개 반환 : offset 101번 ~ 200번
  2. records loop 구문 수행
  3. record.value() system print
  4. offset 200 커밋
  5. poll() 호출
  6. 반복

SIGKILL 로 인한 중복 처리 발생 예시

  1. poll() 호출
    • 마지막 커밋된 오프셋이 100
    • records 100개 반환 : 오프셋 101 ~ 200

  1. records loop 구문 수행
  2. record.value() system 150번 오프셋 print 중, SIGKILL 호출
    • 101번150번 오프셋 처리완료, 151번 오프셋200번 오프셋 미처리

  1. offset 200 커밋 불가
    • 브로커에는 100번 오프셋이 마지막 커밋
      👉 컨슈머 재시작, 다시 오프셋 101부터 처리 시작, 101번~150번 중복처리

Consumer wakeup

Runtime.getRuntime().addShutdownHook(new Thread() {
  public void run() {
    consumer.wakeup(); // wakeup()을 통한 graceful shutdown
  }
});
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println(record.value());
    }
    consumer.commitSync();
  }
} catch (WakeupException e) {
  System.out.println("poll() method trigger WakeupException");
} finally {
  consumer.commitSync();
  consumer.close();
}
  • wakeup()을 통한 graceful shutdown 필수!
    • SIGTERM을 통한 shutdown signal로 kill하여 처리한 데이터 커밋 필요
    • SIGKILL(9)는 프로세스 강제 종료로 커밋 불가 👉 중복/유실 발생

 

Consumer thread 전략

1안) 1 프로세스 + 1 스레드(컨슈머)

  • 간략한 코드
  • 프로세스 단위 실행/종료
  • 다수의 컨슈머 실행 필요시 다수의 프로세스 실행 필요
$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers"}
$ java -jar one-process-one-consumer.jar --path consumer.conf

2안) 1 프로세스 + n 스레드(동일 컨슈머 그룹)

  • 복잡한 코드
  • 스레드 단위 실행/종료
  • 스레드간 간섭 주의(세마포어, 데드락 등)
  • 다수 컨슈머 실행시 다수 스레드 실행 가능
$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20}
$ java -jar one-process-multiple-consumer.jar --path consumer.conf

3안) 1 프로세스 + n 스레드(다수 컨슈머 그룹)

  • 복잡한 코드
  • 컨슈머 그룹별 스레드 개수 조절 주의
$ cat consumer.conf
[
  {"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20},
  {"topic":"click_log", "group.id":"elasticsearch-consumers", "consumer.no":1}, {"topic":"application_log", "group.id":"hadoop-consumers", "consumer.no":5}
]
$ java -jar one-process-multiple-consumer-multiple-group.jar --path consumer.conf

 

PRACTICE - Consumer multiple thread pool

하나의 프로세스에 다수의 쓰레드

public class Main {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
    private static int CONSUMER_COUNT = 3;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}
public class ConsumerWorker implements Runnable {
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

 

실습

  1. ConsumerWithMultiThread BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
  2. 소스코드 RUN 해서 consuming 준비
  3. 16번 Producing Test 스크립트로 producing 테스트
    ./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
  4. consuming 서버에서 jps 프로세스 확인
  5. kill -term PID
  6. wakeup() 메서드로 안전하게 종료함을 확인

 

Consumer lag

  • 컨슈머 랙은 컨슈머의 상태를 나타내는 지표
  • 컨슈머 랙의 최대값은 컨슈머 인스턴스를 통해 직접 확인할 수 있음
    • consumer.metrics()를 통해 확인할 수 있는 지표
      • records-lag-max : 토픽의 파티션 중 최대 랙
      • fetch-size-avg : 1번 polling하여 가져올 때 레코드 byte평균
      • fetch-rate : 1초 동안 레코드 가져오는 회수
      • ...
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  Map<MetricName, ? extends Metric> metrics = consumer.metrics();
  for (MetricName metric : metrics.keySet()) {
    System.out.println(metric.name() + " is " + metrics.get(metric).metricValue());
  }
  for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
  }
}
  • 컨슈머 컨슈머 인스턴스를 통한 컨슈머 랙 수집의 문제점
    • 컨슈머 인스턴스 장애가 발생하면 지표 수집 불가능
    • 구현하는 컨슈머마다 지표를 수집하는 로직 개발 필요
    • 컨슈머 랙 최대값(records-lag-max) 만 알 수 있음
      👉 토픽에 파티션은 n개가 있을 수 있음
      👉 최대값을 제외한 나머지 파티션의 컨슈머 랙은 알 수 없음

  • records-lag-max : 6

  • Partition 0 consumer lag : 6 👈 모니터링을 위해 알고 싶은 것

  • Partition 1 consumer lag : 3 👈 모니터링을 위해 알고 싶은 것

  • 컨슈머 랙 모니터링
    👉 외부 모니터링 애플리케이션을 사용하세요!
    👉 Confluent Platform, Datadog, Kafka Burrow(Open source)

  • 카프카 버로우
    👉 https://github.com/linkedin/Burrow

    • Linkedin에서 오픈소스로 제공하는 컨슈머 랙 체크 툴
    • 버로우 실행 시 Kafka, Zookeeper정보를 통해 랙 정보 자체 수집
    • 슬라이딩 윈도우를 통한 컨슈머 상태 정의
      • OK : 정상
      • ERROR : 컨슈머가 polling을 멈춤
      • WARNING : 컨슈머가 polling을 수행하지만 lag이 계속 증가
  • HTTP api를 통한 랙 조회

728x90

댓글

💲 추천 글