Error in KafkaTopicProvisioner For Consumer
Hello, you have solved this error before. Thank you so much. However, the fix only covered ProducerProvisioner (the provisionProducerDestination method); see https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/888. I am now getting the same error for ConsumerProvisioner. Could you please add the retry mechanism to ConsumerProvisioner as well? Alternatively, I can add it myself and open a pull request.
You already identified the problem very well in that issue (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/888), which is why I have not given a long explanation here. I can provide more detail if you want.
Your solution for the producer:
However, no retry mechanism has been added to the consumer provisioner yet.
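To make the request concrete, here is a minimal sketch of the kind of retry I have in mind, written with the JDK only. It is not the binder's code: the class ProvisioningRetrySketch, the method withRetry, and the exception TopicNotReadyException are hypothetical names made up for illustration, and TopicNotReadyException merely stands in for Kafka's UnknownTopicOrPartitionException. The actual fix in KafkaTopicProvisioner may well look different.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch; none of these names exist in the Kafka binder.
final class ProvisioningRetrySketch {

    /** Stand-in for org.apache.kafka.common.errors.UnknownTopicOrPartitionException. */
    static class TopicNotReadyException extends RuntimeException {
        TopicNotReadyException(String message) {
            super(message);
        }
    }

    /**
     * Calls the action up to maxAttempts times, sleeping backoffMillis between attempts.
     * A failure is retried only if it (or the cause wrapped in an ExecutionException)
     * matches the retryable predicate; otherwise the original exception is rethrown.
     */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMillis,
                           Predicate<Throwable> retryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // KafkaFuture.get() reports broker errors wrapped in ExecutionException.
                Throwable cause = (e instanceof ExecutionException && e.getCause() != null)
                        ? e.getCause() : e;
                if (attempt >= maxAttempts || !retryable.test(cause)) {
                    throw e;
                }
                Thread.sleep(backoffMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated metadata lookup: fails twice, as if the topic were not yet known to the broker.
        Integer partitions = withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new ExecutionException(
                        new TopicNotReadyException("This server does not host this topic-partition."));
            }
            return 4;
        }, 5, 10L, t -> t instanceof TopicNotReadyException);
        System.out.println("partitions=" + partitions + " after " + calls.get() + " attempts");
    }
}
```

The point of the predicate is that only the transient "topic not known yet" failure is retried; any other exception fails immediately, as it does today.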
I am getting the following error from the KafkaTopicProvisioner:
[2022-10-03T12:39:05.005Z] [org.springframework.cloud.stream.binding.BindingService] [main] [200] [ERROR] Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for person-topic-1 nested exception is java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:252)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:202)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:86)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:421)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180)
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:126)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:144)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:771)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:763)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:438)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:339)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318)
at tr.cloud.stream.examples.PersonConsumerServiceApplication.main(PersonConsumerServiceApplication.java:9)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:241)
... 28 common frames omitted
Hello @sobychacko. I have opened the pull request. This is my first time doing this, so could you check whether I did anything wrong? Thank you.
@omercelikceng I reviewed it. Looks good. I did some minor polishing on top (checkstyle and tests), merged the PR upstream to main, and backported it to 3.2.x.