Kafka Producer
Producer 개요
- Producer는 Topic에 메시지를 보냄(메시지 write)
- Producer는 성능/로드밸런싱/가용성/업무 정합성등을 고려하여 어떤 브로커의 파티션으로 메시지를 보내야 할지 전략적으로 결정됨
Producer Record

simple producer
String topicName = "simgple-producer";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(props);
ProducerRecord<String,String> producerRecord = new Producer<>(topicName, "hello");
kafkaProducer.send(producerRecord);
kafkaProducer.flush();
kafkaProducer.close();
Serialized Message 전송
kafka에서 제공하는 serializer
- StringSerializer
- ShortSerializer
- IntegerSerializer
- DoubleSerializer
- LongSerializer
- BytesSerializer
자바 Object Serialization
- 객체의 유형, 데이터의 포맷, 적용 시스템에 상관없이 이동/저장/복원을 자유롭게 하기 위해서 바이트 스트림으로 저장하는 것
Custom 데이터 전송
- kafka 기본 지원 serializer는 String,Long 등의 primitive 기반 object에 대해서만 지원
Serializer 구현
public class OrderSerialzer implements Serializer<Order> {
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@override
public byte[] serializer(String topic, Order order) {
byte[] serializedOrder = null;
try {
serializedOrder = objectMapper.writeValueAsBytes(order);
} catch (JsonProcessingException e) {
...
}
return serializedOrder;
}
}
- Java 8의 LocalDateTime은 Jackson에서 기본 지원하지 않음
- 따라서
com.fasterxml.jackson.datatype.jackson-datatype-jsr310을 추가 import하고 JavaTimeModule을 register 해야함
OrderSerdeProducer
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class.getName());
KafkaProducer<String, Order> kafkaProducer = new KafkaProducer<String, Order>(props);
ProducerRecord<String,Order> producerRecord = new ProducerRecord<>(topic, key, value);
kakfaProducer.send(producerRecord);
Producer 메시지 파티셔닝
key 값을 가지지 않는 메시지 전송
- 메시지는 producer를 통해 전송 시 partitioner를 통해 어떤 파티션으로 전송되어야 하는지 미리 결정됨
- key 값을 가지지 않는 경우 Round Robin, Sticky Partitioning 등의 전략이 선택되어 파티션 별로 메시지가 전송됨
- Topic이 여러 개의 파티션을 가질 때
전송 순서가 보장되지 않은 채로 consumer에서 읽혀질 수 있음
라운드 로빈
- kafka 2.4 이전 default 파티셔닝 전략
- 최대한 메시지를 파티션에 균등하게 분배하려는 전략으로써 메시지 배치를 순차적으로 다른 파티션으로 전송
- 메시지가 배치 데이터를 빨리 채우지 못하면서 전송이 늦어지거나 배치를 다 채우지 못하고 전송하면서 성능 이슈

스티키 파티셔닝
- kafka 2.4 이후 default 파티셔닝 전략
- 라운드 로빈의 성능을 개선하고자 특정 파티션으로 전송되는 하나의 배치에 메시지를 빠르게 먼저 채워서 보내는 방식

key 값을 가지는 메시지 전송
- 특정 key 값을 가지는 메시지는
특정 파티션으로 고정 전송됨 - 특정 key 값을 가지는 메시지는 단일 파티션 내에서 전송 순서가 보장되어 consumer에서 읽힘
- 하나의 파티션에서만 메시지 순서 보장

key 메시지 전송
- 전송
kafka-console-producer --bootstrap-server localhost:9092 --topic 토픽명 --property key.separator=: --property parse.key=true - 수신
kafka-console-consumer --bootstrap-server localhost:9092 --topic 토픽명 --property print.key=true --from-beginning
DefaultPartitioner
- kafka는 기본적으로 DefaultPartitioner를 통해 partition 지정
- key를 가지는 메시지의 경우 key 값을 해싱하여 파티션별로 균일하게 전송
public int partition(String topic, Object key, bytes[] keyBytes, Bytes[] valueBytes, Cluster cluster, int numPartition)
{
if(keyBytes == null){
return StickyPartitionCache.partition(topic,cluster);
}
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartition;
}
Custom Partitioner 구현
- Partitioner interface 구현, partition() 메소드 구현 필요
public class CustomPartitioner implements Partitioner { public static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class.getName()); private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); private String specialKeyName; @Override public void configure(Map<String, ?> configs) { specialKeyName = configs.get("custom.specialKey").toString(); } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); int numPartitions = partitionInfoList.size(); int numSpecialPartitions = (int)(numPartitions * 0.5); int partitionIndex = 0; if (keyBytes == null) { //return stickyPartitionCache.partition(topic, cluster); throw new InvalidRecordException("key should not be null"); } if (((String)key).equals(specialKeyName)) { partitionIndex = Utils.toPositive(Utils.murmur2(valueBytes)) % numSpecialPartitions; } else { partitionIndex = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - numSpecialPartitions) + numSpecialPartitions; } logger.info("key:{} is sent to partition:{}", key.toString(), partitionIndex); return partitionIndex; } @Override public void close() { } } configs.get("custom.specialKey").toString();- producer에 등록한 config 값을 가져올 수 있음
- producer에 등록한 config 값을 가져올 수 있음
send() 메소드 호출 프로세스

- Kafka Producer 전송은 Producer Client의 별도 Thread가 전송을 담당한다는 점에서 기본적으로 Thread간 Async 전송
- Main Thread가 send( ) 메소드를 호출하여 메시지 전송을 시작하지만 바로 전송되지 않으며 내부 Buffer에서 토픽 파티션에 따라 Record Batch 단위로 묶인 뒤 전송됨
- producer client의 내부 메모리 (Record Accumulator)에 여러 개의 batch들로 buffer.memory 설정 사이즈만큼 보관 가능
- 복수 개의 batch 한꺼번에 전송 가능
batch.size만큼 레코드가 쌓이면 sender thread를 통해 전송linger.ms를 줄이면 batch 사이즈를 다 채우기 전에 전송할 수 있도록 할 수 있음 (20ms 이하 권장)
max.inflight.requests.per.connection
- 한 번에 전송 가능한 메시지 배치 개수 (default=5)
- 1보다 큰 경우 일부 배치의 전송 실패 시 순서 뒤집힐 수 있음

메시지 동기식/비동기식 전송
Sync 방식
- Producer는 broker로부터 해당 메시지를 성공적으로 받았다는 Ack 메시지를 받은 후 메시지 전송
- 동기 처리시 1개씩 전송하며 ack를 받아야 하므로 batch 처리 불가능, 전송은 batch 레벨이지만 메시지는 1개
Future<RecordMetadata> future = producer.send();
RecordMetadata metadata = future.get();
# inline
RecordMetadata metadata = producer.send().get();
Async 방식
- Callback을 이용한 비동기 메시지 전송 가능
- 비동기적으로 메시지를 보내면서 RecordMetadata를 client가 받을 수 있는 방식 제공
- Async 전송 시 batch를 통해 전송함
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion( RecordMetadata metadata, Exception exception ) {
if (exception == null) {
System.out.println("received metadata \n" +
"Topic:" + metadata.topic() + "\n" +
"Partition:" + metadata.partition() + "\n" +
"Offset:" + metadata.offset());
} else {
exception.printStackTrace();
}
}
Custom Callback 생성
- Callback 인터페이스 상속
public class CustomCallback implements Callback{
@Override
public void onCompletion( RecordMetadata metadata, Exception exception ) {
if (exception == null) {
System.out.println("received metadata \n" +
"Topic:" + metadata.topic() + "\n" +
"Partition:" + metadata.partition() + "\n" +
"Offset:" + metadata.offset());
} else {
exception.printStackTrace();
}
}
}
메시지 전송/재전송 시간 파라미터 이해

- acks = 1 or all 인 동기식 전송에 적용됨
- max.block.ms
- send() 호출 시 RecordAccumulator 입력이 block되는 최대 시간, 초과 시 TimeoutException
- linger.ms
- sender Thread가 RecordAccumulator에서 배치 별로 가져가기 위한 최대 대기 시간
- request.timeout.ms
- 전송에 걸리는 최대 대기 시간
- 초과 시 retry 또는 timeout
- retry.backoff.ms
- 재전송을 위한 대기 시간
- delivery.timeout.ms
- 재전송을 포함하여 producer 메시지 전송에 허용된 최대 시간
- 초과 시 TimeoutException
delivery.timeout.ms >= linger.ms + request.timeout.ms
retries와 delivery.timeout.ms
retries = 2147483647 (MAX INT)
delivery.timeout.ms = 120000
- retries는 재전송 횟수를 결정
- delivery.timeout.ms는 메시지 재전송을 멈출 때까지의 시간
- 보통 retries는 무한대 값으로 설정하고 delivery.timeout.ms (기본 2분)를 조정하는 것을 권장
retries와 request.timeout.ms, retry.backoff.ms
retries = 10
retry.backoff.ms = 30
request.timeout.ms = 10000ms
- retry.backoff.ms는 재전송 주기 시간을 결정
- 위와 같이 설정 시 전송 후 10000ms 기다린 후 재전송 전 30ms 이후 재전송 시도
- 이와 같이 10회 수행 후 더 이상 retry 수행하지 않음
- 만약 10회 이내에 delivery.timeout.ms 도달 시 더 이상 재전송 하지 않음
producer 전송 방법
최대 한 번 전송 (at most once)
- ack=0
- producer는 broker로부터 받는 ack 또는 에러메시지 없이 다음 메시지를 연속적으로 보냄
- 메시지는 소실될 수 있지만 중복 전송하지 않음
적어도 한 번 전송 (at least once)
- acks=1,all, retries>0
- producer는 broker로부터 ack를 받은 다음에 다음 메시지 전송
- 메시지 소실은 없지만 중복 전송 가능
- 실제로 broker에 전송 되었으나 ack만 전송되지 못한 경우
정확히 한 번 전송 (exactly once)
- 정확히 한 번 전송 (트랜잭션 처리) != 중복 없이 전송 (멱등성)
- 중복 없이 전송 (Idempotence)
- producer는 producer ID와 sequence를 Header에 담아 전송
- 메시지 sequence는 0부터 시작하여 순차적으로 증가
- producer ID는 producer 기동 시마다 새롭게 생성됨
- 브로커에서는 만약 sequence가 중복인 경우 로그에 기록하지 않고 ack만 전송
- 브로커는 자신이 가지고 있는 메시지의 sequence보다 1만큼 큰 경우에만 브로커에 저장

Idempotence 설정
- enable.idempotence=true
- acks=all
- acks=1이면 leader가 복제 중 다운되면 메시지 소실
- retries는 0보다 큰 값
- max.in.flight.requests.per.connection은 1에서 5사이
- producer가 전송 중인 메시지 상태를 추적해야 하므로 관리를 위함
- 메시지 순서 바뀌는 것도 보장해야 하므로 상태 추적 필요
- kafka 3.0부터 producer 기본 설정이 idempotence
- idempotence 적용 시 성능이 감소할 수 있지만 적용 권장
- 만약 기본 설정인 enable.idempotence=true를 유지하고 다른 파라미터를 잘못 설정하면 (예를 들면 acks=1) producer는 메시지 정상적으로 보내지만 idempotence가 적용되지 않음
- 하지만 명시적으로 enable.idempotence=true를 선언한 뒤에 다른 파라미터 설정 바꾸면 오류 발생
Idempotence 기반에서 메시지 전송 순서 유지

주의
- Idempotence는 producer와 broker 사이 retry 시에만 중복 제거를 수행하는 메커니즘
- 동일 메시지를 send()로 재전송 하는 것은 producer가 재기동되어 PID가 달라지므로 적용되지 않음