[BUG] Connections to Service Bus are dropped and recreated regularly when using a ServiceBusSessionReceiverAsyncClient bean


Describe the bug: I am creating a Spring bean with a ServiceBusSessionReceiverAsyncClient so that I can reuse it in my REST API-based application.

	@Bean
	public ServiceBusSessionReceiverAsyncClient apiMessageQueueIntegrator() {
		return new ServiceBusClientBuilder()
			.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
			.connectionString(connectionString)
			.sessionReceiver()
			.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
			.queueName(queueName)
			.buildAsyncClient();
	}

In an @RestController, I do the following:

	@Autowired
	ServiceBusSessionReceiverAsyncClient apiMessageQueueIntegrator;

	// ...

	@GetMapping("/simulateapicall")
	public ResponseEntity<?> processApiCall() {
		// ...

		Mono<ServiceBusReceiverAsyncClient> receiverMono = apiMessageQueueIntegrator.acceptSession(sessionid);

		Disposable subscription = Flux.usingWhen(receiverMono,
				receiver -> receiver.receiveMessages(),
				receiver -> Mono.fromRunnable(() -> receiver.close()))
			.subscribe(message -> {
				// Process message.
				logger.info(String.format("Message received from queue. Session id: %s. Contents: %s%n",
					message.getSessionId(), message.getBody()));
			}, error -> {
				logger.info("Queue error occurred: " + error);
			});

		// ...

		// After having received message
		subscription.dispose();
	}

(NB! I am sending/receiving only one message per session. I use this in a request/reply scheme.)

This code works fine, but during load testing I see that the link to Service Bus is regularly closed and reopened. The connection to Service Bus seems to live for only 1-2 seconds before it is closed and a new connection is opened. This causes problems because my sessions (from acceptSession) break or fail to be created.

The connections seem to be dropped more often as I increase the load on the application.
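To make the one-message-per-session request/reply scheme concrete, here is a minimal JDK-only sketch of the correlation it relies on. It assumes each API call uses a unique session id and waits for exactly one reply on that session. The in-memory map stands in for Service Bus sessions; `ReplyCorrelator` and its methods are hypothetical names, not part of the Azure SDK.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical, in-memory stand-in for a request/reply scheme where each
// request owns one session id and expects exactly one reply on it.
// This does NOT use the Azure Service Bus SDK.
class ReplyCorrelator {
    // Pending requests keyed by session id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the requester before sending its request message.
    CompletableFuture<String> register(String sessionId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (pending.putIfAbsent(sessionId, future) != null) {
            throw new IllegalStateException("Session id already in use: " + sessionId);
        }
        return future;
    }

    // Called when the single reply for a session arrives. Returns false if
    // nobody is waiting for that session any more.
    boolean onReply(String sessionId, String body) {
        CompletableFuture<String> future = pending.remove(sessionId);
        return future != null && future.complete(body);
    }

    // Stops waiting for a session, e.g. when the caller gives up.
    void cancel(String sessionId) {
        CompletableFuture<String> future = pending.remove(sessionId);
        if (future != null) {
            future.cancel(false);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> reply = correlator.register("session-42");
        // Simulate the single reply arriving on session "session-42".
        correlator.onReply("session-42", "hello");
        System.out.println(reply.get(1, TimeUnit.SECONDS)); // prints "hello"
    }
}
```

In the reported setup, the part played by `onReply` is the single message received through `acceptSession(sessionid)`; the sketch only illustrates why every request needs its own session.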

Exception or Stack Trace: I have attached a file that shows the typical lifecycle of a connection, from when it is created to when it is dropped. My log files typically consist of many such lifecycles as connections are dropped and recreated.

error_log_2.txt

To Reproduce: Write code similar to what I have described, then use JMeter to put load on the application.

Code Snippet: See the bug description.

Expected behavior: I expect the connection to Service Bus to be kept alive and not terminated.

Screenshots: N/A

Setup:

  • OS: Azure app service (Linux, Java 11)
  • Library/Libraries: azure-messaging-servicebus:7.5.0
  • Java version: 11
  • App Server/Environment: Azure app service
  • Frameworks: Spring Boot

Additional context: N/A

Information Checklist: Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise, we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Issue Analytics

  • State: closed
  • Created 2 years ago
  • Reactions:3
  • Comments:37 (14 by maintainers)

Top GitHub Comments

7 reactions
joan2424 commented, Apr 1, 2022

Can you please increase the priority of this task? We have been waiting for a solution for a long time…

1 reaction
freedev commented, May 18, 2022

Hi, a few days ago I updated to the latest Service Bus SDK (com.azure:azure-messaging-servicebus:jar:7.8.0). The situation seems to have changed slightly: now an exception is raised after ~5 minutes of idle time. Even so, the app keeps working after the exception, because it seems a new connection is opened under the hood.

Here is how I create the async client:

        serviceBusReceiverAsyncClient = serviceBusClientBuilder
                .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
                .receiver()
                .topicName(inTopicName)
                .subscriptionName(subscriptionName)
                .prefetchCount(prefetchCount)
                .maxAutoLockRenewDuration(this.maxAutoLockRenewDuration)
                .disableAutoComplete()
                .buildAsyncClient();

And this is how I subscribe to new messages with Quarkus Mutiny:

        Multi.createFrom()
                .publisher(serviceBusReceiverAsyncClient.receiveMessages())
                .onItem()
                .transform(m -> this.consumeMessage(m))
                .subscribe()
                .with(o -> this.processReceivedMessage(o));

This is the raised exception:

2022-05-17 21:25:46,323 WARN  [com.azu.cor.amq.imp.RequestResponseChannel] (reactor-executor-3) {"az.sdk.message":"Error in SendLinkHandler. Disposing unconfirmed sends.","exception":"The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6a5b592335994c66b9f3ef62ad4bee7e_G50, SystemTracker:gateway7, Timestamp:2022-05-17T21:25:46, errorContext[NAMESPACE: eldmangessbq01.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 97]","connectionId":"MF_0c2d51_1652795696565","linkName":"cbs"}
2022-05-17 21:25:46,329 WARN  [com.azu.cor.amq.imp.ReactorExecutor] (reactor-executor-3) {"az.sdk.message":"Unhandled exception while processing events in reactor, report this error.","exception":"java.lang.IllegalStateException","connectionId":"MF_0c2d51_1652795696565"}
2022-05-17 21:25:50,329 WARN  [com.azu.cor.amq.imp.ReactorExecutor] (reactor-executor-3) {"az.sdk.message":"scheduleCompletePendingTasks - exception occurred while  processing events.\njava.lang.IllegalStateException\norg.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)\ncom.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:158)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\njava.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\njava.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:833)Cause: 
null\norg.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)\norg.apache.qpid.proton.engine.impl.TransportImpl.unbind(TransportImpl.java:315)\norg.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:387)\norg.apache.qpid.proton.engine.BaseHandler.onTransportClosed(BaseHandler.java:84)\norg.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:200)\norg.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)\ncom.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:158)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\njava.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\njava.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\njava.base/java.lang.Thread.run(Thread.java:833)","connectionId":"MF_0c2d51_1652795696565"}
2022-05-17 21:25:50,837 WARN  [com.azu.cor.amq.imp.ReactorDispatcher] (parallel-1) {"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_0c2d51_1652795696565"}
2022-05-17 21:25:50,837 WARN  [com.azu.cor.amq.imp.ReactorDispatcher] (boundedElastic-10) {"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_0c2d51_1652795696565"}
2022-05-17 21:25:50,837 WARN  [com.azu.cor.amq.imp.RequestResponseChannel] (parallel-1) {"az.sdk.message":"Unable to open send and receive link.","exception":"Unable to open send and receive link.","connectionId":"MF_0c2d51_1652795696565","linkName":"cbs"}
2022-05-17 21:25:50,838 WARN  [com.azu.cor.amq.imp.AmqpChannelProcessor] (parallel-1) {"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Unable to open send and receive link.","connectionId":"MF_0c2d51_1652795696565","entityPath":"$cbs","retry":2}
2022-05-17 21:25:50,838 ERROR [rea.cor.pub.Operators] (parallel-1) Operator called default onErrorDropped: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Unable to open send and receive link.
Caused by: java.lang.RuntimeException: Unable to open send and receive link.
	at com.azure.core.amqp.implementation.RequestResponseChannel.<init>(RequestResponseChannel.java:221)
	at com.azure.core.amqp.implementation.ReactorConnection.lambda$createRequestResponseChannel$16(ReactorConnection.java:406)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at reactor.core.publisher.Operators$MonoSubscriber.request(Operators.java:1906)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
	at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:320)
	at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onError$4(AmqpChannelProcessor.java:213)
	at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171)
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.concurrent.RejectedExecutionException: ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.
	at com.azure.core.amqp.implementation.ReactorDispatcher.throwIfSchedulerError(ReactorDispatcher.java:139)
	at com.azure.core.amqp.implementation.ReactorDispatcher.invoke(ReactorDispatcher.java:103)
	at com.azure.core.amqp.implementation.RequestResponseChannel.<init>(RequestResponseChannel.java:216)
	... 20 more
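The "~5 minutes of idle" in the comment matches the log message above: 300000 ms is 300 s, i.e. 5 minutes. The sketch below illustrates, in plain JDK code, the lazy-reconnect behavior the commenter suspects ("a new connection is opened under the hood"). It is an assumption about the observed behavior, not the Azure SDK's actual implementation, and `IdleAwareHolder` is a hypothetical name.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: a holder that treats its connection as closed once it
// has been idle for longer than the timeout, and lazily creates a new one on
// the next use. NOT the Azure SDK's implementation.
class IdleAwareHolder<T> {
    private final Supplier<T> factory;      // creates a new "connection"
    private final long idleTimeoutMillis;   // e.g. 300000 ms, as in the log
    private final LongSupplier clockMillis; // injectable clock for testing
    private T current;
    private long lastUsedMillis;
    private int created;

    IdleAwareHolder(Supplier<T> factory, Duration idleTimeout, LongSupplier clockMillis) {
        this.factory = factory;
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.clockMillis = clockMillis;
    }

    // Returns the current connection, replacing it if it was idle for
    // MORE than the allowed timeout (mirroring "inactive for more than").
    synchronized T get() {
        long now = clockMillis.getAsLong();
        if (current == null || now - lastUsedMillis > idleTimeoutMillis) {
            current = factory.get();
            created++;
        }
        lastUsedMillis = now;
        return current;
    }

    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        IdleAwareHolder<String> holder = new IdleAwareHolder<>(
                () -> "connection", Duration.ofMillis(300_000), () -> fakeNow[0]);
        holder.get();                     // opens the first connection
        fakeNow[0] = 200_000;
        holder.get();                     // idle 200000 ms: reused
        fakeNow[0] = 200_000 + 300_001;
        holder.get();                     // idle 300001 ms: replaced
        System.out.println(holder.createdCount()); // prints 2
    }
}
```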