KafkaJS claims a broker does not host a topic-partition, even though it does
See original GitHub issueDescribe the bug I receive lots of error messages like the following:
{
"level": "ERROR",
"timestamp": "2020-07-15T16:48:34.740Z",
"logger": "kafkajs",
"message": "[Connection] Response Metadata(key: 3, version: 5)",
"broker": "aws_msk_host_1:9092",
"clientId": "ti-qa",
"error": "This server does not host this topic-partition",
"correlationId": 16,
"size": 2120
}
However, a check of the topic metadata (using this topic as an example), speaks to the contrary:
"learningpaths" with 6 partition(s)
partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr:
partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr:
partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr:
partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr:
partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr:
partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr:
This happens across multiple topics.
Code
const awslog = require('lib/awslog');
const config = require('config');
const { Kafka } = require('kafkajs');
const BROKERS =
config.kafkaBrokers && config.kafkaBrokers.trim() !== '' ? config.kafkaBrokers.split(',') : null;
const USE_KAFKA = config.env !== 'test' && BROKERS !== null;
const kafka = USE_KAFKA
? new Kafka({
clientId: `ti-${config.env}`,
brokers: BROKERS,
retry: {
initialRetryTime: 1000,
retries: 9
}
})
: null;
const producer = USE_KAFKA
? kafka.producer({
metadataMaxAge: 60000
})
: null;
function push(name, records) {
if (USE_KAFKA && records && records.length) {
Promise.all(
records.map(record =>
// `Promise.resolve` here prevents invalid messages from throwing,
// just in case others in the same batch are valid.
Promise.resolve(keysToLowerCase(record)).then(
value => ({ value, key: record.id || record.requestId }),
err => {
config.bugsnag.notify(new Error('Failed to prepare record for Kafka'), {
message: err.message,
paths: err.paths,
record,
topic: name.toLowerCase(),
brokers: BROKERS
});
return null;
}
)
)
)
.then(encodedMessages => {
const validMessages = encodedMessages.filter(message => message);
if (validMessages.length) {
return producer.send({
topic: name.toLowerCase(),
messages: validMessages,
acks: 1
});
}
})
.catch(e => {
awslog.error(null, new Error('Failed to send record to Kafka'), {
message: e.message,
topic: name.toLowerCase(),
messages: records,
brokers: BROKERS,
paths: e.paths
});
});
}
}
function flush() {
if (USE_KAFKA) {
return producer.disconnect();
} else {
return Promise.resolve(true);
}
}
module.exports = {
push,
flush
};
function keysToLowerCase(obj) {
const newObj = {};
const keys = Object.keys(obj);
for (const key of keys) {
newObj[key.toLowerCase()] = obj[key];
}
return JSON.stringify(newObj);
}
Expected behavior Messages get sent and acknowledged by at least the topic-partition leader, unless an error occurs.
Observed behavior
KafkaJS producer throws above error claiming This server does not host this topic-partition, when it obviously does. It’s possible there’s another issue but the logic throws this error instead.
Environment:
- OS: Ubuntu 14.04.6 LTS
- KafkaJS version 1.12.0
- Kafka version 2.2.1 (Amazon MSK)
- NodeJS version 10.20.1
Additional context Any pointers on what might be wrong with my code, or wrong with the library would be helpful.
Issue Analytics
- State:
- Created 3 years ago
- Reactions:18
- Comments:31 (4 by maintainers)
Top Results From Across the Web
This server does not host this topic-partition - Stack Overflow
I tried bringing down broker while producing but it is failing with TimeoutException . I am looking for suggestions to reproduce this issue....
Read more >Admin Client - KafkaJS
The admin client hosts all the cluster operations, such as: `createTopics`, `createPartitions`, etc.
Read more >kafkajs - Bountysource
KafkaJS producer throws above error claiming This server does not host this topic-partition , when it obviously does. It's possible there's another issue...
Read more >Documentation - Apache Kafka
Messages sent by a producer to a particular topic partition will be appended in ... If this is not set, it will use...
Read more >Top 5 Things Every Apache Kafka Developer Should Know
What it doesn't specify is how many replicas need to be in sync. The lead broker will always be in sync with itself....
Read more >
Top Related Medium Post
No results found
Top Related StackOverflow Question
No results found
Troubleshoot Live Code
Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free
Top Related Reddit Thread
No results found
Top Related Hackernoon Post
No results found
Top Related Tweet
No results found
Top Related Dev.to Post
No results found
Top Related Hashnode Post
No results found
I had the same issue. There is a bug in refreshMetadata. It is not able to handle external (outside of kafkajs) removal of the topic properly.
Problem is related to refreshMetadata code which is trying to refresh metadata for each topic that is in this.targetTopics Set (from admin/cluster/index.js)
Easy way to reproduce this bug:
So if one of the topics from this.targetTopics was externally removed from kafka (without use of kafkajs) - you will not be able to perform metadata refresh for any other topic and it will not be able to recover from this error.
Interesting.
We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way
kafkajscommunicates with the brokers that’s causing it to throw these errors unnecessarily.