Help: KafkaConsumer hangs forever when retrieving messages
I wrote some Python code to retrieve Kafka messages from brokers. Below is my code.
from kafka import KafkaConsumer, TopicPartition

def read_message(topic, partition, broker, from_offset, until_offset):
    clientName = "Client_" + str(topic) + "_" + str(partition)
    consumer = KafkaConsumer(bootstrap_servers=broker, client_id=clientName)
    topicPartition = TopicPartition(topic, partition)
    # Manually assign the single partition and start reading at from_offset
    consumer.assign([topicPartition])
    consumer.seek(topicPartition, from_offset)
    result_list = []
    # Iterating the consumer blocks until the next message arrives
    for msg in consumer:
        if msg.offset >= until_offset:
            break
        else:
            result_list.append(msg)
    return result_list
“broker” here is the leader broker, which I look up in another method. “partition” is the partition that the messages should be in. “from_offset” and “until_offset” define the offset range that I want to retrieve messages from.
However, after “result_list” is created, the function sometimes hangs forever. It used to work properly, and it still works in most cases, but occasionally it hangs and I don’t know what the problem is.
Issue Analytics
- State:
- Created 6 years ago
- Reactions:1
- Comments:7 (2 by maintainers)
Looks like you have to set a timeout:
consumer_timeout_ms (int) – number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default block forever [float(‘inf’)].
So let's assume you have nothing on your topic currently. When you start your KafkaConsumer, it will sit there and wait for messages to come in (each step of the iteration blocks until one arrives). Once messages come in, your consumer will process those messages and then continue to wait.
Setting consumer_timeout_ms to some value (like 10 seconds or so) will cause the KafkaConsumer to stop iterating once that amount of time passes without a new message being received. Not sure if that was the intent or not, but that works for me.
consumer = KafkaConsumer(
    args.topic,
    bootstrap_servers=args.bootstrap_servers.split(),
    auto_offset_reset=args.auto_offset_reset,
    enable_auto_commit=args.enable_auto_commit,
    group_id=args.group_id,
    value_deserializer=lambda x: x.decode('utf-8'),
    consumer_timeout_ms=10000)
(args is just grabbing info from the Python command line)
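Applied to the read_message function from the question, the same idea might look like the sketch below. This is only an illustration, not a confirmed fix for the reported hang. It assumes the hang happens when no message with an offset at or beyond until_offset ever arrives, so the loop never reaches its break. The helper name collect_range and the timeout_ms parameter are made up for this example, and it assumes a kafka-python version that provides KafkaConsumer.end_offsets.

```python
def collect_range(messages, until_offset):
    """Collect messages whose offset is below until_offset.

    `messages` is any iterable of objects with an `.offset` attribute,
    such as a KafkaConsumer. Hypothetical helper, named for this sketch.
    """
    result = []
    for msg in messages:
        if msg.offset >= until_offset:
            break
        result.append(msg)
        # Stop right after the last wanted message instead of blocking
        # on a further message that may never be produced.
        if msg.offset >= until_offset - 1:
            break
    return result


def read_message(topic, partition, broker, from_offset, until_offset,
                 timeout_ms=10000):
    # Imported here so collect_range can be used without kafka-python.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=broker,
        client_id="Client_" + str(topic) + "_" + str(partition),
        # End the iteration after timeout_ms without a new message.
        consumer_timeout_ms=timeout_ms)
    try:
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])
        # Assumption: cap the range at the current end of the log, so we
        # never wait for offsets that do not exist yet.
        end = consumer.end_offsets([tp])[tp]
        until_offset = min(until_offset, end)
        if from_offset >= until_offset:
            return []
        consumer.seek(tp, from_offset)
        return collect_range(consumer, until_offset)
    finally:
        consumer.close()
```

With this shape the timeout is only a fallback: in the usual case the loop exits as soon as the message at until_offset - 1 has been appended. If offsets in the range are missing (for example on a compacted topic), the loop ends either at the first message at or beyond until_offset or when the timeout fires.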
try adding this: