KafkaJSLockTimeout when producing a large number of messages at once with SASL/OAUTHBEARER

This issue is very similar to https://github.com/tulios/kafkajs/issues/554. The difference is that I am using SASL/OAUTHBEARER for authentication.

Some background

I am watching a directory using chokidar. When a new file arrives in the directory, it is immediately sent to Kafka. I am currently stress testing my Kafka setup by copying about 1000 files into the directory with a bash script. Unfortunately, this causes errors, and only a fraction of the files (200-400) end up in Kafka.

Errors faced

Below are the errors thrown by KafkaJS. I have tried to include as many distinct errors as possible and have trimmed them for brevity.

{"level":"ERROR",
"message": "[BrokerPool] KafkaJSLockTimeout: Timeout while acquiring lock (55 waiting locks): "connect to broker localhost:9093"",
"retryCount": 0, "retryTime": 311, 
"stack": "KafkaJSLockTimeout: Timeout while acquiring lock (55 waiting locks):  "connect to broker localhost:9093"
at Timeout._onTimeout (node_modules\\kafkajs\\src\\utils\\lock.js:48:23)}
{"level": "ERROR", "message": "[Connection] Connection timeout", "broker": "localhost:9093", "clientId":"test-producer"}
{"level":"ERROR",
"message":"[SASLOAuthBearerAuthenticator] SASL OAUTHBEARER authentication failed: Not connected", "broker": "localhost:9093"}

{"level":"ERROR",
"message":"[BrokerPool] KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Not connected",
"retryCount":0, "retryTime":306,
"stack": "KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Not connected
at OAuthBearerAuthenticator.authenticate (node_modules\\kafkajs\\src\\broker\\saslAuthenticator\\oauthBearer.js:49:21)
---- snipped -----
at async sendBatch (node_modules\\kafkajs\\src\\producer\\messageProducer.js:95:12)"}
error:  KafkaJSNonRetriableError
  Caused by: KafkaJSLockTimeout: Timeout while acquiring lock (107 waiting locks): "connect to broker localhost:9093"
at Timeout._onTimeout (node_modules\kafkajs\src\utils\lock.js:48:23)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  originalError: KafkaJSLockTimeout: Timeout while acquiring lock (107 waiting locks): "connect to broker localhost:9093"
      at Timeout._onTimeout (node_modules\kafkajs\src\utils\lock.js:48:23)
      at listOnTimeout (internal/timers.js:554:17)
      at processTimers (internal/timers.js:497:7) {
    retriable: false,
    helpUrl: undefined
  },
  retryCount: 0,
  retryTime: 291
}
{"level":"ERROR", "message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout", "retryCount":0,"retryTime":287}
{"level":"ERROR",
"message":"[Connection] Connection error: write after end",
"broker": "localhost:9093","clientId": "test-producer",
"stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end
at Socket.Writable.write (_stream_writable.js:292:11)
at Object.sendRequest (node_modules\\kafkajs\\src\\network\\connection.js:312:27)
--- snipped ---
at sendRequest (node_modules\\kafkajs\\src\\network\\connection.js:302:14)
at async Connection.send (node_modules\\kafkajs\\src\\network\\connection.js:321:53)"}
error:  KafkaJSNonRetriableError
  Caused by: KafkaJSProtocolError: Request is not valid given the current SASL state
    at createErrorFromCode (node_modules\kafkajs\src\protocol\error.js:581:10)
    at Object.parse (node_modules\kafkajs\src\protocol\requests\metadata\v0\response.js:56:11)
    at Connection.send (node_modules\kafkajs\src\network\connection.js:336:35)
    --- snipped ---
    at async Cluster.refreshMetadata (node_modules\kafkajs\src\cluster\index.js:134:5)
    at async Cluster.findBroker (node_modules\kafkajs\src\cluster\index.js:221:9) {
  name: 'KafkaJSNumberOfRetriesExceeded', retriable: false, helpUrl: undefined,
  originalError: KafkaJSProtocolError: Request is not valid given the current SASL state
      at createErrorFromCode (node_modules\kafkajs\src\protocol\error.js:581:10)
      --- snipped --
      at async Cluster.findBroker (\node_modules\kafkajs\src\cluster\index.js:221:9) {
    retriable: false,
    helpUrl: undefined,
    type: 'ILLEGAL_SASL_STATE',
    code: 34
  },
  retryCount: 0,
  retryTime: 244
}

I logged every call to oauthBearerProvider, which refreshes the token. Ideally it should run once every 50 seconds, but the log shows that it was sometimes called several times at the same moment:

[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
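
One way to avoid the duplicate fetches seen in this log is to let concurrent callers share a single in-flight request and reuse the token until shortly before it expires. The following is only a minimal sketch under stated assumptions: `createCachedTokenProvider`, `fetchToken`, `expiresInMs`, and `refreshMarginMs` are hypothetical names, not part of KafkaJS, and `fetchToken` stands in for the omitted Keycloak call. Whether this has any effect on the lock timeouts is not established here.

```javascript
// Wraps a token-fetching function so that concurrent callers share one
// in-flight request and the token is reused until shortly before expiry.
// ASSUMPTION: `fetchToken` is a hypothetical async function that resolves to
// { value, expiresInMs }; it stands in for the omitted Keycloak request.
function createCachedTokenProvider(fetchToken, refreshMarginMs = 5000, now = Date.now) {
    let cached = null;   // { value, expiresAt } once a token has been fetched
    let inFlight = null; // promise shared by callers while a fetch is running

    return async function oauthBearerProvider() {
        // Reuse the cached token while it is comfortably within its lifetime.
        if (cached && now() < cached.expiresAt - refreshMarginMs) {
            return { value: cached.value };
        }
        // Start a fetch only if none is running; later callers await the same one.
        if (!inFlight) {
            inFlight = fetchToken()
                .then((t) => {
                    cached = { value: t.value, expiresAt: now() + t.expiresInMs };
                    return cached;
                })
                .finally(() => { inFlight = null; });
        }
        const token = await inFlight;
        return { value: token.value };
    };
}
```

The returned function has the same shape as the `oauthBearerProvider` in the producer code (an async function resolving to `{ value }`), so it could be passed in its place. The 5 second margin is an arbitrary illustrative value.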

In addition, I noticed the following errors in the Kafka broker log:

broker             | java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST
broker             | 	at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleApiVersionsRequest(SaslServerAuthenticator.java:562)
broker             | 	at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:499)
broker             | 	at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:259)
broker             | 	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:177)
broker             | 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
broker             | 	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
broker             | 	at kafka.network.Processor.poll(SocketServer.scala:893)
broker             | 	at kafka.network.Processor.run(SocketServer.scala:792)
broker             | 	at java.lang.Thread.run(Thread.java:748)
broker             | [2021-02-25 10:40:11,239] WARN [SocketServer brokerId=1] Unexpected error from /172.18.0.1; closing connection (org.apache.kafka.common.network.Selector)
INFO [SocketServer brokerId=1] Failed re-authentication with /172.18.0.1 (Unexpected Kafka request of type SASL_HANDSHAKE during SASL authentication.) (org.apache.kafka.common.network.Selector)
[2021-03-03 18:14:44,782] INFO [SocketServer brokerId=1] Failed re-authentication with /172.18.0.1 (Unexpected Kafka request of type PRODUCE during SASL authentication.) (org.apache.kafka.common.network.Selector)

I think the error happens because KafkaJS tries to produce messages while the connection is still re-authenticating with the refreshed OAuth token. I have no idea how to fix this.
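
While the root cause is open, one mitigation that could be tried is capping how many sends are in flight at once, so fewer requests queue up behind the connection lock. This is a sketch of a possible workaround, not a confirmed fix: `createLimiter` is a hypothetical helper written for this example, and the limit of 10 in the usage note is an arbitrary value.

```javascript
// Runs at most `limit` async tasks at a time; the rest wait in a FIFO queue.
// ASSUMPTION: `createLimiter` is a hypothetical helper, not a KafkaJS API.
function createLimiter(limit) {
    let active = 0;     // number of tasks currently running
    const waiting = []; // queued { task, resolve, reject } entries

    const next = () => {
        if (active >= limit || waiting.length === 0) return;
        active += 1;
        const { task, resolve, reject } = waiting.shift();
        // Promise.resolve().then(task) also captures synchronous throws.
        Promise.resolve()
            .then(task)
            .then(resolve, reject)
            .finally(() => {
                active -= 1;
                next(); // start the next queued task, if any
            });
    };

    // Returns a promise that settles with the task's own result or error.
    return (task) => new Promise((resolve, reject) => {
        waiting.push({ task, resolve, reject });
        next();
    });
}
```

With this in place, each file event would call something like `limit(() => sendDataToTopic(data, topic))`, where `const limit = createLimiter(10)` is created once at startup.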

For comparison, I tried the same thing with the Java client and it can send thousands of messages to the same Kafka broker without any issues.

For example, here is the debug log printed by the Java producer during re-authentication:

2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to SEND_APIVERSIONS_REQUEST
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:178 - Creating SaslClient: client=null;service=kafka;serviceHostname=localhost;mechs=[OAUTHBEARER]
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to SEND_CLIENT_FIRST_MESSAGE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_SEND_HANDSHAKE_REQUEST
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_INITIAL
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to RECEIVE_SERVER_FIRST_MESSAGE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to INTERMEDIATE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:109 - Successfully authenticated as test-producer
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to COMPLETE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to COMPLETE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:620 - Finished re-authentication with session expiration in 55385 ms and session re-authentication on or after 49950 ms
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG Selector:553 - [Producer clientId=test-producer] Successfully re-authenticated with localhost/127.0.0.1

Notice the last message. A similar flow happens with KafkaJS, but there the broker throws the error mentioned earlier:

java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST

Kafka Setup

I am running Kafka in Docker using the confluentinc/cp-kafka:5.5.0 image. My sample broker setup is below:

broker:
    image: confluentinc/cp-kafka:5.5.0
    ports:
      - 29092:29092
      - 9093:9093
    environment:
      # --- common properties --- #
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CLIENT:SASL_PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CLIENT://broker:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,CLIENT://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      ## OAuth Bearer SASL
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_CONNECTIONS_MAX_REAUTH_MS: 60000

Producer code

const { Kafka } = require('kafkajs');

const BROKER = 'localhost:9093';

const kafka = new Kafka({
    clientId: "<client ID>",
    brokers: [ BROKER ],
    ssl: false,
    sasl: {
        mechanism: 'oauthbearer',
        oauthBearerProvider: async () => {
            // Fetch the token from Keycloak (code omitted); `token` is the fetched access token.
            return {
                value: token
            };
        }
    },
});

const producer = kafka.producer();

producer.connect()
.then((value) => console.log("Producer connected"))
.catch((err) => console.error("Failed to connect", err));

const sendDataToTopic = async (data, topic) => {
    return producer.send({
        topic: topic,
        messages: data
    })
    .then((resp) => {
        console.log('producerData: ', resp);
    })
    .catch((err) => {
        console.error('error: ', err);
    })
}

module.exports = {
    sendDataToTopic,
}

To Reproduce

Call the sendDataToTopic method in a loop (~1000 iterations).
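
The loop can be sketched as below. This is a hedged illustration rather than the exact script used: `reproduceBurst`, the topic name `test-topic`, and the message payloads are placeholders, and `send` stands in for sendDataToTopic. The sketch assumes the sends are started without waiting for earlier ones to finish, which seems to match a burst of file events.

```javascript
// Starts `count` sends back to back without awaiting each one, then reports
// how many settled successfully.
// ASSUMPTION: `send(messages, topic)` stands in for sendDataToTopic; the
// topic name and message payloads are placeholders.
async function reproduceBurst(send, count = 1000, topic = 'test-topic') {
    const pending = [];
    for (let i = 0; i < count; i += 1) {
        // No await here: every send is started before any of them completes.
        pending.push(send([{ value: `message ${i}` }], topic));
    }
    const results = await Promise.allSettled(pending);
    return {
        fulfilled: results.filter((r) => r.status === 'fulfilled').length,
        rejected: results.filter((r) => r.status === 'rejected').length,
    };
}
```

Note that sendDataToTopic as written catches and logs its own errors, so every promise would count as fulfilled; failures would only show up in the `rejected` count if the `.catch` rethrows.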

Expected behavior

All the messages are sent to Kafka, and connection re-authentication happens automatically.

Environment:

  • OS: Windows 10
  • KafkaJS version: 1.15.0
  • Kafka version: 2.5 (using the confluentinc/cp-kafka:5.5.0 image in Docker)
  • NodeJS version: 14.15.0

Top GitHub Comments

2 reactions
kzay commented, Jun 27, 2021

I have exactly the same issue when trying to push data coming from websocket to kafka cluster.

Timeout while acquiring lock (712 waiting locks): "updating target topics" {"name":"KafkaJSNumberOfRetriesExceeded","retriable":false,"originalError":{"name":"KafkaJSLockTimeout","retriable":false},"retryCount":0,"retryTime":281,"stack":"KafkaJSNonRetriableError\n Caused by: KafkaJSLockTimeout: Timeout while acquiring lock (712 waiting locks): "updating target topics"\n at Timeout._onTimeout (F:\Environement\git-repo\crypto-feeds-api\node_modules\kafkajs\src\utils\lock.js:48:23)\n at listOnTimeout (internal/timers.js:549:17)\n at processTimers (internal/timers.js:492:7)"}

1 reaction
j-a-h-i-r commented, Mar 9, 2021

Thanks for the info. I went through that issue multiple times and tried different configs without any success. I’ll try again with your suggestions to see if the situation improves.
