Rebalancing & Reading from a Specific Offset

Case 1. Kafka restarted for some reason

The existing consumer group's committed offset information was lost. In addition, the group no longer had any active consumers.

As a result, the offsets became undefined and seek could not be used.

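In the end, of the available ways to reset the offset, we set auto-offset-reset to latest. Once the reset has resolved a position, seek can be used to reposition the consumer to the next record. The consumer log after the restart shows this sequence: no committed offset is found for any partition, each partition is reset to its latest offset, and onPartitionsAssigned then seeks to those offsets.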

2021-04-01 07:21:48.813  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] (Re-)joining group
2021-04-01 07:21:48.856  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Finished assignment for group at generation 1: {consumer-teltonika-logger-1-a692091e-c91f-4786-941e-b341be846592=Assignment(partitions=[teltonika-location-0, teltonika-location-1, teltonika-location-2, teltonika-location-3, teltonika-api-server-device-msg-0])}
2021-04-01 07:21:48.866  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Successfully joined group with generation 1
2021-04-01 07:21:48.867  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Notifying assignor about the new Assignment(partitions=[teltonika-location-0, teltonika-location-1, teltonika-location-2, teltonika-location-3, teltonika-api-server-device-msg-0])
2021-04-01 07:21:48.867  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Adding newly assigned partitions: teltonika-location-2, teltonika-location-1, teltonika-location-3, teltonika-location-0, teltonika-api-server-device-msg-0
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-2
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-1
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-3
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-0
2021-04-01 07:21:48.909  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-api-server-device-msg-0
2021-04-01 07:21:48.966  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-api-server-device-msg-0 to offset 0.
2021-04-01 07:21:49.170  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-2 to offset 1009.
2021-04-01 07:21:49.170  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-1 to offset 946.
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-3 to offset 977.
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-0 to offset 965.
=====onPartitionsAssigned
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 1009 for partition teltonika-location-2
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 946 for partition teltonika-location-1
2021-04-01 07:21:49.172  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 977 for partition teltonika-location-3
2021-04-01 07:21:49.172  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 965 for partition teltonika-location-0
2021-04-01 07:21:49.172  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 0 for partition teltonika-api-server-device-msg-0
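The reset lines in the log follow Kafka's rule for choosing a starting offset: a committed offset is used when one exists; otherwise auto.offset.reset decides, with earliest mapping to the log-start offset, latest to the log-end offset, and none raising an error. The sketch below is a minimal plain-Java model of that rule, not Kafka client code. OffsetResetModel and ResetPolicy are hypothetical names, the log-start offset of 0 is an assumption, and the out-of-range committed offset case (which also triggers a reset in Kafka) is left out.

```java
import java.util.OptionalLong;

public class OffsetResetModel {

    // Simplified stand-in for the auto.offset.reset values: earliest, latest, none.
    public enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Returns the offset a consumer would start fetching from for one partition.
    // committed is empty when the group's offset was lost, as after the restart above.
    public static long startingOffset(OptionalLong committed, long logStartOffset,
                                      long logEndOffset, ResetPolicy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // The real Kafka consumer throws NoOffsetForPartitionException here.
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // 1009 matches "Resetting offset for partition teltonika-location-2 to offset 1009";
        // the log-start offset 0 is assumed.
        System.out.println("latest: " + startingOffset(OptionalLong.empty(), 0L, 1009L, ResetPolicy.LATEST));
        System.out.println("earliest: " + startingOffset(OptionalLong.empty(), 0L, 1009L, ResetPolicy.EARLIEST));
    }
}
```

Running main prints `latest: 1009` and `earliest: 0`. Under latest, records produced while the group had no committed offset are skipped, which is why the seek step afterwards matters.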

Custom ConsumerRebalanceListener

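  • Implement the ConsumerRebalanceListener interface
    • onPartitionsRevoked(Collection<TopicPartition> partitions)
      • Called before a rebalance starts and after the consumer has stopped consuming messages
      • This is where offsets should be committed (saved)
    • onPartitionsAssigned(Collection<TopicPartition> partitions)
      • Called after partitions have been reassigned, and before the consumer starts consuming from its newly assigned partitions
      • Use consumer.seek() to resume reading from the offset where the failure occurred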
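The two callbacks above form a save-then-seek protocol: save the position on revoke, seek back to it on assign. The sketch below models that protocol without a broker. It is plain Java, not the Kafka API: FakeConsumer, StoringListener and the in-memory store are hypothetical stand-ins, and partitions are plain strings instead of TopicPartition. When no offset has been saved, the listener calls position() so the fallback (modelled as auto.offset.reset=latest) is resolved at assignment time, as the listener later in this post does.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RebalanceListenerModel {

    // Hypothetical stand-in for KafkaConsumer: tracks a log-end offset and a fetch position per partition.
    public static class FakeConsumer {
        private final Map<String, Long> logEndOffsets;
        private final Map<String, Long> positions = new HashMap<>();

        public FakeConsumer(Map<String, Long> logEndOffsets) {
            this.logEndOffsets = new HashMap<>(logEndOffsets);
        }

        // Like position(): with no position yet, fall back to the log end (auto.offset.reset=latest).
        public long position(String tp) {
            return positions.computeIfAbsent(tp, k -> logEndOffsets.getOrDefault(k, 0L));
        }

        public void seek(String tp, long offset) { positions.put(tp, offset); }

        // Simulates a producer appending n records to the partition.
        public void produce(String tp, int n) { logEndOffsets.merge(tp, (long) n, Long::sum); }

        // Simulates consuming n records (assumes n records are available).
        public void consume(String tp, int n) { positions.put(tp, position(tp) + n); }

        public void unassign(Collection<String> tps) { tps.forEach(positions::remove); }
    }

    // Hypothetical listener that keeps offsets in its own store (a durable DB table in a real system).
    public static class StoringListener {
        private final Map<String, Long> store = new HashMap<>();

        // Mirrors onPartitionsRevoked: save the next offset to read before the partition is taken away.
        public void onPartitionsRevoked(FakeConsumer consumer, Collection<String> partitions) {
            for (String tp : partitions) {
                store.put(tp, consumer.position(tp));
            }
        }

        // Mirrors onPartitionsAssigned: seek to the saved offset if there is one,
        // otherwise resolve the reset-policy position now.
        public void onPartitionsAssigned(FakeConsumer consumer, Collection<String> partitions) {
            for (String tp : partitions) {
                Long saved = store.get(tp);
                if (saved != null) {
                    consumer.seek(tp, saved);
                } else {
                    consumer.position(tp);
                }
            }
        }
    }

    public static void main(String[] args) {
        String tp = "teltonika-location-0";
        FakeConsumer consumer = new FakeConsumer(Map.of(tp, 965L));
        StoringListener listener = new StoringListener();
        List<String> tps = List.of(tp);

        listener.onPartitionsAssigned(consumer, tps); // nothing saved: position resolves to 965
        consumer.produce(tp, 10);                      // log end 975
        consumer.consume(tp, 4);                       // position 969
        listener.onPartitionsRevoked(consumer, tps);  // saves 969
        consumer.unassign(tps);
        consumer.produce(tp, 5);                       // log end 980 while unassigned
        listener.onPartitionsAssigned(consumer, tps); // seeks back to 969
        System.out.println(consumer.position(tp));     // prints 969
    }
}
```

Without the saved offset, the second assignment would fall back to the log end (980) and skip records 969 to 979. In a real deployment the store has to survive the same failure that wiped the group's committed offsets, otherwise it offers no extra protection.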

Use ConsumerAwareRebalanceListener, a Spring Kafka sub-interface of ConsumerRebalanceListener whose callbacks also receive the Consumer.

  • Register the custom rebalance listener on the KafkaListenerContainerFactory.
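import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ConsumerRebalanceListener consumerRebalanceListener) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Let Spring Boot set the consumer factory and apply the spring.kafka.listener.* properties.
    configurer.configure(factory, kafkaConsumerFactory);
    //factory.getContainerProperties().setAssignmentCommitOption(AssignmentCommitOption.ALWAYS);
    // Use the injected listener bean (the ConsumerAwareRebalanceListener defined below).
    factory.getContainerProperties().setConsumerRebalanceListener(consumerRebalanceListener);
    return factory;
}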
 
  • Custom Rebalance Listener(ConsumerAwareRebalanceListener)
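import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

@Bean
public ConsumerAwareRebalanceListener myConsumerRebalanceListener() {
    return new ConsumerAwareRebalanceListener() {

        @Override
        public void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // Partitions were lost without a clean revoke; nothing is done here.
        }

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                // position() returns the next offset to fetch; with no committed offset it is
                // resolved through auto.offset.reset (latest in this setup).
                long nextUncommittedOffset = consumer.position(tp);
                // Seeking to the current position leaves it unchanged; to resume from the point
                // of failure, seek to an offset saved outside Kafka instead.
                consumer.seek(tp, nextUncommittedOffset);
                //System.out.println(String.format("========onPartitionsAssigned topic:%s, partition:%s, offset:%s",
                //        tp.topic(), tp.partition(), nextUncommittedOffset));
            }
        }

        /*
        @Override
        public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // store(...) is a hypothetical helper that persists offsets outside Kafka.
            for (TopicPartition tp : partitions) {
                store(tp, consumer.position(tp));
            }
        }
        */
    };
}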
