Dispatcher has no subscribers for channel

See original GitHub issue

I developed new service using spring cloud stream and it was working fine, and suddenly I got the following Error

ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageDeliveryException: 
Dispatcher has no subscribers for channel 'application.inputTopic'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3287], headers={kafka_offset=438359, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3a13bf52, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@5f4d910d, kafka_receivedPartitionId=3, kafka_receivedTopic=products, kafka_receivedTimestamp=1522912482308, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = products, partition = 3, offset = 438359, CreateTime = 1522912482308, serialized key size = 36, serialized value size = 3287, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5f4d910d, value = [B@74acd088), contentType=application/json}], failedMessage=GenericMessage [payload=byte[3287], headers={kafka_offset=438359, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3a13bf52, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@5f4d910d, kafka_receivedPartitionId=3, kafka_receivedTopic=marketplace.products, kafka_receivedTimestamp=1522912482308, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = products, partition = 3, offset = 438359, CreateTime = 1522912482308, serialized key size = 36, serialized value size = 3287, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5f4d910d, value = [B@74acd088), contentType=application/json}]\
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)\
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)\
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)\
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)\
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)\
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)\
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)\
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)\
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:69)\
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:376)\
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:353)\
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)\
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)\
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)\
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)\
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)\
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001)\
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981)\
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932)\
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)\
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)\
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)\
	at java.lang.Thread.run(Thread.java:745)\ 

and this is my configuration

@Component
public interface BinderProcessor {
    @Input
    SubscribableChannel inputTopic();
    @Output
    MessageChannel outputTopic();
}
@Component
@EnableBinding(BinderProcessor.class)
public class Consumer {

    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);

    private final BinderProcessor binderProcessor;

    @Autowired
    public Consumer(final BinderProcessor binderProcessor) {
        this.binderProcessor = binderProcessor;
    }

    public Flux<ProductDifferenceModel> consume() {

        LOG.debug("attache new consumer to kafka topic");

        return Flux.create(fluxSink -> {
            MessageHandler handler = message -> fluxSink.next(new ProductDifferenceModel(JsonConverter.convert((byte[]) message.getPayload()),
                    (Acknowledgment) message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class)));
            binderProcessor.inputTopic().subscribe(handler);
            fluxSink.onCancel(()-> binderProcessor.inputTopic().unsubscribe(handler));
        });

    }

}

application.properties

#Kafka configuration
spring.kafka.bootstrap-servers=192.168.99.100:9092
#Binder Configuration
spring.cloud.stream.kafka.binder.brokers=192.168.99.100
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=192.168.99.100

spring.cloud.stream.bindings.inputTopic.destination=products
spring.cloud.stream.kafka.bindings.inputTopic.consumer.autoCommitOffset=false

spring.cloud.stream.bindings.outputTopic.destination=test

pom.xml

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.0.0.RELEASE</version>
	<relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

<spring-cloud.version>Finchley.M6</spring-cloud.version>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>${spring-cloud.version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
     </dependencies>
</dependencyManagement>

Issue Analytics

  • State:closed
  • Created 5 years ago
  • Comments:10 (6 by maintainers)

github_iconTop GitHub Comments

3reactions
artembilancommented, Apr 5, 2018

That’s exactly what I meant! You can’t start consuming from the CommandLineRunner. That’s still early in the application context lifecycle.

You should consider to move that flowManager.manage() code to the main():

ApplicationContext applicationContext = SpringApplication.run(ProductApplication.class, args);
applicationContext.getBean(FlowManager.class).manage();

Also you can consider to use @StreamEmitter instead for such a Reactor-based stuff: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RC3/reference/htmlsingle/#_reactive_sources

@sabbyanandan , @sobychacko , we need to consider to add a @StreamEmitter source sample: https://github.com/spring-cloud/spring-cloud-stream-samples

Thanks

1reaction
artembilancommented, Apr 18, 2018

Well, without your custom project we can’t help you.

I see you have the code like:

fluxSink.onCancel(()-> binderProcessor.inputTopic().unsubscribe(handler));

So, maybe your application meets some condition when it really cancel a subscription to that Flux, therefore you drop a subscriber from the inputTopic channel and not able to process any new consumed message from the topic.

It looks like this ticket turns out far away from Framework issue. Would be better to move it into some community forum, e.g. StackOverflow.

Read more comments on GitHub >

github_iconTop Results From Across the Web

Spring Integration Dispatcher has no subscribers for channel
It is unusual (rare) to ever subscribe to a channel from user code; normally, the framework will automatically subscribe consumers when they are ......
Read more >
Getting MessageDispatchingException: Dispatcher has no ...
Hi @rhyashu , the exception of "Dispatcher has no subscribers" happens when StreamBridge sends messages via a channel while finds no handler for ......
Read more >
spring-cloud/spring-cloud-stream - Gitter
Thanks for reaching out. Dispatcher has no subscribers for channel typically means that some beans are initialized in the wrong order. It's hard ......
Read more >
BroadcastingDispatcher (Spring Integration 6.0.0 API)
If the 'requireSubscribers' flag is set to true, the sent message is considered as non-dispatched and rejected to the caller with the "Dispatcher...
Read more >
Spring Integration 5: Subscribable Message Channels
Adds a subscriber to the subscribers of the message channel. ... MessageDeliveryException: Dispatcher has no subscribers for channel ...
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found