Co zrobić, jeśli konsument Kafki przetwarza komunikat zbyt długo? Czy Kafka ponownie transmitować ten rozdział innego konsumenta, a komunikat będzie obsługiwane dwa razy?

0

Pytanie

Zakładać Kafka, 1 partition, 2 consumers.(2. konsument jest bezczynny)

Załóżmy, że 1-szy z nich otrzymał wiadomość, obsługuje się go za pomocą 3 innych usług i nagle podłącza się do jednej z nich i przepuszcza limit czasu Kafki.

Czy Kafka ponownie wyznaczyć sekcja 2-mu konsumenta, a komunikat będzie obsługiwane dwa razy (załóżmy, że 1-szy w końcu zakończy się powodzeniem)?

1

Najlepsza odpowiedź

1

Co zrobić, jeśli konsument Kafki przetwarza komunikat zbyt długo? Czy Kafka ponownie transmitować ten rozdział innego konsumenta, a komunikat będzie obsługiwane dwa razy?

Tak, to prawda. Jeśli konsumenta Kafka zajmie zbyt wiele czasu na przetwarzanie wiadomości, a kolejne badanie() będzie opóźniony, Kafka ponownie wyznaczy ten rozdział innego konsumenta, a wiadomość zostanie przetworzony ponownie (i znowu).

Dla większej przejrzystości, najpierw musimy zdecydować i określić " Jak długo to trwa zbyt długo?".

To jest definiowane w max.poll.interval.ms. Z dokumentów,

Maksymalne opóźnienie między połączeniami ankiety() przy użyciu sterowania grupami konsumentów. To nakłada górną granicę na ilość czasu, w ciągu którego konsument może bezczynności, zanim coraz więcej wpisów. Jeśli funkcja ankiety() nie zostanie wywołana przed upływem tego limitu czasu, to użytkownik jest сбойным, i grupa wykona korzystanie z nowych broni, aby przypisać sekcje innemu uczestnikowi.

Grupa konsumentów балансируется ponownie, jeśli w ciągu tego czasu nie ma połączeń do ankiety ().

Ma jeszcze jedną właściwość auto.commit.interval.ms. Kwestia przesunięcia automatycznego zatwierdzania zostanie wywołana tylko w czasie odpytywania - sprawdza, czy więcej czasu przeszłego, niż określony czas interwał automatycznego zatwierdzania, i jeśli wynik "tak", przesunięcie odnotowano.

Jeśli konsument Kafka zbyt długo przetwarza rekordy, to kolejne wyzwanie poll() również jest opóźniony, i przesunięcia zwrócone w ostatnim badaniu () nie są rejestrowane. Jeśli w tym czasie nastąpi перебалансировка, nowy klient-konsument, przypisany do tej sekcji, ponownie rozpocznie przetwarzanie wiadomości.

Zmiany bilansu grupy konsumentów i późniejszego przenoszenia partycji można uniknąć, zwiększając tę wartość. To zwiększy dopuszczalny odstęp między ankietami i da więcej czasu konsumentom do przetwarzania rekordów zwracanych z ankiety (). Konsumenci dołączą do перебалансировке tylko wewnątrz połączenia do ankiety, w związku z tym wzrost maksymalny interwał sondowania również opóźni grupowe korzystanie z nowych broni.

Jest jeszcze jeden problem w zwiększeniu maksymalnego odstępu ankiety do wielkiego znaczenia. Jeśli konsument umiera z jakiegokolwiek innego powodu, to zajmuje więcej czasu, niż jest ustawione max.poll.interval.ms interwał dla wykrywania awarii.

session.timeout.ms i heartbeat.interval.ms dostępne są w tym przypadku jest, aby jak najszybciej wykryć pełna zawiesza.

Aby uzyskać więcej informacji na temat tych ustawień:

Proszę zwrócić uwagę, że wartości, które są skonfigurowane dla session.timeout.ms musi mieścić się w dopuszczalnym zakresie, określonym w konfiguracji brokera właściwości

  • group.min.session.timeout.ms
  • group.max.session.timeout.ms

W przeciwnym razie po uruchomieniu klienta-konsumenta pojawi się następujący wyjątek.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Aktualizacja: Aby uniknąć ponownego przetwarzania wiadomości

W klasie KafkaConsumer jest jeszcze jedna metoda commitAsync() aby uruchomić operację przesunięcia zatwierdzenia.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

Aby uzyskać więcej informacji na temat commitSync() i commitAsync(), proszę sprawdzić ten wątek

Blokada przesunięcia ręcznie - to działanie, co oznacza, że przesunięcie było traktowane, aby Kafka więcej nie wysłałem zatwierdzone wpisów dla jednego i tego samego klucza. Gdy przesunięcia są rejestrowane ręcznie, ważne jest, aby pamiętać, że jeżeli konsument, z jakiegokolwiek powodu umiera do przetwarzania rekordów, istnieje prawdopodobieństwo, że te wpisy nie będą rozpatrywane ponownie.

2021-11-25 07:04:25

Dziękuję, wszystko jasne. Czy są jakieś sposoby, aby uniknąć ponownego nawrócenia?
J.J. Beam

@J. J. Beam zaktualizował odpowiedź z linkami i wzorem
arunkvelu

W innych językach

Ta strona jest w innych językach

Русский
..................................................................................................................
Italiano
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................