producer.connect() - KafkaJSError: The producer is disconnected
Describe the bug: There seems to be a problem in version 1.14.0. I connect the producer and then produce a message, but sometimes I get this error:
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
at processTicksAndRejections (internal/process/next_tick.js:81:5)
Also producer.event.CONNECTED is not emitted.
I tried to produce 2 messages at once and logged the KafkaJS connectionStatus to the console. The result is:
2020-10-05T16:29:20.854Z level=info label=eventhub-client message="Registered handler for all events."
CONNECTING................. (Output from producer.connect method )
CONNECTING.................
CONNECTED.................
2020-10-05T16:29:20.879Z level=info label=eventhub-client message="Producing 1 message(s)=[{'key':'partition-key1','value':'{\'id\':\'739ec863-be56-43b1-99bd-f4ef2184081c\',\'time\':\'2020-10-05T16:29:20.855Z\',\'specversion\':\'0.2\',\'type\':\'event1\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar1\'}}','headers':{}}] test"
STATUS....................... disconnected (Output from validateConnectionStatus in sendBatch method )
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
at processTicksAndRejections (internal/process/next_tick.js:81:5)
(node:78713) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)
(node:78713) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
CONNECTED.................
2020-10-05T16:29:20.882Z level=info label=eventhub-client message="Producing 2 message(s)=[{'key':'partition-key2','value':'{\'id\':\'996b7604-e715-4a31-97ce-d058abac721d\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event2\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar2\'}}','headers':{}},{'key':'partition-key3','value':'{\'id\':\'f1fcc9e2-1905-417d-a54e-b624e5b299b6\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event3\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar3\'}}','headers':{}}] test2"
STATUS....................... connected
Code sample:
    if (!this.producerConnected) {
      this.producer = this.kafka.producer();
      try {
        await this.producer.connect();
        this.producerConnected = true;
      } catch (error) {
        throw new Error(`Couldn't connect producer.`);
      }
    }
    await this.producer.send({ topic, messages });
Expected behavior: connect() resolves, and then messages are produced.
Observed behavior: Even though connect() has resolved, producing the first message fails and connectionStatus is not updated. Subsequent messages are produced.
Environment:
- OS: Mac OS 18.7.0
- KafkaJS version 1.14.0
- Kafka version 5.1.2
- NodeJS version 11.10.0
You should just connect at startup and disconnect at shutdown.
There seems to be a bug introduced with the latest version related to reconnects, but regardless, nothing is changing about the intended usage.
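A minimal sketch of that intended usage, assuming the application has a single entry point. `runWithProducer` is a hypothetical helper, not part of KafkaJS; it only assumes the producer object exposes connect(), send() and disconnect(), as a KafkaJS producer does.

```javascript
// Hypothetical helper (not a KafkaJS API): connect once at startup,
// run the application's work, and always disconnect at shutdown.
async function runWithProducer(producer, work) {
  await producer.connect(); // startup: wait until the connection is ready
  try {
    // The same connected producer is reused for every send inside `work`.
    return await work(producer);
  } finally {
    await producer.disconnect(); // shutdown: runs on success and on error
  }
}
```

With KafkaJS this could be called as runWithProducer(kafka.producer(), async (producer) => { await producer.send({ topic, messages }) }), where kafka, topic and messages come from the application.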
This probably happens because a message is sent before the connection has been established. It shows up when the calls run in parallel, but not when they run serially. Make sure a connection has been successfully initialized before sending any message. You can also enable debug logging to inspect the connection state when messages are sent in parallel, which is what triggers this issue. Good luck.
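If connecting lazily on first use is a requirement, one way to make parallel calls safe is to share a single in-flight connect() promise. The sketch below is an assumption about how the reporter's wrapper could be restructured, not a KafkaJS API; `LazyProducer`, `createProducer` and `ensureConnected` are hypothetical names. In the original code sample, two parallel calls can each see producerConnected as false and each replace this.producer, so the first caller may end up sending on the second, still-connecting instance, which would match the log in the report.

```javascript
// Hypothetical wrapper (names are illustrative, not part of KafkaJS).
class LazyProducer {
  constructor(createProducer) {
    this.createProducer = createProducer; // e.g. () => kafka.producer()
    this.connecting = null; // shared promise for the in-flight connect()
  }

  // Returns a promise for a connected producer. Parallel callers all
  // receive the same promise, so only one producer is ever created.
  ensureConnected() {
    if (!this.connecting) {
      const producer = this.createProducer();
      this.connecting = producer.connect().then(
        () => producer,
        (error) => {
          this.connecting = null; // allow a later call to retry
          throw error;
        }
      );
    }
    return this.connecting;
  }

  async produce(topic, messages) {
    const producer = await this.ensureConnected();
    return producer.send({ topic, messages });
  }
}
```

The design choice here is to store the promise rather than a boolean flag: a flag is only set after connect() resolves, while the promise exists from the moment the first call starts connecting.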