Co zrobić, jeśli konsument Kafki przetwarza komunikat zbyt długo? Czy Kafka ponownie transmitować ten rozdział innego konsumenta, a komunikat będzie obsługiwane dwa razy?
Tak, to prawda. Jeśli konsumenta Kafka zajmie zbyt wiele czasu na przetwarzanie wiadomości, a kolejne badanie() będzie opóźniony, Kafka ponownie wyznaczy ten rozdział innego konsumenta, a wiadomość zostanie przetworzony ponownie (i znowu).
Dla większej przejrzystości, najpierw musimy zdecydować i określić " Jak długo to trwa zbyt długo?".
To jest definiowane w max.poll.interval.ms
. Z dokumentów,
Maksymalne opóźnienie między połączeniami ankiety() przy użyciu sterowania grupami konsumentów. To nakłada górną granicę na ilość czasu, w ciągu którego konsument może bezczynności, zanim coraz więcej wpisów. Jeśli funkcja ankiety() nie zostanie wywołana przed upływem tego limitu czasu, to użytkownik jest сбойным, i grupa wykona korzystanie z nowych broni, aby przypisać sekcje innemu uczestnikowi.
Grupa konsumentów балансируется ponownie, jeśli w ciągu tego czasu nie ma połączeń do ankiety ().
Ma jeszcze jedną właściwość auto.commit.interval.ms
. Kwestia przesunięcia automatycznego zatwierdzania zostanie wywołana tylko w czasie odpytywania - sprawdza, czy więcej czasu przeszłego, niż określony czas interwał automatycznego zatwierdzania, i jeśli wynik "tak", przesunięcie odnotowano.
Jeśli konsument Kafka zbyt długo przetwarza rekordy, to kolejne wyzwanie poll() również jest opóźniony, i przesunięcia zwrócone w ostatnim badaniu () nie są rejestrowane. Jeśli w tym czasie nastąpi перебалансировка, nowy klient-konsument, przypisany do tej sekcji, ponownie rozpocznie przetwarzanie wiadomości.
Zmiany bilansu grupy konsumentów i późniejszego przenoszenia partycji można uniknąć, zwiększając tę wartość. To zwiększy dopuszczalny odstęp między ankietami i da więcej czasu konsumentom do przetwarzania rekordów zwracanych z ankiety (). Konsumenci dołączą do перебалансировке tylko wewnątrz połączenia do ankiety, w związku z tym wzrost maksymalny interwał sondowania również opóźni grupowe korzystanie z nowych broni.
Jest jeszcze jeden problem w zwiększeniu maksymalnego odstępu ankiety do wielkiego znaczenia. Jeśli konsument umiera z jakiegokolwiek innego powodu, to zajmuje więcej czasu, niż jest ustawione max.poll.interval.ms
interwał dla wykrywania awarii.
session.timeout.ms
i heartbeat.interval.ms
dostępne są w tym przypadku jest, aby jak najszybciej wykryć pełna zawiesza.
Aby uzyskać więcej informacji na temat tych ustawień:
Proszę zwrócić uwagę, że wartości, które są skonfigurowane dla session.timeout.ms
musi mieścić się w dopuszczalnym zakresie, określonym w konfiguracji brokera właściwości
- group.min.session.timeout.ms
- group.max.session.timeout.ms
W przeciwnym razie po uruchomieniu klienta-konsumenta pojawi się następujący wyjątek.
Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
Aktualizacja: Aby uniknąć ponownego przetwarzania wiadomości
W klasie KafkaConsumer jest jeszcze jedna metoda commitAsync()
aby uruchomić operację przesunięcia zatwierdzenia.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();
Aby uzyskać więcej informacji na temat commitSync() i commitAsync(), proszę sprawdzić ten wątek
Blokada przesunięcia ręcznie - to działanie, co oznacza, że przesunięcie było traktowane, aby Kafka więcej nie wysłałem zatwierdzone wpisów dla jednego i tego samego klucza. Gdy przesunięcia są rejestrowane ręcznie, ważne jest, aby pamiętać, że jeżeli konsument, z jakiegokolwiek powodu umiera do przetwarzania rekordów, istnieje prawdopodobieństwo, że te wpisy nie będą rozpatrywane ponownie.