728x90
www.youtube.com/watch?v=5FEE5wVi8uY&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j&index=19
Kafka Consumer application
Consumer
- 데이터를 가져가는(polling) 주체
- commit을 통해 읽은 consumer offset을 카프카에 기록
- Java Kafka-client 제공
- 그 외 3rd party language의 경우 아래 링크 참고
👉 https://cwiki.apache.org/confluence/display/KAFKA/Clients
- 그 외 3rd party language의 경우 아래 링크 참고
- 어디에 데이터를 저장하나요?
- FileSystem(.csv .log .tsv)
- Object Storage(S3, Minio)
- Hadoop(Hdfs, Hive)
- RDBMS(Oracle, Mysql)
- NoSql(MongoDB, CouchDB)
- 기타 다양한 저장소들(Elasticsearch, influxDB)
Consumer code
dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
- 일부 카프카 브로커와 클라이언트 버젼의 호환에 이슈가 있음
PRACTICE - Simple consumer code
public class SimpleConsumer {
private static String TOPIC_NAME = "test"; // 토픽명
private static String GROUP_ID = "testgroup"; // 컨슈머그룹
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092"; // 카프카 클러스터 주소
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 무한루프
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); //polling timeout 가격
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
실습
- SimpleConsumer BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
- 16번 Producing Test 스크립트로 테스트
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
- 소스코드 RUN 해서 consuming 테스트
Consumer subscribes
- 1개 토픽만 구독
consumer.subscribe(Arrays.asList("click_log"));
- n개 토픽 구독
- 정규 표현식(regex)를 통한 토픽 구독
consumer.subscribe(Pattern.compile("dev.*"));
- 특정 토픽의 파티션 구독
- Key를 포함한 Record를 consume할 때, 특정 파티션을 할당하고 싶다면
consumer.assign(Collections.singleton(new TopicPartition("web_log", 1)));
Consumer options
- 필수옵션 - 반드시 입력
bootstrap.servers
: 카프카 클러스터에 연결하기 위한 브로커 목록group.id
: 컨슈머 그룹 idkey.deserializer
: 메시지 키 역직렬화에 사용되는 클래스value.deserializer
: 메시지 값을 역직렬화 하는데 사용되는 클래스
- 선택옵션 - default값 존재
enable.auto.commit
: 자동 오프셋 커밋 여부auto.commit.interval.ms
: 자동 오프셋 커밋일 때 interval 시간auto.offset.reset
: 신규 컨슈머그룹일 때 읽을 파티션의 오프셋 위치client.id
: 클라이언트 식별값max.poll.records
: poll() 메서드 호출로 반환되는 레코드의 최대 개수session.timeout.ms
: 컨슈머가 브로커와 연결이 끊기는 시간
Consumer commit
enable.auto.commit=true
- 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit
- commit 관련 코드를 작성할 필요없음. 편리함
- 속도가 가장 빠름
- 중복 또는 유실이 발생할 수 있음
👉 중복/유실을 허용하지 않는 곳(은행, 카드 등)에서는 사용하면 안됨!!
👉 일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
- 중복 처리 되는 메시지들
- 유실되는 메시지들
PRACTICE - Auto commit test
enable.auto.commit=true
중복/유실 테스트
Run application in Intellij
public class Main {
private static String TOPIC_NAME = "test";
private static String GROUP_ID = "testgroup";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
-
Input data in kafka-console-producer
$ ./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test >test1
>test2
>test3
>test4
>test5
- Intellij println 내용 확인
- Intellij stop application
- Intellij println 내용 확인
실습
- ConsumerWithAutoCommit BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
- 소스코드 RUN 해서 consuming 준비
- 16번 Producing Test 스크립트로 producing
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
- 1분이 안된시점에 consuming 받고 종료
- 다시 소스코드 RUN 하면 이미 consuming 된 데이터를 또 받아옴
Consumer auto commit
데이터 중복처리? 무슨 문제지?
- 커머스의 장바구니 시스템에서 중복 이슈 발생
👉 나는 장바구니에 1개 상품을 담았는데 2개 상품이 담김 - 카드사의 결제 시스템에서 중복 이슈 발생
👉 편의점에서 아이스크림 결제를 했는데 2번 결재됨 - 택배사 SMS 발송 시스템에서 중복 이슈 발생
👉 집에 도착 했다는 SMS가 2번 발송
데이터 중복을 막을 수 있는 방법
- 오토 커밋을 사용하되, 컨슈머가 죽지 않도록 잘 돌봐준다
👉 불가능. 서버/애플리케이션은 언젠가 죽을 수 있다. ex. 배포 - 오토 커밋을 사용하지 않는다
👉 Kafka consumer의commitSync()
,commitAsync()
사용
enable.auto.commit=false
1. commitSync()
: 동기 커밋
- ConsumerRecord 처리 순서를 보장함
- 가장 느림(커밋이 완료될 때 까지 block)
- poll() 메서드로 반환된 ConsumerRecord의 마지막 offset을 커밋
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
try {
consumer.commitSync(); // 동기 커밋
} catch (CommitFailedException e) {
System.err.println("commit failed");
}
}
Map<TopicPartition, OffsetAndMetadata>
을 통해 오프셋 지정 커밋 가능
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(new TopicPartition(record.topic(), record.partition()), null);
try {
consumer.commitSync(offset); // offset 지정 커밋
} catch (CommitFailedException e) {
System.err.println("commit failed");
}
2. commitAsync()
: 비동기 커밋
- 동기 커밋보다 빠름
- 중복이 발생할 수 있음
👉 일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때 - ConsumerRecord 처리 순서를 보장하지 못함
👉 처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitASync();
}
commitAsync()
+commitSync()
: 비동기, 동기 커밋 같이 쓰기
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.err.println("commit failed");
} finally {
consumer.commitSync();
}
PRACTICE - Sync commit test
public class Main {
private static String TOPIC_NAME = "test";
private static String GROUP_ID = "testgroup";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // sync commit을 위한 설정
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitSync(); // sync commit을 위한 설정
}
}
}
실습
- ConsumerWithSyncCommit BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
- 소스코드 RUN 해서 consuming
- 소스코드 종료 후 추가 데이터 producing
- 소스코드 RUN 해서 추가된 데이터만 consuming 하는지 확인
Consumer rebalance
- 리밸런스 - 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상
- 리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 없음
- 리밸런스 발생시 데이터 유실/중복 발생 가능성 있음
👉commitSync()
또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지 - 언제 리밸런스 발생?
👉consumer.close()
호출시 또는 consumer의 세션이 끊어졌을 때
- 3초마다 그룹 코디네이터(브로커 중 1대)에게 Heartbeat 전송
- 10초의 세션 타임아웃 시간 안에 Heartbeat가 왔는지 확인
- 만약 Heartbeat가 없으면 해당 컨슈머는 죽은것으로 마킹
- 리밸런스 시작
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
Consumer rebalance listener
...
consumer.subscribe(Arrays.asList("test"), new RebalanceListener()); // 리밸런스 리스너 사용
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
static class RebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions.");
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned partitions.");
}
}
- 리밸런스 리스너는 언제 사용할까?
- 리밸런스 발생에 따른 offset commit
- 리밸런스 시간 측정을 통한 컨슈머 모니터링
Consumer wakeup
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 1
for (ConsumerRecord<String, String> record : records) { // 2
System.out.println(record.value()); // 3
}
consumer.commitSync(); // 4
}
정상동작 예시
poll()
호출- records 100개 반환 : offset 101번 ~ 200번
- records loop 구문 수행
record.value()
system print- offset 200 커밋
poll()
호출- 반복
SIGKILL 로 인한 중복 처리 발생 예시
poll()
호출- 마지막 커밋된 오프셋이 100
- records 100개 반환 : 오프셋 101 ~ 200
- records loop 구문 수행
record.value()
system 150번 오프셋 print 중, SIGKILL 호출- 101번
150번 오프셋 처리완료, 151번 오프셋200번 오프셋 미처리
- 101번
- offset 200 커밋 불가
- 브로커에는 100번 오프셋이 마지막 커밋
👉 컨슈머 재시작, 다시 오프셋 101부터 처리 시작, 101번~150번 중복처리
- 브로커에는 100번 오프셋이 마지막 커밋
Consumer wakeup
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup(); // wakeup()을 통한 graceful shutdown
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitSync();
}
} catch (WakeupException e) {
System.out.println("poll() method trigger WakeupException");
} finally {
consumer.commitSync();
consumer.close();
}
- wakeup()을 통한 graceful shutdown 필수!
- SIGTERM을 통한 shutdown signal로 kill하여 처리한 데이터 커밋 필요
- SIGKILL(9)는 프로세스 강제 종료로 커밋 불가 👉 중복/유실 발생
Consumer thread 전략
1안) 1 프로세스 + 1 스레드(컨슈머)
- 간략한 코드
- 프로세스 단위 실행/종료
- 다수의 컨슈머 실행 필요시 다수의 프로세스 실행 필요
$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers"}
$ java -jar one-process-one-consumer.jar --path consumer.conf
2안) 1 프로세스 + n 스레드(동일 컨슈머 그룹)
- 복잡한 코드
- 스레드 단위 실행/종료
- 스레드간 간섭 주의(세마포어, 데드락 등)
- 다수 컨슈머 실행시 다수 스레드 실행 가능
$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20}
$ java -jar one-process-multiple-consumer.jar --path consumer.conf
3안) 1 프로세스 + n 스레드(다수 컨슈머 그룹)
- 복잡한 코드
- 컨슈머 그룹별 스레드 개수 조절 주의
$ cat consumer.conf
[
{"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20},
{"topic":"click_log", "group.id":"elasticsearch-consumers", "consumer.no":1}, {"topic":"application_log", "group.id":"hadoop-consumers", "consumer.no":5}
]
$ java -jar one-process-multiple-consumer-multiple-group.jar --path consumer.conf
PRACTICE - Consumer multiple thread pool
하나의 프로세스에 다수의 쓰레드
public class Main {
private static String TOPIC_NAME = "test";
private static String GROUP_ID = "testgroup";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
private static int CONSUMER_COUNT = 3;
private static List<ConsumerWorker> workerThreads = new ArrayList<>();
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < CONSUMER_COUNT; i++) {
ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
workerThreads.add(worker);
executorService.execute(worker);
}
}
static class ShutdownThread extends Thread {
public void run() {
workerThreads.forEach(ConsumerWorker::shutdown);
System.out.println("Bye");
}
}
}
public class ConsumerWorker implements Runnable {
private Properties prop;
private String topic;
private String threadName;
private KafkaConsumer<String, String> consumer;
ConsumerWorker(Properties prop, String topic, int number) {
this.prop = prop;
this.topic = topic;
this.threadName = "consumer-thread-" + number;
}
@Override
public void run() {
consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(threadName + " >> " + record.value());
}
consumer.commitSync();
}
} catch (WakeupException e) {
System.out.println(threadName + " trigger WakeupException");
} finally {
consumer.commitSync();
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
실습
- ConsumerWithMultiThread BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
- 소스코드 RUN 해서 consuming 준비
- 16번 Producing Test 스크립트로 producing 테스트
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
- consuming 서버에서 jps 프로세스 확인
kill -term PID
wakeup()
메서드로 안전하게 종료함을 확인
Consumer lag
- 컨슈머 랙은 컨슈머의 상태를 나타내는 지표
- 컨슈머 랙의 최대값은 컨슈머 인스턴스를 통해 직접 확인할 수 있음
consumer.metrics()
를 통해 확인할 수 있는 지표records-lag-max
: 토픽의 파티션 중 최대 랙fetch-size-avg
: 1번 polling하여 가져올 때 레코드 byte평균fetch-rate
: 1초 동안 레코드 가져오는 회수- ...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
for (MetricName metric : metrics.keySet()) {
System.out.println(metric.name() + " is " + metrics.get(metric).metricValue());
}
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
- 컨슈머 컨슈머 인스턴스를 통한 컨슈머 랙 수집의 문제점
- 컨슈머 인스턴스 장애가 발생하면 지표 수집 불가능
- 구현하는 컨슈머마다 지표를 수집하는 로직 개발 필요
- 컨슈머 랙 최대값(records-lag-max) 만 알 수 있음
👉 토픽에 파티션은 n개가 있을 수 있음
👉 최대값을 제외한 나머지 파티션의 컨슈머 랙은 알 수 없음
-
records-lag-max
: 6 -
Partition 0 consumer lag : 6 👈 모니터링을 위해 알고 싶은 것
-
Partition 1 consumer lag : 3 👈 모니터링을 위해 알고 싶은 것
-
컨슈머 랙 모니터링
👉 외부 모니터링 애플리케이션을 사용하세요!
👉 Confluent Platform, Datadog, Kafka Burrow(Open source) -
카프카 버로우
👉 https://github.com/linkedin/Burrow- Linkedin에서 오픈소스로 제공하는 컨슈머 랙 체크 툴
- 버로우 실행 시 Kafka, Zookeeper정보를 통해 랙 정보 자체 수집
- 슬라이딩 윈도우를 통한 컨슈머 상태 정의
- OK : 정상
- ERROR : 컨슈머가 polling을 멈춤
- WARNING : 컨슈머가 polling을 수행하지만 lag이 계속 증가
-
HTTP api를 통한 랙 조회
728x90
'개발강의정리 > DevOps' 카테고리의 다른 글
[Apache kafka 조금 아는 척하기] 카프카란? (0) | 2021.01.05 |
---|---|
[아파치 카프카 입문과 활용] 5. Apache kafka 파이프라인 실습 (0) | 2021.01.04 |
[아파치 카프카 입문과 활용] 3. Apache kafka 프로듀서 애플리케이션 개발, 실습 (0) | 2021.01.04 |
[아파치 카프카 입문과 활용] 2. Apache kafka 설치, 실행, CLI (0) | 2021.01.04 |
[아파치 카프카 입문과 활용] 1. Apache kafka 기본개념 및 생태계 (0) | 2021.01.04 |
댓글