producer.send() does not reconnect to broker when receiving an ETIMEDOUT error
Describe the bug
A lambda I’m running will occasionally receive an ETIMEDOUT error when producing messages to Kafka. When this happens, kafkajs disconnects from the broker and never reconnects, but it continues to retry sending the messages.
To Reproduce
Unfortunately I cannot link to sample code or give you a way to reproduce this reliably (maybe someone reading this can guide me toward a way of forcing an ETIMEDOUT error?), but I will explain our setup with as much code as I can. We’re not doing anything too strange here, just setting it up and producing messages.
We configure the kafka object with the following:
{
  clientId: "example",
  brokers: ["bootstrapbroker.cloud:9092"],
  enforceRequestTimeout: true,
  requestTimeout: 2000,
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: "username",
    password: "password"
  }
}
and then the producer, nothing wild here:
const producer = kafka.producer({createPartitioner: Partitioners.JavaCompatiblePartitioner});
await producer.connect();
await producer.send({topic: "topic", messages: [...array of messages...]});
Expected behavior
I expect this to disconnect from the broker, try to reconnect to the broker, and if it successfully does that, start retrying the messages.
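For illustration, the kind of recovery I’m expecting would look roughly like this application-level sketch (the sendWithReconnect helper is hypothetical and not part of kafkajs; it assumes producer is an already-created kafkajs producer):
// Hypothetical helper, not kafkajs API: reconnect before retrying
// instead of retrying on a dead connection.
async function sendWithReconnect(producer, record, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await producer.send(record);
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      await producer.disconnect().catch(() => {}); // best-effort cleanup
      await producer.connect();                    // re-establish the connection
    }
  }
}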
Observed behavior
tl;dr the timeout error happens, kafkajs disconnects from the broker, messages are retried without reconnecting to the broker.
This code is running in an AWS Lambda and happily chugs along doing what it should for a while, then we receive an ETIMEDOUT error while producing. The log looks like the following:
{
"level": "ERROR",
"timestamp": "2020-10-11T19:19:51.698Z",
"logger": "kafkajs",
"message": "[Connection] Connection error: write ETIMEDOUT",
"broker": "broker.cloud:9092",
"clientId": "example",
"stack": "Error: write ETIMEDOUT\n at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:92:16)\n at handleWriteReq (internal/stream_base_commons.js:51:26)\n at writeGeneric (internal/stream_base_commons.js:143:15)\n at TLSSocket.Socket._writeGeneric (net.js:786:11)\n at TLSSocket.Socket._write (net.js:798:8)\n at doWrite (_stream_writable.js:403:12)\n at writeOrBuffer (_stream_writable.js:387:5)\n at TLSSocket.Writable.write (_stream_writable.js:318:11)\n at Object.sendRequest (/var/task/node_modules/kafkajs/src/network/connection.js:312:27)\n at SocketRequest.send [as sendRequest] (/var/task/node_modules/kafkajs/src/network/requestQueue/index.js:135:23)"
}
after this, with debug logging turned on, we see the following logs:
[Connection] disconnecting...
[RequestQueue] Waiting for pending requests
[Connection] Kafka server has closed connection
[Connection] disconnecting...
[Connection] disconnected
[Connection] Request Metadata(key: 3, version: 5)
[Connection] Request Metadata(key: 3, version: 5)
[Connection] Request Metadata(key: 3, version: 5)
(presumably the last 3 lines are one request to each of the 3 brokers) and then the lambda times out. EDIT: the last 3 lines are actually just 3 retries to 1 broker, and they happen a few seconds apart from each other.
Environment:
- OS: AWS Lambda, nodejs12.x runtime
- KafkaJS version 1.12.0
- Kafka version 2.5
- NodeJS version v12
Additional context
I have dug around in the code quite a bit trying to understand what’s going on here, and these are my findings. It’s unclear whether they’re actually useful, but here they are.
There are retriers at several levels of the stack; I found them in connection.js (although this one seems to be unused?), cluster/index.js, messageProducer.js, producer/index.js, and sendMessages.js. The retrier in messageProducer.js is not created with createRetry and includes code to actually reconnect to the broker if it’s disconnected (great!), but afaict the retrier in sendMessages.js catches this error first and retries the messages without attempting a reconnect to the cluster. Honestly though, any of the retriers further down the stack other than the one in sendMessages.js could be what’s catching it; they all use the generic retrier from createRetry.
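To make the structure concrete, here is a simplified sketch of what I think is happening. This is not the actual kafkajs source, just an illustration (with a stand-in broker object) of how an inner retrier can burn all of its attempts on a dead connection before the layer that knows how to reconnect ever sees the error:
// Simplified sketch, not the real kafkajs source.
const withRetry = (fn, attempts = 3) => async (...args) => {
  let lastError;
  for (let i = 1; i <= attempts; i++) {
    try {
      return await fn(...args);
    } catch (e) {
      lastError = e;
      console.log(`attempt ${i} failed: ${e.message}`); // backoff omitted
    }
  }
  throw lastError;
};

// Stand-in for a broker whose connection has just died (the ETIMEDOUT case).
const broker = {
  connected: false,
  async connect() { this.connected = true; },
  async write() {
    if (!this.connected) throw new Error("write ETIMEDOUT");
    return "ok";
  },
};

// Inner layer (analogous to sendMessages.js): only retries the send,
// reusing the same dead connection on every attempt.
const sendMessages = withRetry(() => broker.write());

// Outer layer (analogous to messageProducer.js): the only layer that would
// reconnect, but it runs only after the inner retries are exhausted; in our
// case, that is after the lambda has already timed out.
const produce = withRetry(async () => {
  if (!broker.connected) await broker.connect();
  return sendMessages();
});

// Calling produce() would reconnect first and succeed, but the inner layer
// alone just burns its attempts on the dead connection:
sendMessages().catch((e) => console.log("send gave up:", e.message));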
Also, in my attempts to work around what I saw as the problem, I added a disconnect listener to my code that listens for disconnects and tries to reinitiate the connection, but the DISCONNECT event never seems to fire when the ETIMEDOUT error happens (based on the producer.logger().info() logging I set up in the listener), so the listener never gets a chance to do its job. I have tests in my codebase that trigger the DISCONNECT event, and this code does what I’d expect in those tests. Code for the listener follows:
private listenForEvents() {
  if (this._producer) {
    const { DISCONNECT } = this._producer.events;
    this._producer.on(DISCONNECT, async (disconnectEvent: DisconnectEvent) => {
      this._producer.logger().info("Received a disconnect event", disconnectEvent);
      // force a reconnect if we didn't explicitly call disconnect
      if (!this.explicitlyCalledDisconnect) {
        this._producer.logger().info("Disconnect event received without explicit disconnect call, reconnecting...");
        await this.connect();
      }
    });
  }
}
Hello,
I’ve made an attempt to reproduce the issue using docker, see below:
Test description
Environment:
- KafkaJS version 1.15.0
- Kafka version 2.8 (Confluent Platform 6.2.0)
- NodeJS version from the node:lts-alpine image
How to run
Just run the script start-repro-timeout.sh
What the script does
It starts a zookeeper + 3 brokers + control-center
The producer code is very simple.
The config used allows only one pending request, in order to make troubleshooting easier (a minimal sketch of such a config follows after the steps below).
- I’m blocking the IP address (using iptables) corresponding to the KafkaJS client container in the broker1 container, so the KafkaJS producer does not receive any response back from broker1
- let the test run 5 minutes
- unblocking the IP address $ip corresponding to the KafkaJS client
- let the test run 5 minutes
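As mentioned above, here is a minimal sketch of a producer limited to one in-flight request (the clientId and broker address are placeholders, and maxInFlightRequests is assumed to be the relevant option; the actual config used may differ):
const { Kafka } = require("kafkajs");

// Sketch only: placeholder clientId and broker address; maxInFlightRequests
// is assumed to be the option used to allow a single pending request.
const kafka = new Kafka({ clientId: "repro-client", brokers: ["broker1:9092"] });
const producer = kafka.producer({ maxInFlightRequests: 1 });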
Results
Test with 10 minutes connection error
Traffic is blocked at 11:32:33. 30 seconds later, a request timeout:
Then we see a Connection error: read ETIMEDOUT. That seems to trigger a disconnect:
I do not see a re-connect, but requests are retried:
Retries:
etc…
Note: Request Metadata is happening on broker3; broker1 is seen as OK, as expected (because the connection issue is only happening from broker1 to the KafkaJS client):
At 11:42:33 the iptables rule is removed. We see a disconnection, probably because the broker disconnected the client for good (due to connections.max.idle.ms, which is 10 minutes by default). This time we have Kafka server has closed connection followed by Connecting:
Full logs are here
Test with 5 minutes connection error
I re-ran a test with 5 minutes of iptables instead of 10 minutes (to avoid the disconnection from the broker due to connections.max.idle.ms). Traffic was blocked at 12:50:45. Around 60 seconds later we see a disconnection:
When traffic is back at 12:55:45, we see a retry right after:
After 21 seconds (not sure why??), we see accumulated responses (blocked by iptables)
Followed by request response:
So even though there was a disconnection, it seems that KafkaJS is able to send requests again when the connection is back?
Full logs are here
Can you share a link to what you’re referring to? I would have expected this to come via the socket timeout event, which we listen to and throw a KafkaJSConnectionError in response to, but I guess that’s not the case.
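For context, the socket timeout path I have in mind looks roughly like this generic Node.js sketch (host and port are placeholders; this is not the kafkajs internals):
const net = require("net");

// Generic sketch: turn an idle-socket timeout into a connection error,
// rather than waiting for a later ETIMEDOUT on a write.
const socket = net.connect({ host: "broker1", port: 9092 });
socket.setTimeout(2000); // ms of inactivity before 'timeout' fires
socket.on("timeout", () => {
  // 'timeout' only signals inactivity; the socket has to be destroyed
  // explicitly, and that is where a connection error would surface.
  socket.destroy(new Error("socket timed out"));
});
socket.on("error", (err) => {
  console.error("connection error:", err.message);
});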