Rebalancing & Reading from a Specific Offset
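Scenario 1. Kafka was restarted for some reason.
The existing consumer group's offset information was lost, and the group was left with no active consumers.
As a result, the offsets were undefined and seek could not be used.
In the end, among the available ways to reset offsets, auto-offset-reset was set to latest. Once the position has been reset this way, seek can be used to reposition the next record to read.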
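To make the reset setting concrete, here is a minimal sketch of the corresponding plain Kafka consumer properties. In Spring Boot, the `spring.kafka.consumer.auto-offset-reset` property maps to Kafka's `auto.offset.reset` key. The class name `ConsumerProps` and the broker address are assumptions for illustration; the group id is taken from the logs below. The configuration is deliberately incomplete (no deserializers, for example).

```java
import java.util.Properties;

public class ConsumerProps {

    // Builds a partial set of Kafka consumer properties.
    // The keys are standard Kafka consumer configuration names.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Applies only when the group has no committed offset for a partition:
        // "latest" starts from the end of the log, "earliest" from the beginning.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address, not taken from the post.
        Properties props = build("localhost:9092", "teltonika-logger");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

With `latest`, records produced while no offset was committed are skipped, which is why the post follows up with seek to control the starting position explicitly.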
2021-04-01 07:21:48.813 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] (Re-)joining group
2021-04-01 07:21:48.856 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Finished assignment for group at generation 1: {consumer-teltonika-logger-1-a692091e-c91f-4786-941e-b341be846592=Assignment(partitions=[teltonika-location-0, teltonika-location-1, teltonika-location-2, teltonika-location-3, teltonika-api-server-device-msg-0])}
2021-04-01 07:21:48.866 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Successfully joined group with generation 1
2021-04-01 07:21:48.867 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Notifying assignor about the new Assignment(partitions=[teltonika-location-0, teltonika-location-1, teltonika-location-2, teltonika-location-3, teltonika-api-server-device-msg-0])
2021-04-01 07:21:48.867 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Adding newly assigned partitions: teltonika-location-2, teltonika-location-1, teltonika-location-3, teltonika-location-0, teltonika-api-server-device-msg-0
2021-04-01 07:21:48.906 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-2
2021-04-01 07:21:48.906 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-1
2021-04-01 07:21:48.906 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-3
2021-04-01 07:21:48.906 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-0
2021-04-01 07:21:48.909 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-api-server-device-msg-0
2021-04-01 07:21:48.966 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-api-server-device-msg-0 to offset 0.
2021-04-01 07:21:49.170 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-2 to offset 1009.
2021-04-01 07:21:49.170 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-1 to offset 946.
2021-04-01 07:21:49.171 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-3 to offset 977.
2021-04-01 07:21:49.171 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-0 to offset 965.
=====onPartitionsAssigned
2021-04-01 07:21:49.171 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 1009 for partition teltonika-location-2
2021-04-01 07:21:49.171 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 946 for partition teltonika-location-1
2021-04-01 07:21:49.172 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 977 for partition teltonika-location-3
2021-04-01 07:21:49.172 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 965 for partition teltonika-location-0
2021-04-01 07:21:49.172 INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 0 for partition teltonika-api-server-device-msg-0
Custom ConsumerRebalanceListener
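- Implement the ConsumerRebalanceListener interface
  - onPartitionsRevoked(Collection partitions)
    - Called before rebalancing starts and after the consumer has stopped consuming messages
    - The place to commit (save) offsets
  - onPartitionsAssigned(Collection partitions)
    - Called after partitions have been reassigned and before the consumer starts consuming messages from its newly assigned partitions
    - Use consumer.seek() to resume reading from the offset where the failure occurred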
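The save-on-revoke, seek-on-assign idea in the list above needs somewhere to keep offsets outside Kafka. The following is a rough sketch of such a store, not the author's implementation: `OffsetStore`, `save`, and `lookup` are hypothetical names, and the store is in memory only, so it would not survive a process restart. A real setup would presumably need a durable backend such as a database or file.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory offset store shared by the two rebalance callbacks.
public class OffsetStore {

    private final Map<String, Long> nextOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Intended to be called from onPartitionsRevoked with the value of
    // consumer.position(tp), i.e. the offset of the next record to read.
    public void save(String topic, int partition, long nextOffset) {
        nextOffsets.put(key(topic, partition), nextOffset);
    }

    // Intended to be called from onPartitionsAssigned; the caller would
    // invoke consumer.seek(tp, offset) only when a value is present.
    public OptionalLong lookup(String topic, int partition) {
        Long offset = nextOffsets.get(key(topic, partition));
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.save("teltonika-location", 2, 1009L);
        System.out.println(store.lookup("teltonika-location", 2)); // stored offset
        System.out.println(store.lookup("teltonika-location", 9)); // nothing stored
    }
}
```

Returning `OptionalLong` rather than a default value lets the listener fall back to the consumer's own position (committed offset or the reset policy) when nothing has been stored for a partition.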
Use ConsumerAwareRebalanceListener, a sub-interface of ConsumerRebalanceListener.
- Register the custom rebalance listener on the KafkaListenerContainerFactory.
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ConsumerRebalanceListener consumerRebalanceListener) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    //factory.getContainerProperties().setAssignmentCommitOption(AssignmentCommitOption.ALWAYS);
    // Register the injected listener bean (myConsumerRebalanceListener, defined below) on the container.
    factory.getContainerProperties().setConsumerRebalanceListener(consumerRebalanceListener);
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}
- Custom Rebalance Listener(ConsumerAwareRebalanceListener)
@Bean
public ConsumerAwareRebalanceListener myConsumerRebalanceListener() {
    return new ConsumerAwareRebalanceListener() {
        @Override
        public void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // Nothing to clean up in this example.
        }

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                // position() returns the offset of the next record to fetch for this partition.
                long nextUncommittedOffset = consumer.position(tp);
                //System.out.println(String.format("========onPartitionsAssigned topic:%s, partition:%s, offset:%s", tp.topic(), tp.partition(), nextUncommittedOffset));
                // Seek to that offset; substitute a stored offset here to resume from a specific point.
                consumer.seek(tp, nextUncommittedOffset);
            }
        }

        /*
        @Override
        public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            store(consumer.position(partition));
        }
        */
    };
}