728x90
www.youtube.com/watch?v=4BbKCsKSq_I&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j&index=18
Kafka Producer application
PRACTICE - Import project from github
https://github.com/freeserver1191/tacademy-kafka
git clone
$ git clone https://github.com/freeserver1191/tacademy-kafka.git
Cloning into 'tacademy-kafka'...
remote: Enumerating objects: 286, done.
remote: Counting objects: 100% (286/286), done.
remote: Compressing objects: 100% (130/130), done.
remote: Total 286 (delta 60), reused 260 (delta 44), pack-reused 0
Receiving objects: 100% (286/286), 2.38 MiB | 380.00 KiB/s, done.
Resolving deltas: 100% (60/60), done.
$ ls
tacademy-kafka
Producer
- 카프카로 데이터(key, value) 전송
- ProducerRecord 객체를 생성
- Java Kafka-client 제공
- 그 외 3rd party language의 경우 아래 링크 참고
👉 https://cwiki.apache.org/confluence/display/KAFKA/Clients
- 그 외 3rd party language의 경우 아래 링크 참고
- 어떤 데이터를 보내나요?
- Web, Application 클릭로그
- 공유 자전거/자동차의 위치(GPS)정보
- 스마트팩토리 공장의 머신 센서정보
- 상호 통신하기 위한 application
- 기타 ...
Producer code
dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
- 일부 카프카 브로커와 클라이언트 버전의 호환에 이슈가 있음
PRACTICE - Simple producer code
public class Main {
private static String TOPIC_NAME = "test"; //토픽명
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092"; //카프카 클러스터 주소
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
try {
producer.send(record);
System.out.println("Send to "+ TOPIC_NAME +" | data : "+data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
실습
- SimpleProducer BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
- 소스코드 RUN
- 17번 커맨드 Consumer Polling Test 스크립트로 데이터 consuming
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning
Producer options
- 필수옵션 - 반드시 입력
bootstrap.servers
: 카프카 클러스터에 연결하기 위한 브로커 목록key.serializer
: 메시지 키 직렬화에 사용되는 클래스value.serializer
: 메시지 값을 직렬화 하는데 사용되는 클래스
- 선택옵션 - default값 존재
acks
: 레코드 전송 신뢰도 조절(리플리카)comression.type
: snappy, gzip, lz4 중 하나로 압축하여 전송retries
: 클러스터 장애에 대응하여 메시지 전송을 재시도하는 회수buffer.memory
: 브로커에 전송될 메시지의 버퍼로 사용 될 메모리 양batch.size
: 여러 데이터를 함께 보내기 위한 레코드 크기linger.ms
: 현재의 배치를 전송하기 전까지 기다리는 시간client.id
: 어떤 클라이언트인지 구분하는 식별자
Producer
https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/
PRACTICE - Produce with key/value records
public class Main {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
// Key - Integer.toString(index), Value - data
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
$ ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"
실습
- ProducerWithKeyValue BOOTSTRAP_SERVERS 에 퍼블릭IP 입력
- 24번 커맨드로 데이터 consuming
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"
Record key
레코드 키
-
역할 : 메시지를 구분하는 구분자 역할
-
특징
- 동일 키, 동일 파티션 적재 by Default partitioner
👉 순서를 보장하므로, 상태머신(state machine)으로 사용 가능
👉 역할에 따른 컨슈머 할당 적용 가능
- 동일 키, 동일 파티션 적재 by Default partitioner
-
레코드 값(value)을 정의 하는 구분자
👉 키에 레코드 값 해쉬값을 넣음으로서 중복처리 방지 가능
Record value
레코드 값
- 역할 : 실질적으로 전달하고 싶은 데이터
- 어떤 형(type)을 보낼 수 있나요?
- String, ByteArray, Int 등 사실상 제한 없음
- 어떤 데이터 포맷이 좋나요?
- CSV, TSV, JSON, Object 등 서비스의 특징에 맞게 사용 권장
- JSON 사용시
👉 key/value형태로서 확장성이 뛰어남. 컬럼 정보(key) 포함 - CSV 사용시
👉 콤마(,)기준으로 데이터 구분. 용량 이득
- JSON 사용시
- CSV, TSV, JSON, Object 등 서비스의 특징에 맞게 사용 권장
- 어떤 형(type)을 보낼 수 있나요?
- 포멧을 관리하는 다른 방법?
- 컨플루언트 스키마 레지스트리 👉 confluentinc/schema-registry
PRACTICE - Produce exact partition
public class ProducerExactParition {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
private static int PARTITION_NUMBER = 1;
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
$ ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"
Producer acks
- acks = 0
- 가장 속도가 빠름, 유실 가능성이 높음
- acks = 1(default)
- 속도 보통, 유실 가능성이 있음
- acks = all 또는 -1
- 속도 가장 느림. 메시지 전달 손실 가능성 없음
acks = 0
- 프로듀서가 브로커와 소켓연결을 맺어 보낸 즉시 성공으로 간주
- 브로커가 정상적으로 받아서 리더 파티션에 저장했는지 알 수 없음
- 팔로워 파티션에도 저장됬는지 알 수 없음
👉 전송 속도가 중요하고 일부 유실되어도 무관한 데이터에 사용
acks = 1
- 프로듀서가 보낸 메시지가 리더 파티션에 정상 저장되었는지 확인
- 팔로워 파티션에 저장됬는지는 모름
- 즉, 리더 파티션에 저장되고 해당 브로커가 죽으면 데이터 유실
👉 acks=0에 비해 신뢰도가 높지만 아직 유실 가능성은 있음
acks = all 또는 -1
- 프로듀서가 보낸 메시지가 리더, 팔로워 파티션에 정상 저장되었는지 확인
- 리더 파티션의 데이터가 팔로워 파티션까지 복제될때 까지 기다림
- 복제가 완료되기 까지 기다림으로 인해 속도가 느림
👉 유실 가능성이 없지만, 속도가 느림
728x90
'개발강의정리 > DevOps' 카테고리의 다른 글
[아파치 카프카 입문과 활용] 5. Apache kafka 파이프라인 실습 (0) | 2021.01.04 |
---|---|
[아파치 카프카 입문과 활용] 4. Apache kafka 컨슈머 애플리케이션 개발, 실습 (0) | 2021.01.04 |
[아파치 카프카 입문과 활용] 2. Apache kafka 설치, 실행, CLI (0) | 2021.01.04 |
[아파치 카프카 입문과 활용] 1. Apache kafka 기본개념 및 생태계 (0) | 2021.01.04 |
[데브옵스를 위한 쿠버네티스 마스터] 쿠버네티스 핵심개념-Statefulset (0) | 2021.01.03 |
댓글