producer.send() does not reconnect to broker when receiving an ETIMEDOUT error

See original GitHub issue

Describe the bug A lambda I’m running will occasionally receive an ETIMEDOUT error when producing messages to Kafka. When this happens, kafkajs disconnects from the broker and never reconnects, but will continue to retry sending the messages.

To Reproduce

Unfortunately I cannot link to sample code or give you a way to reproduce this reliably (maybe someone reading this can guide me toward a way of forcing an ETIMEDOUT error?), but I will explain our setup with as much code as I can. We’re not doing anything too strange here, just setting it up and producing messages.

We configure the kafka object with the following:

{
  clientId: "example",
  brokers: ["bootstrapbroker.cloud:9092"],
  enforceRequestTimeout: true,
  requestTimeout: 2000,
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: "username",
    password: "password"
  }
}

and then the producer, nothing wild here:

const producer = kafka.producer({createPartitioner: Partitioners.JavaCompatiblePartitioner});
await producer.connect();
await producer.send({topic: "topic", messages: [...array of messages...]});

Expected behavior

I expect this to disconnect from the broker, try to reconnect to the broker, and if it successfully does that, start retrying the messages.

Observed behavior

tl;dr the timeout error happens, kafkajs disconnects from the broker, messages are retried without reconnecting to the broker.

This code is running in an aws lambda and happily chugs along doing what it should for awhile, then we receive an ETIMEDOUT error while producing. Log looks like the following:

{
	"level": "ERROR",
	"timestamp": "2020-10-11T19:19:51.698Z",
	"logger": "kafkajs",
	"message": "[Connection] Connection error: write ETIMEDOUT",
	"broker": "broker.cloud:9092",
	"clientId": "example",
	"stack": "Error: write ETIMEDOUT\n    at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:92:16)\n    at handleWriteReq (internal/stream_base_commons.js:51:26)\n    at writeGeneric (internal/stream_base_commons.js:143:15)\n    at TLSSocket.Socket._writeGeneric (net.js:786:11)\n    at TLSSocket.Socket._write (net.js:798:8)\n    at doWrite (_stream_writable.js:403:12)\n    at writeOrBuffer (_stream_writable.js:387:5)\n    at TLSSocket.Writable.write (_stream_writable.js:318:11)\n    at Object.sendRequest (/var/task/node_modules/kafkajs/src/network/connection.js:312:27)\n    at SocketRequest.send [as sendRequest] (/var/task/node_modules/kafkajs/src/network/requestQueue/index.js:135:23)"
}

after this, with debug logging turned on, we see the following logs:

  • [Connection] disconnecting...
  • [RequestQueue] Waiting for pending requests
  • [Connection] Kafka server has closed connection
  • [Connection] disconnecting...
  • [Connection] disconnected
  • [Connection] Request Metadata(key: 3, version: 5)
  • [Connection] Request Metadata(key: 3, version: 5)
  • [Connection] Request Metadata(key: 3, version: 5)

(presumably the 3 last lines are one request to each of the 3 brokers) and then the lambda times out. EDIT: the 3 last lines are actually just 3 retries to 1 broker, and they happen a few seconds apart from each other

Environment:

  • OS: AWS Lambda, nodejs12.x runtime
  • KafkaJS version 1.12.0
  • Kafka version 2.5
  • NodeJS version v12

Additional context I have dug around in the code quite a bit trying to understand what’s going on here and these are my findings, unclear whether they’re actually useful, but here they are.

There are retriers at several levels of the stack, I found them in connection.js (although this one seems to be unused?), cluster/index.js, messageProducer.js, producer/index.js and sendMessages.js. The retrier in messageProducer.js is not created with createRetry and includes code to actually reconnect to the broker if it’s disconnected (great!), but afaict the retrier in sendMessages.js catches this error first and retries the messages without attempting a reconnect to the cluster. Honestly though, any of the retries further down the stack other than the one in sendMessages.js could be what’s catching it, they all use the generic retrier from createRetry.

Also, in my attempts to work around what I saw as the problem, I added a disconnect listener to my code that listens for disconnects and tries to reinitiate the connection, but the DISCONNECT event never seems to fire (based on logging to producer.logger().info() I set up in the listener) when the ETIMEDOUT error happens, so the listener does not get a chance to do its job. I have tests in my codebase to trigger the DISCONNECT event and this code does what I’d expect it to in those tests. Code for the listener follows:

private listenForEvents() {
  if (this._producer) {
    const { DISCONNECT } = this._producer.events;
    this._producer.on(DISCONNECT, async (disconnectEvent: DisconnectEvent) => {
      this._producer.logger().info("Received a disconnect event", disconnectEvent);
      // force a reconnect if we didn't explicitly call disconnect
      if (!this.explicitlyCalledDisconnect) {
        this._producer.logger().info("Disconnect event received without explicit disconnect call, reconnecting...");
        await this.connect();
      }
    });
  }
}

Issue Analytics

  • State:open
  • Created 3 years ago
  • Comments:5

github_iconTop GitHub Comments

5reactions
vdesaboucommented, Sep 6, 2021

Hello,

I’ve made an attempt to reproduce the issue using docker, see below:

Test description

Environment:

KafkaJS version 1.15.0 Kafka version 2.8 (Confluent Platform 6.2.0) NodeJS version from node:lts-alpine image

How to run

Just run the script start-repro-timeout.sh

What the script does

It starts a zookeeper + 3 brokers + control-center

The producer code is very simple.

Config used is:

const kafka = new Kafka({
  clientId: 'my-kafkajs-producer',
  brokers: ['broker1:9092','broker2:9092','broker3:9092'],
  enforceRequestTimeout: true,
  logLevel: logLevel.DEBUG,
  acks:1,
  connectionTimeout: 20000,
})

It allows only one pending request in order to make troubleshooting easier.

I’m blocking IP address (using iptables) corresponding to kafkaJS client container in broker1 container, so KafkaJS producer does not receive any response back from broker1

ip=$(docker inspect -f '{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $(docker ps -aq) | grep client-kafkajs | cut -d " " -f 3)
docker exec -e ip=$ip --privileged --user root broker1 sh -c "iptables -A OUTPUT -p tcp -d $ip -j DROP"

let the test run 5 minutes

sleep 300

Unblocking IP address $ip corresponding to kafkaJS client

docker exec -e ip=$ip --privileged --user root broker1 sh -c "iptables -D OUTPUT -p tcp -d $ip -j DROP"

let the test run 5 minutes

sleep 300

Results

Test with 10 minutes connection error

Traffic is blocked at 11:32:33:

11:32:33 ℹ️ Blocking IP address 172.18.0.6 corresponding to kafkaJS client
11:32:33 ℹ️ Grepping for WARN|ERROR|Metadata|timed out

30 seconds later request timeout:

[[11:33:04.669]] [LOG]   Producer request timed out at 1630927984668 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":17,"createdAt":1630927954687,"sentAt":1630927954687,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}

Then we see a Connection error: read ETIMEDOUT:

[[11:33:29.928]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:33:29.928Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: read ETIMEDOUT\n    at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"}

That seems to trigger a disconnect:

[[11:33:29.929]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:33:29.929Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:33:29.930]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:33:29.930Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}

I do not see a re-connect, but requests are retried:

Retries:

[[11:33:34.935]] [LOG]   Producer request timed out at 1630928014935 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":18,"createdAt":1630927984955,"sentAt":1630927984955,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:34:05.382]] [LOG]   Producer request timed out at 1630928045381 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":19,"createdAt":1630928015401,"sentAt":1630928015401,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:34:36.243]] [LOG]   Producer request timed out at 1630928076243 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":20,"createdAt":1630928046263,"sentAt":1630928046263,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:07.880]] [LOG]   Producer request timed out at 1630928107880 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":21,"createdAt":1630928077900,"sentAt":1630928077900,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:41.097]] [LOG]   Producer request timed out at 1630928141097 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":22,"createdAt":1630928111116,"sentAt":1630928111116,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:41.098]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:35:41.098Z","logger":"kafkajs","message":"[Producer] Request Produce(key: 0, version: 5) timed out","retryCount":0,"retryTime":282}
[[11:35:41.099]] [LOG]   failed to send data KafkaJSRequestTimeoutError: Request Produce(key: 0, version: 5) timed out
[[11:36:11.879]] [LOG]   Producer request timed out at 1630928171879 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":23,"createdAt":1630928141899,"sentAt":1630928141899,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:36:42.178]] [LOG]   Producer request timed out at 1630928202178 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":24,"createdAt":1630928172198,"sentAt":1630928172198,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:37:12.826]] [LOG]   Producer request timed out at 1630928232826 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"createdAt":1630928202845,"sentAt":1630928202845,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:37:43.965]] [LOG]   Producer request timed out at 1630928263965 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":26,"createdAt":1630928233984,"sentAt":1630928233984,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:16.155]] [LOG]   Producer request timed out at 1630928296154 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":27,"createdAt":1630928266176,"sentAt":1630928266176,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:51.223]] [LOG]   Producer request timed out at 1630928331223 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":28,"createdAt":1630928301243,"sentAt":1630928301243,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:51.224]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:38:51.224Z","logger":"kafkajs","message":"[Producer] Request Produce(key: 0, version: 5) timed out","retryCount":0,"retryTime":320}
[[11:38:51.225]] [LOG]   failed to send data KafkaJSRequestTimeoutError: Request Produce(key: 0, version: 5) timed out

etc…

Note: Request metadata happening on broker3, broker1 is seen as ok as expected (because connection issue is only happening from broker1 to kakfaJS client):

[[11:37:46.168]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:37:46.167Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"broker3:9092","clientId":"my-kafkajs-producer","correlationId":17,"expectResponse":true,"size":47}
[[11:37:46.169]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:37:46.169Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"broker3:9092","clientId":"my-kafkajs-producer","correlationId":17,"size":255,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"broker2","port":9092,"rack":null},{"nodeId":3,"host":"broker3","port":9092,"rack":null},{"nodeId":1,"host":"broker1","port":9092,"rack":null}],"clusterId":"_jMoVOJEQiS8ez1Eo1ucpQ","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"kafkajs","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":2,"replicas":[2,3,1],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":1,"leader":3,"replicas":[3,1,2],"isr":[3,1,2],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":2,"leader":1,"replicas":[1,2,3],"isr":[1,2,3],"offlineReplicas":[]}]}]}}

At 11:42:33 the iptables rule is removed:

11:42:33 ℹ️ Unblocking IP address 172.18.0.6 corresponding to kafkaJS client

We see a disconnection, probably because broker disconnected for good the client (due to connections.max.idle.ms which is 10 minutes by default). This time we have Kafka server has closed connection followed by Connecting:

[[11:42:34.388]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.387Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.388]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.388Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.388]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.388Z","logger":"kafkajs","message":"[Connection] Kafka server has closed connection","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.394]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:42:34.394Z","logger":"kafkajs","message":"[Connection] Connection error: write EPIPE","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: write EPIPE\n    at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:94:16)"}
[[11:42:34.855]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.855Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"broker1:9092","clientId":"my-kafkajs-producer","ssl":false,"sasl":false}

Full logs are here

Test with 5 minutes connection error

I re-ran a test with 5 minutes of iptables instead of 10 minutes (to avoid the disconnection from the broker due to connections.max.idle.ms)

Traffic was blocked at 12:50:45:

12:50:45 ℹ️ Blocking IP address 172.20.0.6 corresponding to kafkaJS client

Around 60 seconds later we see disconnection:

[[12:51:44.730]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T12:51:44.730Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: read ETIMEDOUT\n    at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"}
[[12:51:44.730]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:51:44.730Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[12:51:44.731]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:51:44.731Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}

When traffic is back at 12:55:45:

12:55:45 ℹ️ Unblocking IP address 172.20.0.6 corresponding to kafkaJS client

We see a retry right after:

[[12:56:00.874]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:56:00.874Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 5)","broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"expectResponse":true,"size":510055}

After 21 seconds (not sure why??), we see accumulated responses (blocked by iptables)

[[12:56:21.021]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.021Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":15}
[[12:56:21.023]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.023Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":16}
[[12:56:21.026]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.026Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":17}
[[12:56:21.029]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.029Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":18}
[[12:56:21.032]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.032Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":19}
[[12:56:21.034]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.033Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":20}
[[12:56:21.035]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.035Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":21}
[[12:56:21.038]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.038Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":22}
[[12:56:21.040]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.040Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":23}
[[12:56:21.043]] [WARN]  {"level":"WARN","timestamp":"2021-09-06T12:56:21.042Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":24}

Followed by request response:

[[12:56:21.044]] [LOG]   {"level":"DEBUG","timestamp":"2021-09-06T12:56:21.044Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 5)","broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"size":55,"data":{"topics":[{"topicName":"kafkajs","partitions":[{"partition":0,"errorCode":0,"baseOffset":"83","logAppendTime":"-1","logStartOffset":"0"}]}],"throttleTime":0}}

So even if there was a disconnection, it seems that kafkaJS is able to send request again when connection is back ?

Full logs are here

0reactions
Nevoncommented, Oct 16, 2020

The code as currently written closes down the socket connection to a single broker, not the entire connection, so even if I implement a custom socket factory and listen for ETIMEDOUT events

Can you share a link to what you’re referring to? I would have expected this to come via the socket timeout event, which we listen to and throw a KafkaJSConnectionError in response to, but I guess that’s not the case.

Read more comments on GitHub >

github_iconTop Results From Across the Web

Kafka.JS refuses to connect <<[BrokerPool] Failed ...
I was trying to reproduce the same issue. But this code is working fine with brokers: ['localhost:9092'] . There is no issue with...
Read more >
Timeout Error When Using kafka-console-consumer an... - ...
When I bring up kafka-console-producer, the same happens. I am pointing both to the same node which is both a Kafka broker and...
Read more >
Why Can't I Connect to Kafka? | Troubleshoot Connectivity
When a client wants to send or receive a message from Apache Kafka ®, there are two types of connection that must succeed:...
Read more >
error: connect etimedout 34.234.108.77:5672 rabbitmq
If you want to connect your server with remotely database then make sure your remotely ... producer.send() does not reconnect to broker when...
Read more >
Fixing Etimedout error
Send the request using a different network. If the request doesn't return the error, it is likely that your IP address is blocked...
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found