Dispatcher has no subscribers for channel
I developed a new service using Spring Cloud Stream and it was working fine, then suddenly I got the following error:
ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageDeliveryException:
Dispatcher has no subscribers for channel 'application.inputTopic'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3287], headers={kafka_offset=438359, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3a13bf52, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@5f4d910d, kafka_receivedPartitionId=3, kafka_receivedTopic=products, kafka_receivedTimestamp=1522912482308, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = products, partition = 3, offset = 438359, CreateTime = 1522912482308, serialized key size = 36, serialized value size = 3287, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5f4d910d, value = [B@74acd088), contentType=application/json}], failedMessage=GenericMessage [payload=byte[3287], headers={kafka_offset=438359, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3a13bf52, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@5f4d910d, kafka_receivedPartitionId=3, kafka_receivedTopic=marketplace.products, kafka_receivedTimestamp=1522912482308, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = products, partition = 3, offset = 438359, CreateTime = 1522912482308, serialized key size = 36, serialized value size = 3287, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5f4d910d, value = [B@74acd088), contentType=application/json}]\
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:69)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:376)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:353)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
And this is my configuration:
@Component
public interface BinderProcessor {
    @Input
    SubscribableChannel inputTopic();

    @Output
    MessageChannel outputTopic();
}

@Component
@EnableBinding(BinderProcessor.class)
public class Consumer {
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
    private final BinderProcessor binderProcessor;

    @Autowired
    public Consumer(final BinderProcessor binderProcessor) {
        this.binderProcessor = binderProcessor;
    }

    public Flux<ProductDifferenceModel> consume() {
        LOG.debug("attach new consumer to kafka topic");
        return Flux.create(fluxSink -> {
            // The handler is subscribed only when something subscribes to the returned Flux.
            MessageHandler handler = message -> fluxSink.next(new ProductDifferenceModel(
                    JsonConverter.convert((byte[]) message.getPayload()),
                    (Acknowledgment) message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class)));
            binderProcessor.inputTopic().subscribe(handler);
            // Cancelling the Flux removes the handler from the channel again.
            fluxSink.onCancel(() -> binderProcessor.inputTopic().unsubscribe(handler));
        });
    }
}
application.properties
#Kafka configuration
spring.kafka.bootstrap-servers=192.168.99.100:9092
#Binder Configuration
spring.cloud.stream.kafka.binder.brokers=192.168.99.100
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=192.168.99.100
spring.cloud.stream.bindings.inputTopic.destination=products
spring.cloud.stream.kafka.bindings.inputTopic.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.outputTopic.destination=test
pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<spring-cloud.version>Finchley.M6</spring-cloud.version>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Issue Analytics
- State:
- Created 5 years ago
- Comments:10 (6 by maintainers)
That’s exactly what I meant! You can’t start consuming from the CommandLineRunner. That’s still early in the application context lifecycle.
You should consider moving that flowManager.manage() code to the main():
Also, you can consider using @StreamEmitter instead for such Reactor-based stuff: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RC3/reference/htmlsingle/#_reactive_sources
@sabbyanandan, @sobychacko, we need to consider adding a @StreamEmitter source sample: https://github.com/spring-cloud/spring-cloud-stream-samples
Thanks
Well, without your custom project we can’t help you.
I see you have the code like:
So, maybe your application meets some condition where it really cancels the subscription to that Flux; you then drop the subscriber from the inputTopic channel and are not able to process any newly consumed message from the topic.
It looks like this ticket has turned out to be far from a framework issue. It would be better to move it to a community forum, e.g. StackOverflow.
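To make the two explanations in this thread concrete (consuming before a subscriber is attached, and losing the subscriber when the Flux is cancelled), here is a minimal plain-Java sketch. It is only a simplified model: SketchChannel, trySend and NoSubscriberSketch are hypothetical names invented for illustration, not Spring Integration classes, and the real dispatcher is more involved. The only behaviour it assumes is the one named in the error message: sending to a channel with zero subscribers fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/**
 * Simplified stand-in for a subscribable channel (an assumption for
 * illustration, NOT Spring Integration's implementation). It mimics one rule
 * only: sending to a channel that has no subscribers fails.
 */
public class NoSubscriberSketch {

    /** Hypothetical minimal channel that hands each message to its registered handlers. */
    static final class SketchChannel {
        private final String name;
        private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

        SketchChannel(String name) {
            this.name = name;
        }

        boolean subscribe(Consumer<String> handler) {
            return handlers.add(handler);
        }

        boolean unsubscribe(Consumer<String> handler) {
            return handlers.remove(handler);
        }

        void send(String payload) {
            if (handlers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            for (Consumer<String> handler : handlers) {
                handler.accept(payload);
            }
        }
    }

    /** Sends a payload and reports whether delivery succeeded. */
    static boolean trySend(SketchChannel channel, String payload) {
        try {
            channel.send(payload);
            return true;
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        SketchChannel inputTopic = new SketchChannel("application.inputTopic");
        List<String> received = new ArrayList<>();
        Consumer<String> handler = received::add;

        // 1. A record arrives before any handler is attached: delivery fails.
        trySend(inputTopic, "record-1");

        // 2. The handler is attached (the Flux has been subscribed): delivery works.
        inputTopic.subscribe(handler);
        trySend(inputTopic, "record-2");

        // 3. The Flux is cancelled and the handler is removed: delivery fails again.
        inputTopic.unsubscribe(handler);
        trySend(inputTopic, "record-3");

        System.out.println("received = " + received); // prints "received = [record-2]"
    }
}
```

In this model only record-2 reaches the handler. Steps 1 and 3 correspond to the two situations described above: the binding delivering messages before the application has subscribed, and the onCancel callback removing the only subscriber while the Kafka consumer keeps polling.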