본문 바로가기
클라우드/Kafka

[Kafka] 오프셋 커밋 동기 비동기, 리밸런스 리스너

by 정권이 내 2022. 1. 24.

[Kafka] 오프셋 커밋 리밸런스 리스너

 

지난번에 카프카 컨슈머의 기본예제와 중요개념에 대해 설명했고 이번엔 카프카 컨슈머의 주요 옵션들과 동기, 비동기 오프셋 커밋 방식과 리밸런싱, 파티션할당에 대한 예제 코드를 만들고 테스트 해보겠습니다.

 

Kafka Consumer Client 주요 옵션

컨슈머 프로그램을 실행할때 필요한 필수옵션과 선택옵션들이 있습니다.

 

필수 옵션

옵션설명
bootstrap.servers카프카 브로커 서버의 주소와 포트를 입력한다. 브로커 서버가 여러대일 경우 2개이상의 서버정보를 입력하여 특정 브로커 서버가 장애시에도 다른 서버를 통해 데이터를 처리할수 있다.
key.deserializer레코드 메시지 키를 역직렬화하는 클래스 지정
value.deserializer레코드 메시지 값을 역직렬화하는 클래스 지정

 

선택 옵션

옵션설명기본값
group.id컨슈머 그룹 아이디를 지정한다. subscribe 메서드로 토픽을 구독할때는 필수로 사용해야한다.null
auto.offset.reset토픽에서 데이터를 가져올때 기존에 저장된 오프셋이 없는경우 어떤 방식으로 가져올지에 대한 옵션. - latest : 가장 최근의 오프셋부터 읽기 - earliest : 가장 오래된 오프셋부터 읽기 - none : 커밋기록이 없으면 오류반환, 있으면 해당 시점부터 읽음latest
enable.auto.commit자동/수동 커밋 여부 설정true
auto.commit.interval.ms자동커밋일 경우 커밋간격을 설정 (ms 단위)5000
max.poll.recordspoll 메서드로 반환되는 레코드 개수 설정500
max.poll.interval.mspoll 메서드 호출간격 시간 설정 데이터 처리시간이 길어져서 설정값을 넘어가면 브로커는 컨슈머를 장애상태로 판단하여 리밸런싱을 수행300,000
session.timeout.ms컨슈머와 브로커 서버가 끊기는 최대 시간, 이 시간을 넘을때까지 heartbeat 통신이 없다면 브로커는 리밸런싱을 수행한다.10000
heartbeat.interval.msheartbeat 통신 간격3000

 

Kafka Consumer Client 예제

지난번 포스팅에서 기본으로 만든 컨슈머 프로그램에서 동기/비동기 오프셋 커밋, 컨슈머에서 특정 파티션을 할당받는 방법등에 대해 예제코드를 만들어 보겠습니다.

 

동기 오프셋 커밋 commitSync

poll 메서드 호출후 commitSync 메서드를 명시적으로 호출하는 방법입니다.

 

[TestConsumer.java]

Properties conf = new Properties();

conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_SERVER);
conf.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
conf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
conf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
conf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(conf);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> record : records){
        log.info("offset : {} key : {} value : {}", 
                 record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}

commitSync 메서드는 poll 메서드로 받은 가장 마지막 레코드를 기준으로 커밋을 합니다. 그래서 poll 메서드로 받은 데이터를 모두 처리하고 호출해야하며 커밋을 완료할때까지 기다립니다.

메서드의 인자가 없으면 가장 마지막 레코드의 오프셋을 기준으로 커밋이되고 레코드마다 커밋을 하려면 Map<TopicPartition, OffsetAndMetadata> 인스턴스를 인자로 넣어주면 됩니다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    Map<TopicPartition, OffsetAndMetadata> curOffset = new HashMap<>();

    for (ConsumerRecord<String, String> record : records){
        log.info("offset : {} key : {} value : {}", 
                 record.offset(), record.key(), record.value());
        curOffset.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, null)
        );
        consumer.commitSync(curOffset);
    }
}

현재의 오프셋 정보를 받을 HashMap의 키는 토픽과 파티션 정보인 TopicPartition 클래스가 되고 값은 레코드 오프셋 정보인 OffsetAndMeatadata 클래스가 됩니다.

레코드 별로 수행해야 하므로 for 루프문 안에서 OffsetAndMeatadata 정보를 넣을때 record.offset() 에 1을 더하는데 오프셋 커밋이 끝난후 poll 메서드를 호출시 마지막으로 커밋한 오프셋기준으로 레코드를 반환하기 때문입니다.

 

비동기 오프셋 커밋 commitAsync

동기 오프셋 커밋은 컨슈머의 데이터 처리량이 적어지는 단점이 있기 때문에 커밋의 응답을 받기전에도 데이터를 처리할수있는 비동기 오프셋 커밋 방식이 있습니다.

 

[TestConsumer.java]

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> record : records){
        log.info("offset : {} key : {} value : {}", record.offset(), record.key(), record.value());
    }
    consumer.commitAsync();
}

 

commitAsync() 메서드의 응답을 받으려면 OffsetCommitCallback() 함수를 파라미터로 사용해야 합니다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> record : records){
        log.info("offset : {} key : {} value : {}", 
        record.offset(), record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
        Exception exception) {
            if(exception != null)
                log.error("commit failed offset : {}   errMsg : {}",
                offsets, exception.getMessage());
            else
                log.info("commit success");
        }
    });
}

 

리밸런스 리스너(Rebalance Listener)

컨슈머 그룹에서 특정 컨슈머에 문제가 발생하면 컨슈머를 그룹에서 제외하고 다시 그룹에 재할당하는 리밸런싱이라는 작업을 수행합니다.

정상일때는 아래 그림처럼 컨슈머그룹의 컨슈머들이 토픽의 파티션과 매핑되어 poll 메서드로 데이터를 가져와서 처리합니다.

img

 

하지만 데이터 처리도중 특정 컨슈머에 문제가 생겨서 컨슈머 그룹에서 제외가 되면 해당 컨슈머에서 처리한 데이터에 대해 커밋을 하지못하게 되고 리밸런싱될때 이미 DB에 들어간 데이터와 중복되는 데이터를 받는 상황이 발생할수 있습니다.

img

 

이러한 상황을 막기위해 리밸런싱이 수행될때 처리한 데이터를 기준으로 커밋을 하도록 리밸런싱 수행을 감지하는 리밸런싱 리스너인 ConsumerRebalanceListener 인터페이스를 사용합니다.

해당 인터페이스에는 onPartitionAssigned(), onPartitionRevoked() 메서드가 있습니다.

  • onPartitionAssigned : 리밸런싱이 끝나고 파티션이 할당될때 호출되는 메서드
  • onPartitionRevoked : 리밸런싱 수행직전 호출되는 메서드, 이 메서드에서 커밋을 해야한다.

 

함수설명에서 알수 있듯이 마지막으로 처리한 레코드를 기준으로 커밋을 해야하므로 onPartitionRevoked 메서드 내부에 커밋함수를 명시적으로 선언하면됩니다.

 

[RebalanceListner.java]

RebalanceListner 클래스는 ConsumerRebalanceListener 인터페이스를 상속받는 클래스입니다. 리밸런싱이 수행되기전에 onPartitionsRevoked 메서드에서 마지막으로 처리한 데이터로 커밋을 합니다.

public class RebalanceListner implements ConsumerRebalanceListener {

    private KafkaConsumer consumer;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();

    public RebalanceListner(KafkaConsumer consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        currentOffsets.put(new TopicPartition(topic, partition), 
                           new OffsetAndMetadata(offset, "Commit"));

        Iterator<TopicPartition> keys = currentOffsets.keySet().iterator();

        while (keys.hasNext()) {
            TopicPartition key = keys.next();
            log.info("[KEY] topic : {}   partition : {}  offset : {}", 
                     key.topic(), key.partition(), currentOffsets.get(key).offset());
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return currentOffsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        log.info("Following Partitions Revoked ....");
        for (TopicPartition partition : partitions)
            log.info(partition.partition() + ",");

        log.info("Following Partitions commited ....");
        for (TopicPartition tp : currentOffsets.keySet())
            log.info(tp.partition() + "");

        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.warn("Partition Assigned");
    }
}

 

[TestConsumer.java]

RebalanceListner 클래스의 인스턴스를 만들어서 KafkaConsumer 의 인스턴스를 파라미터로 전달합니다. 각각의 오프셋 데이터를 처리하는 for문 안에서 오프셋 정보를 RebalanceListner 인스턴스로 전송하고 커밋 메서드인 commitSync 의 파라미터로 전송해놓은 오프셋 정보를 가져와서 처리합니다.

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(conf);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
RebalanceListner rebalanceListner = new RebalanceListner(consumer);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> record : records) {
        log.info("offset : {} key : {} value : {}", 
                 record.offset(), record.key(), record.value());
        rebalanceListner.addOffset(record.topic(), record.partition(), record.offset());
    }

    consumer.commitSync(rebalanceListner.getCurrentOffsets());
}

 

컨슈머에 파티션 할당하기, assign

컨슈머가 파티션에서 데이터를 가져올때 일반적으로 사용하는 subscribe 메서드 외에도 특정 파티션을 지정할수 있는 assign 메서드가 있고 Collection 타입의 인자를 받습니다.

 

[TestConsumer.java]

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(conf);
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));
RebalanceListner rebalanceListner = new RebalanceListner(consumer);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> record : records) {
        log.info("offset : {} key : {} value : {}", 
                 record.offset(), record.key(), record.value());
        rebalanceListner.addOffset(record.topic(), record.partition(), record.offset());
    }

    consumer.commitSync(rebalanceListner.getCurrentOffsets());
}

assign 메서드의 파라미터로 Collections.singleton(new TopicPartition(TOPIC_NAME, 0)) 라는 값을 입력했는데 0번 파티션을 할당받아서 데이터를 가져오겠다는 의미입니다.

 

Kafka Consumer Client 종료

컨슈머는 종료될때도 안전하게 하는것이 중요한데 정상적으로 종료되지 않는 컨슈머는 타임아웃이 발생하기 전까지 컨슈머그룹에 남아있게되고 결국은 데이터 처리량이 느려지는 상황이 발생하게 된다.

카프카 라이브러리에서는 컨슈머의 안전한 종료를 위해 wakeup 이라는 메서드를 제공합니다. wakeup 메서드가 실행된 이후에 컨슈머가 poll을 수행하면 WakeupException 예외가 발생되고 예외처리로 컨슈머를 안전하게 종료하면 됩니다.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

        for (ConsumerRecord<String, String> record : records) {
            log.info("offset : {} key : {} value : {}", 
                     record.offset(), record.key(), record.value());
            rebalanceListner.addOffset(record.topic(), record.partition(), 
                                       record.offset());
        }

        consumer.commitSync(rebalanceListner.getCurrentOffsets());
    }

} catch (WakeupException exception) {
    log.error(exception.getMessage());
    // 자원 해제...
} finally {
    consumer.close();
}
반응형

댓글