await producer.connect() hangs indefinitely without retrying on receiving an ECONNRESET from Kafka
Describe the bug
My producer does not retry the broker connection when it receives an ECONNRESET error from Kafka. The scenario is that I am trying to connect a producer, but the Kafka server resets the TCP connection (ECONNRESET).
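const { Kafka, logLevel } = require('kafkajs');

// uuid, KAFKA_HOST, KAFKA_PORT and READY_WAIT_TIMEOUT are defined elsewhere in my setup.
const kafka = new Kafka({
  clientId: `wait-until-ready-${uuid()}`,
  brokers: [`${KAFKA_HOST}:${KAFKA_PORT}`],
  connectionTimeout: READY_WAIT_TIMEOUT,
  logLevel: logLevel.DEBUG
});
const producer = kafka.producer();
await producer.connect();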
This causes the await producer.connect() call to hang indefinitely. I tracked the hang to the following place in kafkajs:
kafkajs/src/network/requestQueue/index.js
The promise's resolve() is wrapped in a callback passed to the emitter's once(), so it is not being executed. I think it would only run if the REQUEST_QUEUE_EMPTY event were actually emitted.
I changed the code to resolve as follows, and the producer kept trying to reconnect as expected.
waitForPendingRequests() {
  return new Promise(resolve => {
    if (this.pending.length === 0 && this.inflight.size === 0) {
      return resolve()
    }
    this.logger.debug('Waiting for pending requests', {
      clientId: this.clientId,
      broker: this.broker,
      currentInflightRequests: this.inflight.size,
      currentPendingQueueSize: this.pending.length,
    })
    this.once(REQUEST_QUEUE_EMPTY, () => resolve())
    this.logger.debug('waitForPendingRequests end')
    resolve()
  })
}
In 1.12.0 the producer kept trying to connect as I expected.
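To illustrate the mechanism described above, here is a minimal sketch that is independent of kafkajs. The class name `SketchQueue`, the event name `QUEUE_EMPTY`, and the `rejectAll` method are hypothetical and exist only for this example. It shows that a promise whose resolve() sits inside a once() callback stays pending until the event is emitted, and one possible way to release waiters when the connection goes away. Note that calling resolve() unconditionally right after once(), as in my change above, resolves the promise immediately, so it no longer waits for anything.

```javascript
const { EventEmitter } = require('events')

// Hypothetical event name for this sketch only.
const QUEUE_EMPTY = 'queueEmpty'

// Hypothetical stand-in for a request queue; this is NOT the kafkajs class.
class SketchQueue extends EventEmitter {
  constructor() {
    super()
    this.pending = []
  }

  // Resolves only once QUEUE_EMPTY is emitted. If nothing ever emits it
  // (for example because the socket was reset first), the promise hangs.
  waitForPendingRequests() {
    return new Promise(resolve => {
      if (this.pending.length === 0) {
        return resolve()
      }
      this.once(QUEUE_EMPTY, () => resolve())
    })
  }

  // One possible remedy (an assumption, not the actual fix): on disconnect,
  // reject everything still queued and emit the event so waiters are released.
  rejectAll(error) {
    const dropped = this.pending.splice(0)
    dropped.forEach(request => request.reject(error))
    this.emit(QUEUE_EMPTY)
    return dropped.length
  }
}
```

With this shape, a caller that awaits waitForPendingRequests() during a disconnect is released as soon as rejectAll() runs, and the dropped requests still report the error instead of being silently ignored.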
To Reproduce
This happened in my CI environment, where Kafka responded with the ECONNRESET error. However, I was able to reproduce a similar situation by starting a Node server with the following code to mimic a Kafka broker resetting the connection, and then trying to connect the producer. The code below causes the server to reset the TCP connection.
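const net = require("net");

// Accept the connection, then destroy the socket right away to mimic a broker reset.
const server = net.createServer(function (socket) {
  console.log("Server got message");
  // socket.pipe(socket);
  setTimeout(() => socket.destroy(), 0);
});
// listen() does not return a promise, so there is nothing to await here.
server.listen(9092, "127.0.0.1");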
Expected behavior
The producer should keep trying to reconnect, as it does in 1.12.0, instead of hanging on the await producer.connect() call.
Observed behavior
The producer stopped trying to reconnect as soon as it received the ECONNRESET error from Kafka. Prior to that it was trying to reconnect.
{"level":"DEBUG","timestamp":"2020-10-02T15:10:53.788Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","ssl":false,"sasl":false}
console.log node_modules/kafkajs/src/loggers/console.js:19
{"level":"DEBUG","timestamp":"2020-10-02T15:10:53.789Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","correlationId":9,"expectResponse":true,"size":67}
console.error node_modules/kafkajs/src/loggers/console.js:15
{"level":"ERROR","timestamp":"2020-10-02T15:10:53.792Z","logger":"kafkajs","message":"[Connection] Connection error: read ECONNRESET","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d","stack":"Error: read ECONNRESET\n at TCP.onStreamRead (internal/stream_base_commons.js:183:27)"}
console.log node_modules/kafkajs/src/loggers/console.js:19
{"level":"DEBUG","timestamp":"2020-10-02T15:10:53.792Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"wait-until-ready-fba76804-36fd-41aa-b504-cdf0dafd858d"}
console.log node_modules/kafkajs/src/loggers/console.js:19
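Until a fixed version is in use, a caller-side guard can at least keep a CI job from hanging forever. The following is a minimal sketch; `withTimeout` is a hypothetical helper written for this example, not part of kafkajs. It only stops waiting and does not cancel the underlying connection attempt.

```javascript
// Hypothetical helper: rejects if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms, label = 'operation') {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms} ms`)),
      ms
    )
  })
  // Whichever settles first wins; the timer is always cleared afterwards
  // so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}
```

Assuming the setup from the bug description, this could be used as `await withTimeout(producer.connect(), READY_WAIT_TIMEOUT, 'producer.connect')`, so that the hang surfaces as a rejected promise the test can handle or retry.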
Environment:
- OS: Mac, and Alpine Linux in the CI environment
- KafkaJS version [1.14.0]
- Kafka version [2.3.1]
- NodeJS version [v12.16.1]
I believe this was fixed by #944 and #956. You can try it out with 1.15.0-beta.25 and feel free to re-open this issue if it is not solved.
I believe it could be related to https://github.com/tulios/kafkajs/issues/918