wake-up-neo.net

Problem beim Rebalancing beim Lesen von Nachrichten in Kafka

Ich versuche, Nachrichten zum Thema Kafka zu lesen, kann es aber nicht lesen. Der Prozess wird nach einiger Zeit abgebrochen, ohne dass Nachrichten gelesen werden.

Hier ist der Rebalancing-Fehler, den ich bekomme:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

Ich habe versucht, ConsumerOffsetChecker auszuführen, und dies ist der Fehler, den ich bekomme. Ich habe überhaupt keine Ahnung, wie ich das lösen kann. Jemand, eine Idee?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.Apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.Java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.Java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.Java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.Java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.Apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.Apache.zookeeper.KeeperException.create(KeeperException.Java:102)
        at org.Apache.zookeeper.KeeperException.create(KeeperException.Java:42)
        at org.Apache.zookeeper.ZooKeeper.getData(ZooKeeper.Java:927)
        at org.Apache.zookeeper.ZooKeeper.getData(ZooKeeper.Java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.Java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.Java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.Java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.Java:675)
        ... 16 more
16
divinedragon

Ich habe in letzter Zeit ähnliche Probleme. Sie können versuchen, die Verbraucherkonfigurationen rebalance.backoff.ms und zookeeper.session.timeout.ms auf etwa 5-10 Sekunden zu erhöhen.

Der erste Parameter sagt dem kafka, dass er mehr warten muss, bevor er rebalance erneut versucht. Der zweite Parameter sagt dem kafka, dass er geduldiger sein soll, während er versucht, sich mit dem zookeeper zu verbinden.

Weitere Konfigurationsmöglichkeiten finden Sie in der offiziellen Dokumentation .

13
Minh-Triet LÊ

Dies bedeutet wahrscheinlich, dass die Broker diese Knoten nicht korrekt erstellt haben, als sie sich mit Zookeeper verbunden haben. Der Pfad/consumer sollte vorhanden sein, wenn Sie versuchen, zu konsumieren. 

Hier einige Wege zum Debuggen:

Hast du irgendwelche Themen erstellt? 

Wenn ja:

  1. Wie viele Partitionen enthält das Thema?
  2. Haben Sie überprüft, ob die Metadaten des Themas im Zoopeeper korrekt ausgefüllt wurden?
  3. Können wir Ihre Verbraucherkonfiguration sehen?

Wenn nicht:

  1. Dann müssen Sie ein Thema mit dem Skript $KAFKA_DIR/bin/kafka-create-topic.sh erstellen. In dem Skript finden Sie Details zur Verwendung.
  2. Nachdem Sie ein Thema erstellt haben, müssen Sie einen Consumer mit einer zuvor noch nicht verwendeten Gruppen-ID erstellen.
2
laughing_man

In kafka.tools.ConsumerOffsetChecker ist ein Fehler aufgetreten. Wenn ein bestimmter Zookeeper-Knoten, der Informationen über den verbrauchten Versatz enthält, nicht beendet wird, wird das Programm mit der Ausführung beendet.

Angenommen, Sie haben eine Verbrauchergruppe "mygroup" und ein Thema "topictest". Dann wird der Versatz für Partition 2 in Znode beibehalten: /Consumer/Mygroup/Offsets/Topictest/2. 

Wenn für Partition 2 des Themas topictest kein Eintrag in Znode vorhanden ist, wird das Consumer-Offsetchecker-Tool beendet, während der Offset für Partition 2 geprüft wird .. _ Im Allgemeinen schlägt es fehl, wenn die erste Partition "n" überprüft wird, für die der Znode mygroup/offsets/topictest/n fehlt bei Zookeeper.

1
Nipun Talukdar

Ein weiteres Problem könnte aufgrund von Jar-Konflikten bestehen. Wenn Sie dasselbe Glas mit unterschiedlichen Versionen im Bibliotheksordner gespeichert haben. Dieses Problem kann auftreten. _. Gläser wie Scala-Library, zkclient, zookeeper, kafka-client sollten nicht mit verschiedenen Versionen dupliziert werden.

0
charan teja

ihre Broker sind wahrscheinlich offline und können keine Verbindung zu Zookeeper herstellen. Haben Sie versucht, das Konsolen-Konsumentenskript auszuführen, das im Pfad $KAFKA_ROOT_DIR/bin verfügbar ist, um zu überprüfen, ob Sie von einem bestimmten Thema aus konsumieren können.

0
user2720864