본문 바로가기
클라우드/Kafka

Kafka Producer Client 주요 옵션 설명과 메시지 키/특정 파티션/동기-비동기 전송

by 정권이 내 2021. 12. 2.

 

 

Kafka Producer Client - 2

 

지난번에 Java Project로 카프카 프로듀서 클라이언트를 만들어서 쿠버네티스에서 실행시킨후 100건의

데이터를 토픽을 전송하는 테스트를 해봤습니다.

 

[Kafka] 카프카 프로듀서 Kubernetes에서 Pod로 실행하기 #jar #Kafka Producer #Docker image

Kafka Producer Client 카프카 브로커 서버에 토픽을 만들어 봤다면 카프카 프로듀서 역할을 할수있는 클라이언트 프로그램을 만들어서 데이터를 전송하는 테스트를 해볼수 있습니다. 카프카 프로듀

ksr930.tistory.com

 

이번엔 카프카 프로듀서 클라이언트에서 설정할수 있는 옵션들과 토픽으로 데이터 전송시 단순하게 토픽만

지정해서 전송하는 방식외에 3가지 방법을 추가로 알아보겠습니다.

 

 

Kafka 주요 옵션

 

key.serializer

  • key 를 위한 직렬화 클래스입니다.

 

value.serializer

  • value 를 위한 직렬화 클래스입니다.

 

bootstrap.servers

  • Kafka Producer Client 에서 Kafka Cluster에 접속하기위한 host, port 정보를 설정하는 옵션입니다.

  • host1:port1, host2:port2... 형식으로 작성하며 여러개 입력할경우 특정 브로커 서버가 다운됬을때

    다른 브로커 서버를 사용할수 있게 합니다.

 

buffer.memory

  • Kafka Producer Client 가 브로커 서버의 토픽으로 레코드 전송시 버퍼에 담아 보내는데 그때 버퍼의

    bytes 크기를 설정하는 옵션입니다.

  • 기본값은 33,554,432 byte (32MB)

 

compression.type

  • Kafka Producer Client 에 의해서 생성된 데이터를 압축할 타입에 대한 옵션입니다.
  • 기본값은 none이며 gzip, snappy, lz4, zstd 들의 옵션을 설정할수 있습니다.

 

retries

  • Kafka Producer Client 에서 토픽으로 레코드 전송시 실패할경우 재전송 횟수에 대한 옵션입니다.

  • max.in.flight.requests.per.connection 옵션을 1로 설정하지 않고 retries 옵션만 설정하면 저장되는

    레코드의 순서가 바뀔가능성이 있습니다. 이러한 이유로 대부분의 사용자들은 retries 옵션을 설정하지

    않고 delivery.timeout.ms 옵션을 사용합니다.

  • 기본값은 2147483647.

 

batch.size

  • Kafka Producer Client 에서 여러개의 레코드를 동일한 파티션으로 전송할때 batch 를 사용하는데 이때

    batch 의 크기를 설정하는 옵션입니다.

  • batch.size 값을 작게 설정하면 batching 이 덜 발생하여 처리량이 줄어들게 됩니다. 반대로 너무 크게

    설정하면 낭비되는 메모리 공간이 많게되므로 적절한 크기를 설정해야합니다.

  • 기본값은 16,384 byte

 

더 자세한 옵션들은 https://kafka.apache.org/documentation/#producerconfigs 사이트에서 참고하면 됩니다.

 

 

Kafka Producer Client 전송 방식 종류

 

Kafka Producer Client 에서 토픽으로 데이터 전송시 별도의 설정이 없다면 지정한 토픽의 여러 파티션에

라운드로빈 형식으로 레코드가 전송될것입니다.

기본 방식외에도 메시지 키를 포함하거나 특정 파티션을 지정하여 레코드를 전송하는 방법을 알아보겠습니다.

 

1. 메시지 키를 포함하여 전송하는 방법

메시지 키를 사용하는 방식은 Kafka Producer Client 를 사용할때 아주 유용한 방식입니다. 특정 파티션에

들어갈 레코드의 순서를 보장하고 싶다면 unique한 message key 를 만들어서 특정 파티션에 레코드가

순서대로 저장됩니다.

 

ProducerRecord 클래스를 다음과 같이 생성합니다.

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "msgKey1", value);

 

TestProducer.java
package com.example.Kafka_Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class TestProducer {
    private final static Logger logger = LoggerFactory.getLogger(TestProducer.class);
    private final static String TOPIC_NAME = "ksr-test";
    private final static String BROKER_SERVER = "kafka.kafka.svc:9092";

    public static void main(String[] args) {

        Properties conf = new Properties();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SERVER);
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProd = new KafkaProducer<String, String>(conf);

        String value = "value test";

        for (int i=0;i<100;i++) {

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "msgKey3", value);
            logger.info("[{}] {}", i, record);

            kafkaProd.send(record);
            kafkaProd.flush();
        }

        kafkaProd.close();
    }
}

 

테스트를 하기위해 ksr-test 토픽에 파티션을 3개를 생성하였고 일반적인 경우라면 라운드로빈 방식으로

3개에 파티션에 레코드가 나누어져 저장되야 하지만 메시지 키를 지정하였기 때문에 하나의 파티션으로만

들어가게 됩니다.

 

카프카 클러스터 UI에서 확인한 ksr-test 토픽의 파티션 정보입니다.

 

실제 들어간 데이터를 확인해보겠습니다.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ksr-test --property print.key=true --property key.separator=" : " --from-beginning
msgKey3 : value test
msgKey3 : value test
msgKey3 : value test
msgKey3 : value test
msgKey3 : value test
msgKey3 : value test
...
...
Processed a total of 100 messages

 

토픽의 어느파티션으로 들어갈지는 알수없지만 레코드가 저장되는 파티션은 동일합니다.

 

 

2. 특정 파티션을 지정하여 전송하는 방법

메시지키를 지정하여 전송하는 방식은 동일한 메시지키를 가지는 레코드에 대해서는 같은 파티션으로

전송 되었지만 어느 파티션에 들어가는지는 알수 없습니다.

 

그래서 Kafka Producer Client 에서는 파티션까지 지정하여 레코드를 저장할수 있는 Partitioner Interface

를 사용하는 방법이 있습니다.

 

MyPartitioner 클래스를 새로 생성하고 Partitioner Interface 를 implements 합니다.

 

MyPartitioner.java

중간에 key 값을 "msgKey3" 라는 문자열과 비교하는 부분이 있는데 지정한 message Key 값과 일치할 경우

리턴값으로 반환되는 숫자의 파티션으로 데이터가 전송됩니다.

package com.example.Kafka_Producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class MyPartitioner implements Partitioner {


    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        if(keyBytes == null)
            throw new InvalidRecordException("메시지 키가 없습니다.");
		
        // message Key 값이 "msgKey3"와 일치하면 1번 파티션으로 전송.
        if(((String)key).equals("msgKey3"))
            return 1;

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

 

 

TestProducer.java

ProducerConfig 객체에 PARTITIONER_CLASS_CONFIG 설정을 새로 추가합니다.

package com.example.Kafka_Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class TestProducer {
    private final static Logger logger = LoggerFactory.getLogger(TestProducer.class);
    private final static String TOPIC_NAME = "ksr-test";
    private final static String BROKER_SERVER = "kafka.kafka.svc:9092";

    public static void main(String[] args) {

        Properties conf = new Properties();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SERVER);
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

        KafkaProducer<String, String> kafkaProd = new KafkaProducer<String, String>(conf);

        String value = "value test";

        for (int i=0;i<100;i++) {

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "msgKey3", value);
            logger.info("[{}] {}", i, record);

            kafkaProd.send(record);
            kafkaProd.flush();
        }

        kafkaProd.close();
    }
}

 

테스트를 하기위해 ksr-test 토픽을 새로 만들고 데이터를 전송해봤습니다.

 

실제 들어간 데이터를 확인해보겠습니다.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ksr-test --property print.partition=true --property print.key=true --property key.separator="-" --from-beginning
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
Partition:1-msgKey3-value test
...
...
...
Processed a total of 100 messages

 

파티션 1번 인덱스에 100건의 데이터가 모두 들어간것을 확인할수 있습니다.

 

 

3. 데이터 전송결과를 수신하는 전송방법

Kafka Producer Client 에서 레코드 전송시 KafkaProducer 클래스의 send() 함수는 java Future 객체를

반환하는데 이 객체는 RecordMetadata 결과를 포함하고 있습니다.

 

RecordMetadata는 브로커 서버내의 토픽에 데이터가 정상적으로 저장되었는지에 대한 반환값을

제공하는데 동기 or 비동기 방식으로 불러올수 있습니다.

 

동기방식과 비동기 방식은 각각의 특성이 있습니다.

방식속도특징
동기 방식느림데이터 순서 보장
비동기 방식빠름데이터 순서 보장하지 않음

 

 

동기 방식

Kafka_Producer.java
package com.example.Kafka_Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TestProducer {
    private final static Logger logger = LoggerFactory.getLogger(TestProducer.class);
    private final static String TOPIC_NAME = "ksr-test";
    private final static String BROKER_SERVER = "kafka.kafka.svc:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties conf = new Properties();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SERVER);
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

        KafkaProducer<String, String> kafkaProd = new KafkaProducer<String, String>(conf);

        String value = "value test";

        for (int i = 0; i < 100; i++) {

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "msgKey3", value);

            try {
                RecordMetadata rcdMetaData = kafkaProd.send(record).get();
                kafkaProd.flush();
                logger.info("[{}] {}", i, rcdMetaData.toString());
            } catch (ExecutionException ex) {
                ex.printStackTrace();
            }

        }

        kafkaProd.close();
    }
}

 

이 부분이 새로 추가되었는데 KafkaProducer 객체의 send() 함수결과를 get() 함수로 받아 RecordMetadata

객체로 결과를 반환받는 부분입니다.

try {
	RecordMetadata rcdMetaData = kafkaProd.send(record).get();
	kafkaProd.flush();
	logger.info("[{}] {}", i, rcdMetaData.toString());
} catch (ExecutionException ex) {
	ex.printStackTrace();
}

 

실행한 로그를 확인해보면 다음과 같이 출력됩니다.

[main] INFO com.example.Kafka_Producer.TestProducer - [0] ksr-test-1@300
[main] INFO com.example.Kafka_Producer.TestProducer - [1] ksr-test-1@301
[main] INFO com.example.Kafka_Producer.TestProducer - [2] ksr-test-1@302
[main] INFO com.example.Kafka_Producer.TestProducer - [3] ksr-test-1@303
[main] INFO com.example.Kafka_Producer.TestProducer - [4] ksr-test-1@304
[main] INFO com.example.Kafka_Producer.TestProducer - [5] ksr-test-1@305
[main] INFO com.example.Kafka_Producer.TestProducer - [6] ksr-test-1@306
[main] INFO com.example.Kafka_Producer.TestProducer - [7] ksr-test-1@307
[main] INFO com.example.Kafka_Producer.TestProducer - [8] ksr-test-1@308
...
...
[main] INFO com.example.Kafka_Producer.TestProducer - [98] ksr-test-1@398
[main] INFO com.example.Kafka_Producer.TestProducer - [99] ksr-test-1@399

 

ksr-test 는 제가 지정한 토픽의 이름이고 그 뒤로 파티션의 인덱스 번호와 레코드의 오프셋 넘버가

출력되고 있습니다.

 

동기방식은 각 전송건에 대해 결과를 순서대로 확인할수 있다는 장점이 있지만 중간에 딜레이가 생기는경우

Kafka의 장점인 빠른 처리량을 기대하기는 어려워질수 있습니다.

 

그래서 RecordMetadata 객체로 받을 전송결과를 Callback 으로 만들어서 비동기 형식으로 처리할수 있습니다.

 

비동기방식

MyProducerCallback.java
package com.example.Kafka_Producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyProducerCallback implements Callback {

    private final static Logger logger = LoggerFactory.getLogger(MyProducerCallback.class);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            logger.error(exception.getMessage(), exception);
        } else {
            logger.info(metadata.toString());
        }
    }
}

 

Kafka_Producer.java
package com.example.Kafka_Producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.ExecutionException;

public class TestProducer {
    private final static Logger logger = LoggerFactory.getLogger(TestProducer.class);
    private final static String TOPIC_NAME = "ksr-test";
    //    private final static String BROKER_SERVER = "kafka.kafka.svc:9092";
    private final static String BROKER_SERVER = "ipron-svc:31090";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties conf = new Properties();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SERVER);
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        conf.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

        KafkaProducer<String, String> kafkaProd = new KafkaProducer<String, String>(conf);

        String value = "value test";

        for (int i = 0; i < 10000; i++) {

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "msgKey3", value);
            logger.info("[{}] {}", i, record);
            kafkaProd.send(record, new MyProducerCallback());
            kafkaProd.flush();
        }

        kafkaProd.close();
    }
}

 

실행한 로그를 확인해보면 다음과 같이 출력됩니다. 비동기 방식에서는 TestProducer 클래스가 아닌

MyProducerCallback 클래스에서 로그를 출력하기 때문에 로그에 출력되는 클래스가 다릅니다.

[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@809
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@810
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@811
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@812
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@813
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@814
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@815
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@816
[kafka-producer-network-thread | producer-1] INFO com.example.Kafka_Producer.MyProducerCallback - ksr-test-1@817

 

 

 

반응형

댓글