KafkaJS disconnects itself and does not attempt to reconnect
Hello, first of all, maybe I just did not understand something properly and have the wrong expectations, but I was expecting that KafkaJS would auto-reconnect and auto-discover new brokers. Instead it just stops consuming, without emitting a crash event or anything.
Here are the logs from our server (I removed the repeating cycle of heartbeats and fetches):
2020-03-11T09:18:05.612Z,Kafka server has closed connection
2020-03-11T09:18:05.612Z,disconnected
2020-03-11T09:18:05.612Z,disconnecting...
2020-03-11T09:17:00.552Z,Kafka server has closed connection
2020-03-11T09:17:00.552Z,disconnected
2020-03-11T09:17:00.552Z,disconnecting...
2020-03-11T09:08:05.612Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:08:05.612Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:08:05.611Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:05.611Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:05.610Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:08:00.608Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:08:00.606Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:08:00.605Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:00.604Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:00.604Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:55.602Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:55.602Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:55.602Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:55.601Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:55.601Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:50.599Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:50.598Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:50.598Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:50.595Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:50.595Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:45.593Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:45.593Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:45.592Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:45.591Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:45.590Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:40.588Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:40.587Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:40.587Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:40.586Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:40.584Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:35.582Z,"Request Fetch(key: 1, version: 7)"
...
...
2020-03-11T09:02:09.441Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:02:09.440Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:02:09.440Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:09.439Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:09.438Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:02:04.436Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:02:04.436Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:02:04.435Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:04.434Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:04.430Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:01:59.427Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:01:59.419Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:01:59.419Z,"Response ListOffsets(key: 2, version: 2)"
2020-03-11T09:01:59.417Z,"Request ListOffsets(key: 2, version: 2)"
2020-03-11T09:01:59.415Z,Verified support for SaslAuthenticate
2020-03-11T09:01:59.398Z,Connecting
2020-03-11T09:01:59.397Z,"Response OffsetFetch(key: 9, version: 3)"
2020-03-11T09:01:59.396Z,"Request OffsetFetch(key: 9, version: 3)"
2020-03-11T09:01:59.393Z,Consumer has joined the group
2020-03-11T09:01:59.392Z,Received assignment
2020-03-11T09:01:59.392Z,"Response SyncGroup(key: 14, version: 1)"
2020-03-11T09:01:59.391Z,"Request SyncGroup(key: 14, version: 1)"
2020-03-11T09:01:59.389Z,Group assignment
2020-03-11T09:01:59.388Z,"Response Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.387Z,"Request Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.386Z,Chosen as group leader
2020-03-11T09:01:59.385Z,"Response JoinGroup(key: 11, version: 2)"
2020-03-11T09:01:59.383Z,"Request JoinGroup(key: 11, version: 2)"
2020-03-11T09:01:59.381Z,Found group coordinator
2020-03-11T09:01:59.381Z,"Response GroupCoordinator(key: 10, version: 1)"
2020-03-11T09:01:59.379Z,"Request GroupCoordinator(key: 10, version: 1)"
2020-03-11T09:01:59.378Z,Verified support for SaslAuthenticate
2020-03-11T09:01:59.358Z,Connecting
2020-03-11T09:01:59.357Z,Starting
2020-03-11T09:01:59.356Z,"Response Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.354Z,"Request Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.351Z,Verified support for SaslAuthenticate
2020-03-11T09:01:59.351Z,"Response ApiVersions(key: 18, version: 2)"
2020-03-11T09:01:59.349Z,"Request ApiVersions(key: 18, version: 2)"
2020-03-11T09:01:59.292Z,Connecting
So we have 1 consumer, consuming messages from 1 topic only, and we have 3 brokers. The consumer stops consuming when we kill one of the brokers; that scenario is what the logs above capture.
We have brokers:
- kafka-1
- kafka-2
- kafka-3
Some notes:
- all heartbeats go to the kafka-2 broker
- all fetches go to the kafka-1 broker
- kafka-1 is the broker that is KILLED
- the disconnects in the logs are for kafka-2 and kafka-3
- the last Request Fetch never receives a response
- after the last fetch there are 10 minutes of silence, and then the disconnects appear
- we are using Confluent Kafka
- the group coordinator is kafka-2
I would appreciate it if you could help me resolve this. Thank you.
Edit: This is my configuration in code:
const { Kafka } = require('kafkajs')    // at module scope
const { v4: uuidv4 } = require('uuid')  // at module scope

this.kafka = new Kafka({
  clientId: 'xyz',
  brokers: options.kafkaBrokers,
  ssl: true,
})

// every instance joins a brand-new consumer group (random group id)
this.consumer = this.kafka.consumer({ groupId: uuidv4() })

// forward consumer crashes to our own handler
this.consumer.on('consumer.crash', event => {
  const error = event?.payload?.error
  this.crashHandler(error)
})

await this.consumer.connect()
await this.consumer.subscribe({ topic: this.options.topic, fromBeginning: false })
await this.consumer.run({
  autoCommit: false,
  partitionsConsumedConcurrently: 5,
  eachMessage: async ({ topic, partition, message }) => {
    // process message
  },
})
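For reference, here is a minimal sketch (not part of the configuration above) of how the same client could be created with tighter connection/request timeouts and an explicit retry policy, so that a dead broker connection surfaces sooner. The numeric values are illustrative assumptions, not recommendations; connectionTimeout, requestTimeout, enforceRequestTimeout and retry are documented kafkajs client options.

this.kafka = new Kafka({
  clientId: 'xyz',
  brokers: options.kafkaBrokers,
  ssl: true,
  connectionTimeout: 3000,     // ms to wait while establishing a broker connection
  requestTimeout: 30000,       // ms to wait for a response to an in-flight request
  enforceRequestTimeout: true, // actually fail requests that exceed requestTimeout
  retry: {
    initialRetryTime: 300,     // ms before the first retry
    retries: 8,                // how many times a failed request is retried
  },
})

enforceRequestTimeout in particular makes the client fail in-flight requests instead of waiting for the socket to be declared dead.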
@WaleedAshraf describes the expected experience. The way Kafka clients work is that they initially connect to one broker and then discover the rest of the cluster. If the client gets disconnected from all brokers, it will try to reconnect to the brokers you configured it with, one by one, until it finds one it can connect to, after which it re-discovers the rest of the cluster.
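If it helps to verify what the client is actually doing during such a failure, here is a small sketch using KafkaJS consumer instrumentation events (consumer being the instance created in the configuration above); the event names are part of the public API, while the log messages are placeholders:

const { CONNECT, DISCONNECT, REQUEST_TIMEOUT, CRASH } = consumer.events

// log the broker connection lifecycle so reconnect / rediscovery attempts are visible
consumer.on(CONNECT, () => console.log('connected to a broker'))
consumer.on(DISCONNECT, () => console.log('disconnected from a broker'))
consumer.on(REQUEST_TIMEOUT, e => console.log('request timed out', e.payload))
consumer.on(CRASH, e => console.log('consumer crashed', e.payload.error))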
The only thing I can think of is that the topic you are consuming from has only a single partition and isn’t replicated to any of the other brokers, so when that broker goes away there is no leader for that topic to connect to. But even then I would at the very least expect an error rather than a silent failure.
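One way to check that hypothesis is to inspect the topic’s partition metadata with the KafkaJS admin client. A sketch, assuming it runs in an async context and that 'my-topic' is replaced with the real topic name:

const admin = this.kafka.admin()
await admin.connect()
// fetchTopicMetadata returns, per partition, the current leader broker id,
// the replica set and the in-sync replicas (isr)
const { topics } = await admin.fetchTopicMetadata({ topics: ['my-topic'] })
for (const { partitionId, leader, replicas, isr } of topics[0].partitions) {
  console.log({ partitionId, leader, replicas, isr })
}
await admin.disconnect()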
Whether you do a “violent” or a graceful shutdown doesn’t matter that much. The only difference is that it could take longer for the cluster to realize that the broker is gone, but it should end up in the same state eventually.
If you could create a repo with a way to reproduce the issue, we could take a look at it, but without that it’s hard to help you. The logs you posted are also not raw KafkaJS logs, so a lot of information is missing. I don’t really see anything wrong in them; it just looks like the client is connecting, heartbeating and fetching, as expected.
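For raw KafkaJS logs, a sketch of how the client from the configuration above could be created with a raised log level; logLevel.DEBUG is part of the public API, and the other options are taken from the posted configuration:

const { Kafka, logLevel } = require('kafkajs')

this.kafka = new Kafka({
  clientId: 'xyz',
  brokers: options.kafkaBrokers,
  ssl: true,
  logLevel: logLevel.DEBUG, // emit the full request/response/connection log lines
})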
I am trying to fix this now; I can reproduce the case locally. KafkaJS is doing the right thing, but it takes the OS 10 minutes to realize that the connection is dead and to propagate the correct signals to Node.js and then to our code.
The latest pre-release version (1.14.0-beta.7) does a good job of waiting for the connection to finish, but that is just a long time. I am working on a heartbeat mechanism for the connection so we can detect the issue much earlier.
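As a stop-gap until that lands, one option (not built into the library) is to restart the consumer from the crash handler once the crash event finally arrives. startConsumer() below is a hypothetical helper that re-creates the consumer and calls connect/subscribe/run again; the back-off value is illustrative:

consumer.on(consumer.events.CRASH, async event => {
  console.error('consumer crashed, restarting', event.payload.error)
  try {
    await consumer.disconnect() // release the old sockets and group membership
  } finally {
    setTimeout(() => startConsumer(), 5000) // wait a bit before re-creating the consumer
  }
})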