KafkaJS claims a broker does not host a topic-partition, even though it does


Describe the bug

I receive lots of error messages like the following:

{
    "level": "ERROR",
    "timestamp": "2020-07-15T16:48:34.740Z",
    "logger": "kafkajs",
    "message": "[Connection] Response Metadata(key: 3, version: 5)",
    "broker": "aws_msk_host_1:9092",
    "clientId": "ti-qa",
    "error": "This server does not host this topic-partition",
    "correlationId": 16,
    "size": 2120
}

However, a check of the topic metadata (using this topic as an example) shows otherwise:

  "learningpaths" with 6 partition(s)
partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr: 
partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr: 
partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr: 
partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr: 
partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr: 
partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr: 

This happens across multiple topics.
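
To check a listing like the one above mechanically rather than by eye, a small script can parse each partition line and confirm that the leader is part of the in-sync replica set. This is only a sketch: the line format is assumed from the listing in this report, and `parsePartitions` and `findUnhealthyPartitions` are hypothetical helper names, not part of KafkaJS or any Kafka tool.

```javascript
// Assumed line format, taken from the listing in this report:
//   partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr:
const PARTITION_LINE =
  /^partition (\d+) leader: (-?\d+), replicas: \[([^\]]*)\], isrs: \[([^\]]*)\]/;

// Turn "2, 3, 1" into [2, 3, 1]; an empty list yields [].
function parseIds(list) {
  return list
    .split(',')
    .map(id => id.trim())
    .filter(id => id !== '')
    .map(Number);
}

// Hypothetical helper: extract one object per partition line, skipping other lines.
function parsePartitions(listing) {
  return listing
    .split('\n')
    .map(line => PARTITION_LINE.exec(line.trim()))
    .filter(match => match !== null)
    .map(match => ({
      partition: Number(match[1]),
      leader: Number(match[2]),
      replicas: parseIds(match[3]),
      isrs: parseIds(match[4])
    }));
}

// Hypothetical helper: a partition is flagged if it has no leader (negative id)
// or if its leader is missing from the in-sync replica set.
function findUnhealthyPartitions(partitions) {
  return partitions.filter(p => p.leader < 0 || !p.isrs.includes(p.leader));
}

const listing = `
partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr:
partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr:
partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr:
partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr:
partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr:
partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr:
`;

const partitions = parsePartitions(listing);
console.log(partitions.length, findUnhealthyPartitions(partitions));
```

For the listing in this report, all six partitions parse and none are flagged, which matches the claim that every partition has a leader that is in sync.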

Code

const awslog = require('lib/awslog');
const config = require('config');

const { Kafka } = require('kafkajs');

const BROKERS =
  config.kafkaBrokers && config.kafkaBrokers.trim() !== '' ? config.kafkaBrokers.split(',') : null;

const USE_KAFKA = config.env !== 'test' && BROKERS !== null;

const kafka = USE_KAFKA
  ? new Kafka({
      clientId: `ti-${config.env}`,
      brokers: BROKERS,
      retry: {
        initialRetryTime: 1000,
        retries: 9
      }
    })
  : null;

const producer = USE_KAFKA
  ? kafka.producer({
      metadataMaxAge: 60000
    })
  : null;

function push(name, records) {
  if (USE_KAFKA && records && records.length) {
    Promise.all(
      records.map(record =>
        // `Promise.resolve` here prevents invalid messages from throwing,
        // just in case others in the same batch are valid.
        Promise.resolve(keysToLowerCase(record)).then(
          value => ({ value, key: record.id || record.requestId }),
          err => {
            config.bugsnag.notify(new Error('Failed to prepare record for Kafka'), {
              message: err.message,
              paths: err.paths,
              record,
              topic: name.toLowerCase(),
              brokers: BROKERS
            });

            return null;
          }
        )
      )
    )
      .then(encodedMessages => {
        const validMessages = encodedMessages.filter(message => message);
        if (validMessages.length) {
          return producer.send({
            topic: name.toLowerCase(),
            messages: validMessages,
            acks: 1
          });
        }
      })
      .catch(e => {
        awslog.error(null, new Error('Failed to send record to Kafka'), {
          message: e.message,
          topic: name.toLowerCase(),
          messages: records,
          brokers: BROKERS,
          paths: e.paths
        });
      });
  }
}

function flush() {
  if (USE_KAFKA) {
    return producer.disconnect();
  } else {
    return Promise.resolve(true);
  }
}

module.exports = {
  push,
  flush
};

function keysToLowerCase(obj) {
  const newObj = {};
  const keys = Object.keys(obj);
  for (const key of keys) {
    newObj[key.toLowerCase()] = obj[key];
  }

  return JSON.stringify(newObj);
}

Expected behavior

Messages get sent and acknowledged by at least the topic-partition leader, unless an error occurs.

Observed behavior

The KafkaJS producer throws the error above, claiming "This server does not host this topic-partition", even though the topic metadata shows that it does. It's possible there's a different underlying issue and the library surfaces this error instead.

Environment:

  • OS: Ubuntu 14.04.6 LTS
  • KafkaJS version 1.12.0
  • Kafka version 2.2.1 (Amazon MSK)
  • NodeJS version 10.20.1

Additional context

Any pointers on what might be wrong with my code or with the library would be helpful.

Issue Analytics

  • State: open
  • Created: 3 years ago
  • Reactions: 18
  • Comments: 31 (4 by maintainers)

Top GitHub Comments

4 reactions
vadiml commented, Jul 16, 2022

I had the same issue. There is a bug in refreshMetadata: it does not properly handle a topic being removed externally (outside of kafkajs).

The problem is in the refreshMetadata code, which tries to refresh metadata for every topic in the this.targetTopics Set (from admin/cluster/index.js).

Easy way to reproduce this bug:

  1. Create topic1 and topic2.
  2. Get the offsets for topic1 and topic2 with kafkajs.
  3. Manually delete topic1 without using kafkajs.
  4. Try to get the offsets for topic2 with kafkajs. It fails with a "This server does not host this topic-partition" exception, even though topic2 exists and only topic1 was deleted.

So if one of the topics in this.targetTopics is removed from Kafka externally (without using kafkajs), the metadata refresh fails for every other topic as well, and the client cannot recover from this error.
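
The failure mode described in this comment can be sketched with a toy model that needs no broker. Everything here is an assumption made for illustration: `FakeBroker` and `FakeCluster` are hypothetical stand-ins, not KafkaJS classes, and the model only encodes what the comment describes, namely that the client remembers every topic it has been asked about and refreshes metadata for all of them at once.

```javascript
// Toy model only: none of these classes are KafkaJS code.
class FakeBroker {
  constructor(topics) {
    this.topics = new Set(topics);
  }

  // Mimics a metadata request that fails as a whole when any one of the
  // requested topics is unknown to the cluster.
  async metadata(requestedTopics) {
    for (const topic of requestedTopics) {
      if (!this.topics.has(topic)) {
        throw new Error('This server does not host this topic-partition');
      }
    }
    return requestedTopics.map(topic => ({ topic }));
  }
}

class FakeCluster {
  constructor(broker) {
    this.broker = broker;
    // Every topic the client has been asked about; never pruned.
    this.targetTopics = new Set();
  }

  async fetchTopicOffsets(topic) {
    this.targetTopics.add(topic);
    // The refresh covers all remembered topics, not just the requested one.
    await this.broker.metadata([...this.targetTopics]);
    return `offsets for ${topic}`;
  }
}

async function reproduce() {
  const broker = new FakeBroker(['topic1', 'topic2']); // step 1
  const cluster = new FakeCluster(broker);

  await cluster.fetchTopicOffsets('topic1'); // step 2
  await cluster.fetchTopicOffsets('topic2');

  broker.topics.delete('topic1'); // step 3: external deletion, client is not told

  try {
    await cluster.fetchTopicOffsets('topic2'); // step 4
    return 'ok';
  } catch (err) {
    return err.message;
  }
}

reproduce().then(result => console.log(result));
```

In this model the request for topic2 fails because the stale topic1 entry is still in `targetTopics`, and it keeps failing on every later call since nothing ever removes that entry. Whether the real refreshMetadata behaves exactly this way is the commenter's claim, not something this sketch verifies.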

3 reactions
mgirard772 commented, Jul 31, 2020

Interesting.

We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way kafkajs communicates with the brokers that’s causing it to throw these errors unnecessarily.
