producer.connect() - KafkaJSError: The producer is disconnected

Original GitHub issue: https://github.com/tulios/kafkajs/issues/907

Describe the bug: There seems to be a problem in version 1.14.0. I connect and then produce a message, but sometimes I get this error:

(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
    at processTicksAndRejections (internal/process/next_tick.js:81:5)

Also producer.event.CONNECTED is not emitted.

I tried producing 2 messages at once while logging the KafkaJS connectionStatus to the console. The result:

2020-10-05T16:29:20.854Z level=info label=eventhub-client message="Registered handler for all events."
CONNECTING.................  (Output from producer.connect method )
CONNECTING.................
CONNECTED.................
2020-10-05T16:29:20.879Z level=info label=eventhub-client message="Producing 1 message(s)=[{'key':'partition-key1','value':'{\'id\':\'739ec863-be56-43b1-99bd-f4ef2184081c\',\'time\':\'2020-10-05T16:29:20.855Z\',\'specversion\':\'0.2\',\'type\':\'event1\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar1\'}}','headers':{}}] test"
STATUS....................... disconnected (Output from validateConnectionStatus in sendBatch method)
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
    at processTicksAndRejections (internal/process/next_tick.js:81:5)
(node:78713) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)
(node:78713) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
CONNECTED.................
2020-10-05T16:29:20.882Z level=info label=eventhub-client message="Producing 2 message(s)=[{'key':'partition-key2','value':'{\'id\':\'996b7604-e715-4a31-97ce-d058abac721d\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event2\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar2\'}}','headers':{}},{'key':'partition-key3','value':'{\'id\':\'f1fcc9e2-1905-417d-a54e-b624e5b299b6\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event3\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar3\'}}','headers':{}}] test2"
STATUS....................... connected

Code sample:

if (!this.producerConnected) {
  this.producer = this.kafka.producer();
  try {
    await this.producer.connect();
    this.producerConnected = true;
  } catch (error) {
    throw new Error("Couldn't connect producer.");
  }
}
await this.producer.send({ topic, messages });

Expected behavior: connect() resolves, then the messages are produced.

Observed behavior: Even after connect() resolves, producing the first message fails and connectionStatus is not updated. The other messages are produced.
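One reading that is consistent with the log above (two CONNECTING lines, a failure on the first send, success on the second) is a race in the code sample itself: when two calls overlap, the second call replaces this.producer while the first is still awaiting connect(). The sketch below reproduces that pattern without a broker. createFakeProducer, RacyClient and demoRace are hypothetical stand-ins written for illustration, not KafkaJS code, and the fake only mimics "send() rejects until connect() has finished".

```javascript
// Hypothetical stand-in for a producer: connect() takes a moment,
// and send() rejects until connect() has completed.
function createFakeProducer() {
  let connected = false;
  return {
    async connect() {
      await new Promise((resolve) => setTimeout(resolve, 10));
      connected = true;
    },
    async send() {
      if (!connected) throw new Error('The producer is disconnected');
      return 'sent';
    },
  };
}

// Same control flow as the code sample in the report.
class RacyClient {
  constructor() {
    this.producerConnected = false;
  }

  async produce(topic, messages) {
    if (!this.producerConnected) {
      this.producer = createFakeProducer(); // overwritten by an overlapping call
      await this.producer.connect();
      this.producerConnected = true;
    }
    return this.producer.send({ topic, messages });
  }
}

// Two overlapping calls: the first call's send() ends up on the second
// call's producer, which has not finished connecting yet.
async function demoRace() {
  const client = new RacyClient();
  return Promise.allSettled([
    client.produce('test', [{ value: 'a' }]),
    client.produce('test', [{ value: 'b' }]),
  ]);
}
```

On current Node.js versions the first promise settles as rejected and the second as fulfilled, which mirrors the order of events in the reported log. Whether this is the whole story for 1.14.0 is not established here.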

Environment:

  • OS: macOS (Darwin 18.7.0)
  • KafkaJS version 1.14.0
  • Kafka version 5.1.2
  • NodeJS version 11.10.0

Issue Analytics

  • State: open
  • Created: 3 years ago
  • Reactions: 9
  • Comments: 20 (4 by maintainers)

Top GitHub Comments

5 reactions
Nevon commented, Oct 8, 2020

You should just connect at startup and disconnect at shutdown.

There seems to be a bug introduced with the latest version related to reconnects, but regardless, nothing is changing about the intended usage.
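A minimal sketch of the "connect at startup, disconnect at shutdown" usage, assuming only that the producer object exposes connect(), send() and disconnect(), as the object returned by kafka.producer() does. runWithProducer and work are hypothetical names introduced for illustration and are not part of KafkaJS.

```javascript
// Sketch only: `producer` is any object with connect(), send() and
// disconnect(). `runWithProducer` and `work` are hypothetical names.
async function runWithProducer(producer, work) {
  await producer.connect(); // once, at startup
  try {
    // Every send() issued by `work` runs on the already-connected producer.
    return await work(producer);
  } finally {
    await producer.disconnect(); // once, at shutdown, even if `work` throws
  }
}
```

With KafkaJS this might be called as runWithProducer(kafka.producer(), (p) => p.send({ topic, messages })). A long-running service would more likely call connect() in its startup code and disconnect() from a shutdown hook, rather than wrap everything in one function.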

2 reactions
superLincoln commented, Jul 7, 2022

This probably happens because you send a message before the connection has been established. It occurs when sends run in parallel, but not when they run serially. So make sure a connection has been successfully initialized before sending messages. You can also enable debug logging to check the connection state when sending messages in parallel, which is what triggers this issue. Good luck.

On Thu, Jul 7, 2022 at 04:46, Manish Sharma wrote:

Hi all, I have the same issue. Is there any resolution for this?
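For callers that cannot easily guarantee a single startup connect, one way to guard against the parallel-send case described above is to share a single connect() promise between all callers, so that every send() waits for the same producer. This is a sketch under assumptions, not a fix confirmed by the maintainers: ProducerClient, getProducer and createProducer are hypothetical names, and createProducer stands for something like () => kafka.producer().

```javascript
// Sketch only: memoize the connect() promise so overlapping callers share
// one producer. `createProducer` is a hypothetical factory.
class ProducerClient {
  constructor(createProducer) {
    this.createProducer = createProducer;
    this.connecting = null; // promise that resolves to a connected producer
  }

  getProducer() {
    if (!this.connecting) {
      const producer = this.createProducer();
      this.connecting = producer.connect().then(
        () => producer,
        (error) => {
          this.connecting = null; // allow a later call to retry
          throw error;
        }
      );
    }
    return this.connecting;
  }

  async produce(topic, messages) {
    const producer = await this.getProducer();
    return producer.send({ topic, messages });
  }
}
```

Because the promise is stored synchronously, before any await, a second call that arrives while the first is still connecting reuses it and does not create a second producer.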
