Error in KafkaTopicProvisioner For Consumer

Hello, you have fixed this error before, thank you very much. However, that fix only covered the producer side (the provisionProducerDestination method); see https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/888. I am now getting the same error on the consumer side. Could you please add the retry mechanism to the consumer provisioning path as well? Alternatively, I can add it myself and open a pull request.

You already identified the problem very well in that issue, so I have not written a long explanation here. I can provide more detail if you want. (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/888)

Your solution for the producer:

ProducerProvisioner

However, no retry mechanism has been added for the consumer provisioner yet.

ConsumerProvisioner

I am getting the following error from the KafkaTopicProvisioner:

[2022-10-03T12:39:05.005Z] [org.springframework.cloud.stream.binding.BindingService] [main] [200] [ERROR] Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for person-topic-1 nested exception is java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:252)
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:202)
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:86)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:421)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92)
	at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
	at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180)
	at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:126)
	at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118)
	at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
	at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
	at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
	at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:144)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:771)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:763)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:438)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:339)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318)
	at tr.cloud.stream.examples.PersonConsumerServiceApplication.main(PersonConsumerServiceApplication.java:9)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:241)
	... 28 common frames omitted
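
The kind of retry being requested can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the binder's actual code: the class ProvisioningRetrySketch, the method retryOnTopicNotReady, and the stand-in exception TopicNotReadyException are all hypothetical names, and the stand-in takes the place of org.apache.kafka.common.errors.UnknownTopicOrPartitionException so that the example runs without Kafka on the classpath. The idea is to repeat the metadata lookup a bounded number of times when the ExecutionException is caused by the "topic not known yet" error, and to rethrow anything else immediately.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class ProvisioningRetrySketch {

    /** Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException. */
    static class TopicNotReadyException extends RuntimeException {
        TopicNotReadyException(String message) {
            super(message);
        }
    }

    /**
     * Runs the call up to maxAttempts times. Only an ExecutionException whose
     * cause is TopicNotReadyException triggers a retry; any other failure is
     * rethrown at once. The last failure is rethrown when attempts run out.
     */
    static <T> T retryOnTopicNotReady(Callable<T> call, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TopicNotReadyException)) {
                    throw e; // not the transient case, do not retry
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMillis); // fixed back-off between attempts
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated metadata lookup: fails twice, then reports 4 partitions.
        Integer partitions = retryOnTopicNotReady(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new ExecutionException(
                        new TopicNotReadyException("This server does not host this topic-partition."));
            }
            return 4;
        }, 5, 10L);
        System.out.println("partitions=" + partitions + " after " + calls.get() + " calls");
    }
}
```

In a real application the same effect is usually achieved with an existing retry facility rather than a hand-written loop; the loop above is only meant to make the requested behavior concrete.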

Issue Analytics

  • State: closed
  • Created a year ago
  • Comments: 7 (7 by maintainers)

Top GitHub Comments

omercelikceng commented, Oct 16, 2022

Hello @sobychacko. I sent the pull request. However, this is my first time doing this. Can you check if I did something wrong? Thank you.

sobychacko commented, Oct 20, 2022

@omercelikceng I reviewed. Looks good. Made some minor polishing on top (checkstyle and tests). Merged the PR upstream to main and backported to 3.2.x.
