await producer.connect() hangs indefinitely without retrying on receiving an ECONNRESET from kafka

Describe the bug I have found that my producer does not retry a broker connection when it receives an ECONNRESET error from Kafka. In this scenario I am trying to connect a producer, but the Kafka server resets the TCP connection with ECONNRESET.

      const { Kafka, logLevel } = require('kafkajs');

      const kafka = new Kafka({
        clientId: `wait-until-ready-${uuid()}`,
        brokers: [`${KAFKA_HOST}:${KAFKA_PORT}`],
        connectionTimeout: READY_WAIT_TIMEOUT,
        logLevel: logLevel.DEBUG
      });
      const producer = kafka.producer();
      await producer.connect();

This causes the await producer.connect() call to hang indefinitely. I tracked the hang to the following place in kafkajs, in kafkajs/src/network/requestQueue/index.js:

https://github.com/tulios/kafkajs/blob/02bc8a3c19504d182df131914ee9af4661bf060c/src/network/requestQueue/index.js#L222-L237

The promise's resolve() is wrapped in a callback passed to the emitter, so it is never executed. I think it would execute if an event listener were listening.

I changed the code to resolve like the following and the producer kept trying to reconnect as expected.

  waitForPendingRequests() {
    return new Promise(resolve => {
      if (this.pending.length === 0 && this.inflight.size === 0) {
        return resolve()
      }

      this.logger.debug('Waiting for pending requests', {
        clientId: this.clientId,
        broker: this.broker,
        currentInflightRequests: this.inflight.size,
        currentPendingQueueSize: this.pending.length,
      })

      this.once(REQUEST_QUEUE_EMPTY, () => resolve())
      this.logger.debug('waitForPendingRequests end')
      resolve()
    })
  }

In 1.12.0 the producer kept trying to connect as I expected.
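To illustrate why a wait like waitForPendingRequests() can hang, here is a minimal sketch that uses a stand-in class rather than the real kafkajs request queue. The class name `SketchQueue`, the event name string, and the `rejectAll` helper are all assumptions made for this example. It only shows the general pattern: a promise that resolves from a `once` listener stays pending until something emits that event.

```javascript
const { EventEmitter } = require('events')

// Hypothetical event name for this sketch (the real constant lives in kafkajs).
const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty'

// Hypothetical stand-in for the request queue, not the kafkajs class itself.
class SketchQueue extends EventEmitter {
  constructor() {
    super()
    this.pending = []
    this.inflight = new Map()
  }

  waitForPendingRequests() {
    return new Promise(resolve => {
      if (this.pending.length === 0 && this.inflight.size === 0) {
        return resolve()
      }
      // Resolves only when something emits REQUEST_QUEUE_EMPTY.
      this.once(REQUEST_QUEUE_EMPTY, () => resolve())
    })
  }

  // Hypothetical helper: drop all in-flight requests, e.g. after a socket error.
  // Without the emit, any waiter registered above would stay pending forever.
  rejectAll() {
    this.inflight.clear()
    this.emit(REQUEST_QUEUE_EMPTY)
  }
}

const queue = new SketchQueue()
queue.inflight.set(9, { correlationId: 9 })

// Pending until REQUEST_QUEUE_EMPTY is emitted.
queue.waitForPendingRequests().then(() => console.log('queue drained'))

queue.rejectAll()
```

In this sketch the waiter is released because the cleanup path emits the event. If a cleanup path clears the in-flight requests without emitting it, the awaiting caller never continues, which matches the symptom described here.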

To Reproduce

This happened in my CI environment, where Kafka responded with the ECONNRESET error. However, I was able to reproduce a similar situation by starting a Node server with the following code to mimic a Kafka broker resetting the connection. I then tried to connect the producer. The code below causes the server to reset the TCP connection.

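    const net = require("net");

    // Mimic a broker that drops every incoming TCP connection.
    const server = net.createServer(function(socket) {
      console.log("Server got message");
      // socket.pipe(socket);
      setTimeout(() => socket.destroy(), 0);
    });
    // server.listen() returns the server, not a promise, so there is nothing to await.
    server.listen(9092, "127.0.0.1");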
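To check what a client actually sees from a server like the one above, without involving kafkajs, a plain TCP client can be pointed at it. This is a minimal sketch under a few assumptions: the function name `observeReset` is made up for this example, and the server binds port 0 (any free port) instead of 9092 so that it does not clash with a running broker. Whether the client reports ECONNRESET, another error code, or a plain close depends on timing and on the operating system.

```javascript
const net = require('net')

// Starts a server that destroys every connection right away, connects a
// plain TCP client to it, and resolves with how the client saw the
// connection end: an error code such as 'ECONNRESET', or 'closed'.
function observeReset() {
  return new Promise((resolve, reject) => {
    const server = net.createServer(socket => {
      setTimeout(() => socket.destroy(), 0)
    })
    server.on('error', reject)

    // Port 0 asks the OS for a free port (an assumption for this sketch).
    server.listen(0, '127.0.0.1', () => {
      const { port } = server.address()
      let outcome = 'closed'

      const client = net.connect(port, '127.0.0.1', () => {
        // Send some bytes, roughly as a Kafka client would send a first request.
        client.write('hello')
      })
      client.resume() // discard incoming data so end-of-stream is processed
      client.on('error', err => {
        outcome = err.code
      })
      client.on('close', () => {
        server.close(() => resolve(outcome))
      })
    })
  })
}

observeReset().then(outcome => console.log('client saw:', outcome))
```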

Expected behavior The producer should keep trying to reconnect, as it does in 1.12.0, instead of hanging on the await producer.connect() call.
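On an affected version, a caller can at least bound the wait so that a CI job fails fast instead of hanging. The helper below is a generic sketch and not part of kafkajs; the name `withTimeout` and its parameters are assumptions made for this example. It races any promise against a timer and rejects if the timer wins. It does not cancel the underlying connection attempt.

```javascript
// Rejects with an Error if `promise` has not settled within `ms` milliseconds.
// `label` is only used to build the error message.
function withTimeout(promise, ms, label = 'operation') {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms} ms`)),
      ms
    )
  })
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}
```

With the setup from this report, usage would look like `await withTimeout(producer.connect(), READY_WAIT_TIMEOUT, 'producer.connect()')`. The caller can then catch the rejection and decide whether to retry or abort.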

Observed behavior The producer stopped trying to reconnect as soon as it received the ECONNRESET error from kafka. Prior to that it was trying to reconnect.

    {"level":"DEBUG","timestamp":"2020-10-02T15:10:53.788Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","ssl":false,"sasl":false}
  console.log node_modules/kafkajs/src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2020-10-02T15:10:53.789Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","correlationId":9,"expectResponse":true,"size":67}
  console.error node_modules/kafkajs/src/loggers/console.js:15
    {"level":"ERROR","timestamp":"2020-10-02T15:10:53.792Z","logger":"kafkajs","message":"[Connection] Connection error: read ECONNRESET","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","stack":"Error: read ECONNRESET\n    at TCP.onStreamRead (internal/stream_base_commons.js:183:27)"}
  console.log node_modules/kafkajs/src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2020-10-02T15:10:53.792Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d"}
  console.log node_modules/kafkajs/src/loggers/console.js:19

Environment:

  • OS: Mac and linux alpine on CI env
  • KafkaJS version [1.14.0]
  • Kafka version [2.3.1]
  • NodeJS version [v12.16.1]

Issue Analytics

  • State: closed
  • Created 3 years ago
  • Reactions: 7
  • Comments: 9 (6 by maintainers)

Top GitHub Comments

3 reactions
Nevon commented, Nov 19, 2020

I believe this was fixed by #944 and #956. You can try it out with 1.15.0-beta.25 and feel free to re-open this issue if it is not solved.

1 reaction
goriunov commented, Oct 13, 2020

I believe it could be related to https://github.com/tulios/kafkajs/issues/918
