알고리즘 : DFS

import java.io.*;
import java.util.*;

public class Main {
	private int computers;
	private int numOfPair;
	private final int MAX = 105;
	private boolean[][] conn = new boolean[MAX][MAX];
	private boolean[] visited = new boolean[MAX];
	private int totalCnt = 0;
	public static void main(String[] args) throws Exception {
		Main main = new Main();
		main.start();	
	}
	
	private void start() throws Exception {
		System.setIn(new FileInputStream("src/웜바이러스/input.txt"));
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		computers = Integer.parseInt(br.readLine());
		numOfPair = Integer.parseInt(br.readLine());
		StringTokenizer st;
		for(int i =0 ;i<numOfPair; i++) {
			st = new StringTokenizer(br.readLine(), " ");
			int p1 = Integer.parseInt(st.nextToken());
			int p2 = Integer.parseInt(st.nextToken());
			conn[p1][p2] = true;
			conn[p2][p1] = true;
		}
		dfs(1);
		System.out.println(totalCnt);
		br.close();
	}
	private void dfs(int start) {
		for(int i = 2; i <= computers; i++) {
			if(conn[start][i] && !visited[i]) {
				visited[i] = true;
				totalCnt++;
				dfs(i);
			}
		}
	}
}

출처

- https://www.acmicpc.net/problem/2606

리밸런싱 & 특정 offset 부터 읽기

상황1. Kafka restart caused by something ..

기존 consumer group의 offset 정보가 사라져버림… 또한, group 내에는 active customer 없게됨.

이로인해 offset은 undefined 상태가되고, seek를 이용할 수 없음.

결론적으로 offset을 reset 하는 방법중에 auto-offset-reset 을 latest로 설정하였음. 그 후 seek를 통해 next record 위치를 재설정할 수 있음.

2021-04-01 07:21:48.813  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] (Re-)joining group
2021-04-01 07:21:48.856  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Finished assignment for group at generation 1: {consumer-teltonika-logger-1-a692091e-c91f-4786-941e-b341be846592=Assignment(partitions=[teltonika-location-0, teltonika-location-1, teltonika-location-2, teltonika-location-3, teltonika-api-server-device-msg-0])}
2021-04-01 07:21:48.866  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Successfully joined group with generation 1
2021-04-01 07:21:48.867  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Notifying assignor about the new Assignment(partitions=[teltonika-location-0, teltonika-location-1, teltonika-location-2, teltonika-location-3, teltonika-api-server-device-msg-0])
2021-04-01 07:21:48.867  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Adding newly assigned partitions: teltonika-location-2, teltonika-location-1, teltonika-location-3, teltonika-location-0, teltonika-api-server-device-msg-0
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-2
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-1
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-3
2021-04-01 07:21:48.906  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-location-0
2021-04-01 07:21:48.909  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Found no committed offset for partition teltonika-api-server-device-msg-0
2021-04-01 07:21:48.966  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-api-server-device-msg-0 to offset 0.
2021-04-01 07:21:49.170  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-2 to offset 1009.
2021-04-01 07:21:49.170  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-1 to offset 946.
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-3 to offset 977.
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Resetting offset for partition teltonika-location-0 to offset 965.
=====onPartitionsAssigned
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 1009 for partition teltonika-location-2
2021-04-01 07:21:49.171  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 946 for partition teltonika-location-1
2021-04-01 07:21:49.172  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 977 for partition teltonika-location-3
2021-04-01 07:21:49.172  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 965 for partition teltonika-location-0
2021-04-01 07:21:49.172  INFO 10632 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-teltonika-logger-1, groupId=teltonika-logger] Seeking to offset 0 for partition teltonika-api-server-device-msg-0

 

 

Custom ConsumerRebalanceListener

  • ConsumerRebalanceListener interface 구현
    • onPartitionsRevoked(Collection partitions)
      • 리밸런싱이 시작되기 전, 그리고 컨슈머가 메세지 소비를 중단한 후 호출
      • offset 를 커밋 해야 하는 곳 (저장)
    • onPartitionsAssigned(Collection partitions)
      • 파티션이 브로커에게 재할당된 후, 그리고 컨슈머가 파티션을 새로 할당 받아 메세지 소비를 시작하기 전에 호출
      • consumer.seek() 를 이용하여 장애 발생 offset 부터 읽음
      •  

ComsumerRebalanceListener의 sub interface인 ConsumerAwareRebalanceListener를 활용

  • KafkaListenerContainerFactor에 Custom Rebalance Listener를 정의하자.
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ConsumerRebalanceListener consumerRebalanceListener) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    //factory.getContainerProperties().setAssignmentCommitOption(AssignmentCommitOption.ALWAYS);
    factory.getContainerProperties().setConsumerRebalanceListener(myConsumerRebalanceListener());
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}
 
  • Custom Rebalance Listener(ConsumerAwareRebalanceListener)
@Bean
	public ConsumerAwareRebalanceListener myConsumerRebalanceListener() {
	    return new ConsumerAwareRebalanceListener() {
	    		@Override
	    		public void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
	    		}
	    	    
	    	    @Override
	    	    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
	    	    	for(TopicPartition tp:partitions) {	
	    	    		long nextUncommitedOffset = new Long(consumer.position(new TopicPartition(tp.topic(), tp.partition())));
	    	    		consumer.seek(new TopicPartition(tp.topic(), tp.partition()), nextUncommitedOffset);
		   
	    	    	}
	    	    }
	    	    //System.out.println(String.format("========onPartitionsAssigned topic:%s, partition:%s, offset:%s", tp.topic(), tp.partition(), pos));	
	    	    
	    		
	    		/*
	    	 	@Override
	    	    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {	    	 		
	    	    }

	    	    @Override
	    	    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
	    	    	store(consumer.position(partition));
	    	    }
	    	    */
	    };
	}

'빅데이터 > Kafka Cluster' 카테고리의 다른 글

Kafka 스트레스 테스트  (0) 2022.02.28
Kafka Consumer/Producer  (0) 2022.02.28
Kafka 자주 사용하는 명령  (0) 2022.02.28
Kafka Overview  (0) 2022.02.28
Kafka cluster failover  (0) 2022.02.28

테스트

'빅데이터 > Kafka Cluster' 카테고리의 다른 글

Kafka restart과 retention  (0) 2022.02.28
Kafka Consumer/Producer  (0) 2022.02.28
Kafka 자주 사용하는 명령  (0) 2022.02.28
Kafka Overview  (0) 2022.02.28
Kafka cluster failover  (0) 2022.02.28

Spring Kafka produce

KafkaProducer -&amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;gt; RecordAccumulator -&amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;gt; Sender -&amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;amp;gt; Broker

kafkaProducer send(topicName, message)

사용자는 send() 호출 시 전송할 Record와 전송 완료 후 실행할 콜백을 지정할 수 있다. send()가 호출되면 Serialization, Partitioning, Compression 작업이 이루어지고 최종적으로 RecordAccumulator에 Record가 저장된다.

 

public void sendMessage(String data) {
    EventMessage<String> message = new EventMessage<>();
    message.setPayload(data);

    ListenableFuture<SendResult<String, EventMessage<String>>> future = 		
    kafkaTemplate.send(topicName, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, EventMessage<String>>> () {
        @Override
        public void onSuccess(SendResult<String, EventMessage<String>> result) {
            log.info("pub success");
        }
        @Override
        public void onFailure(Throwable ex) {
            log.error("Failed to pub message {}", message, ex);
        }
    });
}

Serialization

사용자로부터 전달된 Record의 Key, Value는 지정된 Serializer에 의해서 Byte Array로 변환된다.

  • bootstrap-servers
  • key-serializer : 프로듀서가 키를 직렬화 하기 위해 사용하는 클래스(org.apache.kafka.common.serialization.StringSerializer)
  • value-serializer : 프로듀서가 값을 직렬화 하기 위해 사용하는 클래스(org.springframework.kafka.support.serializer.JsonSerializer)
  • retry.backoff.ms : The amount of time to wait before attempting to retry a failed request to a given topic partition.
  • request.timeout.ms
  • acks : 보낸 레코드의 신뢰성

StringSerializer 외에 다음과 같은 Serializer를 기본적으로 제공하고 있다.

  • ByteArraySerializer
  • ByteBufferSerializer
  • BytesSerializer
  • DoubleSerializer
  • IntegerSerializer
  • LongSerializer

Default Partitioning

  • key 값이 있는 경우 key 값의 Hash 값을 이용해서 파티션을 할당한다.
  • key 값이 없는 경우 Round-Robin 방식으로 파티션이 할당된다.

Compression

사용자가 전송하려는 Record는 압축을 함으로써 네트워크 전송 비용도 줄일 수 있고 저장 비용도 줄일 수 있다. Record는 RecordAccumulator에 저장될 때 바로 압축되어 저장된다. compression.type을 설정하여 압축 시 사용할 코덱을 지정할 수 있다. 다음과 같은 코덱를 사용할 수 있으며 지정하지 않는 경우 기본값은 none이다.

  • gzip
  • snappy
  • lz4

RecordAccumulator

RecordAccumulator에 저장하기 전에 Record의 Serialized Size를 검사한다. 크기가 문제 없으면, RecordAccumulator의 append()를 이용해서 저장한다.

 

Record가 batch.size보다 작으면 하나의 RecordBatch에 여러 개의 Record가 저장되지만, Record가 batch.size보다 크면 하나의 RecordBatch에 하나의 Record만 저장된다. 

 

- max.request.size(default 1048576, 1MB) : Serialized Size가 max.request.size 설정값 또는 buffer.memory 설정값보다 크면 RecordTooLargeException이 발생한다.

- buffer.memory(default 33554432, 32MB) : buffer pool size. total bytes of memory the producer can use

- max.block.ms(default 60000, 1 min) : RecordBatch 생성을 위해 요청한 Buffer Size만큼의 여유가 없으면 할당이 Blocking되고 BufferPool에서 용량이 확보될 때까지max.block.ms설정값만큼 기다린다. max.block.ms설정값만큼의 시간이 초과해도 확보되지 않으면 TimeoutException이 발생한다.

- max.in.flight.requests.per.connection(default 5) : kafkaProducer Client가 하나의 Broker로 동시에 전송할 수 있는 요청 수

Sender Thread

Sender Thread는 RecordAccumulator에 저장된 Record를 꺼내서 Broker로 전송하고 Broker의 응답을 처리한다.


Spring Kafka consumer

Kafka는 파티션 단위로 데이터를 분배하기 때문에 파티션의 수보다 많은 컨슈머를 그룹에 추가한 경우 파티션의 수를 초과한 컨슈머는 파티션을 할당받지 못하여 데이터를 소비하지 못한다.

 

브로커 중 하나가 컨슈머 그룹를 관리하고 이를 GroupCoordinator라고 부른다. GroupCoordinator는 그룹의 메타데이터와 그룹을 관리한다.

 

Kafka는 리밸런스(rebalance)를 통해 컨슈머의 할당된 파티션을 다른 컨슈머로 이동시킨다. 컨슈머 그룹에 새로운 컨슈머가 추가되거나 컨슈머 그룹에 속해 있던 컨슈머가 제외되는 경우에 그룹 내 파티션을 다시 할당해야 하므로 리밸런스가 발생한다. 컨슈머 리밸런스가 일어날 때 모든 컨슈머에 할당된 파티션이 해제(revoke)되므로 새로 파티션이 할당되기 전까지 데이터 처리가 일시 정지된다.

- max.poll.interval.ms(default 5 min) : rebalanceTimeout 이내에 JoinGroup 요청을 보내지 않은 컨슈머는 컨슈머 그룹에서 제외

- session.timeout.ms(default 10 seconds) : 컨슈머가 sessionTimeout 시간 내에 heartbeat 요청을 GroupCoordinator에 보내지 않으면 GroupCoordinator는 해당 컨슈머가 죽은 것으로 판단한다.

- partition.assignment.strategy : 컨슈머가 지원하는 파티션 할당 정책(default: org.apache.kafka.clients.consumer.RangeAssignor). RangeAssignor, RoundRobinAssignor, StickyAssignor

- max.poll.records(default 500)

- fetch.max.wait.ms(default 0.5 sec)

- fetch.min.bytes(default 1) : 브로커가 최소한 반환해야하는 양. 반환할 만큼 데이터가 충분하지 않으면 브로커는 데이터가 누락되길 기다린다.  

- fetch.max.bytes(default 50MiB) : 절대적으로 적용되는 값은 아님. 첫 번째 파티션의 첫 번째 메시지가 이 값보다 크다면 컨슈머가 계속 진행될 수 있도록 데이터가 반환된다. 

- message.max.bytes, max.message.bytes : 브로커가 허용하는 최대 메시지 크기

- max.partition.fetch.bytes(default 1MiB) : 브로커가 반환할 파티션당 최대 데이터 크기. fetch.max.bytes와 동일하게 첫 번째 파티션의 첫 번째 메시지가 이 값보다 크다면 컨슈머가 계속 진행될 수 있도록 데이터가 반환된다.

 

KafkaConsumer의 poll 메서드가 호출되면 KafkaConsumer는 분주히 브로커로부터 데이터를 가져올 준비를 한다. KafkaConsumer가 올바르게 동작하기 위해서는 리밸런스는 필요하지만 안정적인 데이터 처리를 위해서 불필요한 리밸런스는 줄이는 것이 좋다. 불필요한 리밸런스를 줄이기 위해서는 max.poll.interval.ms max.poll.records를 적절히 조정하여 poll 메서드가 일정 간격으로 호출되도록 해야 한다. 필요한 경우에는 heartbeat.interval.ms session.timeout.ms를 조정한다.

 

또한 KafkaConsumer는 다양한 Processing guarantees(No guarantee, At most once, At least once, Effectively once)를 지원한다. At least once 방식이 성능은 높이면서 처리를 보장한다. 컨슈머가 중복된 데이터를 처리해도 문제가 없다면 At least once 방식을 사용하는 것이 유리하다.

 

 

RangeAssignor는 각 토픽별로 파티션을 할당한다. RangeAssignor는 파티션은 숫자 순서대로 정렬을 하고 컨슈머는 사전 순서대로 정렬을 한다. 그리고 각 토픽의 파티션을 컨슈머 숫자로 나누어 컨슈머에게 할당해야 하는 파티션 수를 결정한다. 만약 고르게 나누어지지 않는다면 앞쪽 컨슈머가 더 많은 파티션을 할당받는다. 그림 10의 경우 RangeAssignor는 파티션을 Consumer 1 = {T1-0, T1-1, T2-0, T2-1}, Consumer 2 = {T1-2, T2-2}으로 할당했다.

RoundRobinAssignor는 모든 파티션을 컨슈머에게 번갈아가면서 할당한다. 그림 10의 경우 RoundRobinAssignor는 파티션을 Consumer 1 = {T1-0, T1-2, T2-1}, Consumer 2 = {T1-1, T2-0, T2-2}으로 할당했다.

StickyAssignor는 최대한 파티션을 균등하게 분배하고, 파티션 재할당이 이루어질 때 파티션의 이동을 최소화하려는 할당 정책이다.

 

KafkaConsumer에 토픽, 파티션 할당은 assign 메서드를 통해 이루어진다. 컨슈머의 그룹 관리 기능을 사용하지 않고 사용자가 assign 메서드를 직접 호출하여 수동으로 토픽, 파티션을 할당할 수 있는데 이 경우에는 컨슈머 리밸런스가 일어나지 않는다.

assign 메서드를 통해 할당된 파티션은 초기 오프셋 값 설정이 필요하다. 초기 오프셋 값이 없으면 Fetch가 불가능한 파티션으로 분류된다. seek 메서드를 통해 초기 오프셋 값을 설정한다. 초기 오프셋 설정은 오프셋 초기화 과정을 통해 이루어진다. 사용자가 KafkaConsumer의 seek 메서드를 사용하여 설정할 수도 있다.

 

 

@KafkaListener(topics = "#{'${topic_names}'.split(',')}", groupId = "group_id")
public void consume(@Payload EventMessage<String> message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName
) throws Exception {
	System.out.println(topicName);
	System.out.println(message);
}

 

 

 

기본 설정 옵션

- bootstrap-servers

- group-id

- auto-offset-reset

- value-deserializer

 

Spring Kafka Offsets

 

Offset Option

 

  • enable.auto.commit (default=true) : 특정 주기 마다 자동으로 commit 하는 설정
  • auto.commit.interval.ms (default=5000ms)
    • kafka 로 부터 메세지를 읽어 올때 이 주기와 맞으면 offset 정보를 commit 한다.
  • auto.offset.reset
    • Consumer 의 offset commit 정보가 존재 않거나 해당 offset 이 유효하지 않을 경우
      • latest : 가장 새로운(마지막) offset부터 (Default)
      • earliest : 가장 오래된(처음) offset부터
      • none : 해당 consumer group이 가져가고자 하는 topic의 consumer offset정보가 없으면 exception을 발생시킴(seek 명령어로 명시적으로 offset 재지정)

발생할 수 있는 장애 상황

1. auto commit 일 경우 장애 발생 타이밍에 따라 commit 된 메세지 처리가 완료 되지 않거나(메시지 처리 누락)

2. 메세지 처리가 완료 되었지만 offset commit 이 이루어 지지 않은 경우가 발생 할 수 있음(동일 메세지 중복 처리)

 

 

참고) Offsets and Consumer Position

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:

The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every

time the consumer receives messages in a call to poll(Duration).

The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).

 


출처

- https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html

- https://kafka.apache.org/documentation/

- https://stackoverflow.com/questions/42564920/kafka-producer-config-retry-strategy

- [Kafka Consumer 신뢰성] : https://medium.com/@ajmalbabu/kafka-0-9-0-clients-db1f43257d30

- [Kafka Offset] : https://skywingzz.github.io/kafka/kafka-offset/

- [Kafka Consumer] https://d2.naver.com/helloworld/0974525

- [Kafka Producer] https://d2.naver.com/helloworld/6560422

'빅데이터 > Kafka Cluster' 카테고리의 다른 글

Kafka restart과 retention  (0) 2022.02.28
Kafka 스트레스 테스트  (0) 2022.02.28
Kafka 자주 사용하는 명령  (0) 2022.02.28
Kafka Overview  (0) 2022.02.28
Kafka cluster failover  (0) 2022.02.28

+ Recent posts