Help: KafkaConsumer hangs forever when retrieving messages


I wrote some Python code to retrieve Kafka messages from the brokers. Below is my code.

from kafka import KafkaConsumer, TopicPartition

def read_message(topic, partition, broker, from_offset, until_offset):
    clientName = "Client_" + str(topic) + "_" + str(partition)
    consumer = KafkaConsumer(bootstrap_servers=broker, client_id=clientName)
    topicPartition = TopicPartition(topic, partition)
    # Read one partition manually (no consumer group), starting at from_offset.
    consumer.assign([topicPartition])
    consumer.seek(topicPartition, from_offset)
    result_list = []
    # By default, iterating a KafkaConsumer blocks until the next message arrives.
    for msg in consumer:
        if msg.offset >= until_offset:
            break
        result_list.append(msg)
    consumer.close()
    return result_list

“broker” here is the leader broker for the partition, which I look up in another method. “partition” is the partition the messages are in. “from_offset” and “until_offset” are the offset range I want to retrieve messages from (from_offset inclusive, until_offset exclusive).

However, after “result_list” is created, the function sometimes hangs forever. It used to work properly and still works in most cases, but occasionally it hangs, and I don't know what the problem is.
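
One plausible explanation for the intermittent hang (an assumption; the thread never confirms it): the loop only ends when it receives a message with offset >= until_offset. A partition's end offset is the offset the next produced message will get, so if until_offset is at or past it (for example, when until_offset was taken from the partition's latest offset), the loop has already read everything and then waits for a message that has not been written yet. The sketch below clamps the range to the end offset reported by KafkaConsumer.end_offsets() and returns as soon as the last wanted offset has been seen, keeping consumer_timeout_ms (the setting suggested in the comments) as a backstop for offset gaps left by compaction or transaction markers. take_offset_range, timeout_ms and the Record stand-in are hypothetical names introduced here, not part of the original code or of kafka-python.

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord so the helper can be
# tried without a broker; only the .offset attribute matters here.
Record = namedtuple("Record", ["offset", "value"])


def take_offset_range(messages, from_offset, until_offset, end_offset):
    """Collect records with from_offset <= offset < min(until_offset, end_offset).

    end_offset is the partition's log end offset, i.e. the offset the next
    produced message will receive. Nothing at or past it exists yet, so the
    loop stops once it has seen the last offset it can expect instead of
    waiting for a record that may never be produced.
    """
    stop = min(until_offset, end_offset)
    result = []
    if from_offset >= stop:
        return result  # empty range: don't start iterating at all
    for msg in messages:
        if msg.offset >= stop:
            break
        if msg.offset >= from_offset:
            result.append(msg)
        if msg.offset == stop - 1:
            break  # last wanted offset seen; don't block on the next one
    return result


def read_message(topic, partition, broker, from_offset, until_offset,
                 timeout_ms=10000):
    # Imported here so take_offset_range() can be used without kafka-python.
    from kafka import KafkaConsumer, TopicPartition

    tp = TopicPartition(topic, partition)
    consumer = KafkaConsumer(
        bootstrap_servers=broker,
        client_id="Client_%s_%s" % (topic, partition),
        # Backstop for offset gaps: end iteration after timeout_ms with no
        # new record instead of blocking forever.
        consumer_timeout_ms=timeout_ms,
    )
    try:
        consumer.assign([tp])
        end_offset = consumer.end_offsets([tp])[tp]  # offset of the next message
        consumer.seek(tp, from_offset)
        return take_offset_range(consumer, from_offset, until_offset, end_offset)
    finally:
        consumer.close()


# Usage without a broker: offsets 5..9 exist, so the log end offset is 10.
records = [Record(offset, b"payload") for offset in range(5, 10)]
print([r.offset for r in take_offset_range(iter(records), 5, 100, 10)])  # [5, 6, 7, 8, 9]
```

Only take_offset_range runs without a broker; read_message keeps the original signature and closes the consumer even if iteration raises.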

Issue Analytics

  • State: closed
  • Created 6 years ago
  • Reactions: 1
  • Comments: 7 (2 by maintainers)

Top GitHub Comments

9 reactions
M0bil3Rulz commented, Sep 28, 2018

Looks like you have to set a timeout:

consumer_timeout_ms (int) – number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default block forever [float('inf')].

So let's assume there is currently nothing on your topic when you start your KafkaConsumer: it will sit there and wait for messages to come in (via the iterator's next() call). Once messages come in, your consumer will process them and then continue to wait.

Setting consumer_timeout_ms to some value (say, 10 seconds) will cause the KafkaConsumer to stop iterating once that much time passes without a new message arriving. Not sure if that was the intent or not, but that works for me.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    args.topic,
    bootstrap_servers=args.bootstrap_servers.split(),
    auto_offset_reset=args.auto_offset_reset,
    enable_auto_commit=args.enable_auto_commit,
    group_id=args.group_id,
    value_deserializer=lambda x: x.decode('utf-8'),
    consumer_timeout_ms=10000,  # stop iterating after 10 s with no new message
)

(args just holds the options parsed from the Python command line.)
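
To see why the timeout ends the loop cleanly instead of raising an error, here is a simplified standard-library model of the behaviour described above: a generator that waits on a queue.Queue and simply returns once no item arrives within the idle timeout. iter_until_idle is a hypothetical name and this is not kafka-python's implementation; it only illustrates that the wait restarts after every message and that a for loop over the iterator finishes normally when the timeout expires.

```python
import queue


def iter_until_idle(q, idle_timeout_ms):
    """Yield items from q until none arrives within idle_timeout_ms.

    Simplified model of consumer_timeout_ms: the wait restarts after each
    item, and when it expires the generator returns, which ends a for loop
    the same way StopIteration ends iteration over a KafkaConsumer.
    """
    timeout_s = idle_timeout_ms / 1000.0
    while True:
        try:
            item = q.get(timeout=timeout_s)
        except queue.Empty:
            return  # idle for too long: finish instead of blocking forever
        yield item


# Usage: three queued "messages", then silence.
q = queue.Queue()
for n in range(3):
    q.put(n)
received = [item for item in iter_until_idle(q, 50)]
print(received)  # [0, 1, 2], printed roughly 50 ms after the last item
```

With the default of an infinite timeout, the same loop would never return once the queue is empty, which matches the hang in the question.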

3 reactions
dpkp commented, Mar 9, 2018

Try adding this to turn on debug logging:

import logging
logging.basicConfig(level=logging.DEBUG)
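
A variation on that suggestion, in case DEBUG output from every library is too noisy: kafka-python names its loggers after its modules (kafka.conn, kafka.consumer.fetcher, and so on), so raising only the "kafka" logger to DEBUG shows the consumer's internal activity while everything else stays at INFO. The format string is just one reasonable choice.

```python
import logging

# Root logger (and every other library) stays at INFO. Note that basicConfig
# does nothing if the root logger already has handlers configured.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
# Every logger under the "kafka" namespace emits DEBUG records; children such
# as "kafka.consumer.fetcher" inherit this level, and their records still
# reach the root handler because handler levels, not ancestor logger levels,
# decide what gets printed.
logging.getLogger("kafka").setLevel(logging.DEBUG)
```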
