[BUG] ServiceBusReceiverClient stops consuming messages after some time though messages are present in subscription.

Describe the bug We are using ServiceBusReceiverClient as the receiver, but we have observed that it stopped consuming messages even though the services were running and messages were present in the subscription.

Exception or Stack Trace

2022-01-06 15:14:25,169 ERROR [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Error in SendLinkHandler. Disposing unconfirmed sends. The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
    at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
    at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:110)
    at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:61)
    at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:29)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:92)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2022-01-06 15:14:25,196 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Closing request/response channel.
2022-01-06 15:14:25,196 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Closing request/response channel.
2022-01-06 15:14:25,220 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel:$cbs - namespace[MF_7b61df_1641477262829] entityPath[$cbs]: Retry #1. Transient error occurred. Retrying after 4964 ms.
com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] terminating 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] terminating 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] completed the termination of 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] completed the termination of 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Channel already closed.
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Channel already closed.
2022-01-06 15:14:25,222 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] SendLinkHandler disposed. Remaining: 1
2022-01-06 15:14:25,222 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] SendLinkHandler disposed. Remaining: 1
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver], errorCondition[amqp:connection:forced] errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25]
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver], errorCondition[amqp:connection:forced] errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container ‘LinkTracker’. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25]
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver] state[ACTIVE] Local link state is not closed.
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver] state[ACTIVE] Local link state is not closed.
2022-01-06 15:14:25,224 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] ReceiveLinkHandler disposed. Remaining: 0

To Reproduce This issue occurs randomly.

Expected behavior ServiceBusReceiverClient should consume the messages available in the subscription.

Setup (please complete the following information):

  • OS: Linux
  • IDE: IntelliJ
  • Library/Libraries: com.azure:azure-messaging-servicebus-7.5.1.jar
  • Java version: Java 8

Additional context Similar to issue https://github.com/Azure/azure-sdk-for-java/issues/26138

Information Checklist Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Issue Analytics

  • State:open
  • Created 2 years ago
  • Comments:18 (6 by maintainers)

Top GitHub Comments

2 reactions
anuchandy commented, Jun 28, 2022

Hi @nomisRev, to give you some background, the messages arrive via an amqp-link that has the life cycle “open-event -> message-delivery-events -> close-event”. The LowLevelClient creates a new amqp-link when the current amqp-link emits a close-event. You asked me in the past where this link recovery logic is, and I failed to follow up; the logic is in the class “ServiceBusReceiveLinkProcessor”.

An amqp-session owns the amqp-link, and multiple amqp-sessions are owned by an amqp-connection. Both of these ancestors have a life cycle of their own, and both have recovery logic. The amqp-connection recovery logic is in the generic class “AmqpChannelProcessor”.

As you can see, the underlying types “react” to close (successful or error) events by creating a new amqp-link / amqp-connection (collectively called channels).
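
As a rough illustration of the life cycle described above, the following sketch models a link that emits a close event and a receiver that reacts by creating a replacement link. It uses only the JDK. ToyLink and RecoveringReceiver are hypothetical names invented for this example; they are not the SDK's ServiceBusReceiveLinkProcessor or AmqpChannelProcessor, and the real classes are considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy stand-in for an amqp-link: open -> deliveries -> close. Not an SDK type. */
final class ToyLink {
    private final int id;
    private Consumer<String> onMessage = m -> { };
    private Runnable onClose = () -> { };
    private boolean closed;

    ToyLink(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }

    boolean isClosed() {
        return closed;
    }

    void subscribe(Consumer<String> onMessage, Runnable onClose) {
        this.onMessage = onMessage;
        this.onClose = onClose;
    }

    /** Simulates a message-delivery event; ignored once the link is closed. */
    void deliver(String message) {
        if (!closed) {
            onMessage.accept(message);
        }
    }

    /** Simulates a close event, for example the broker forcing the connection closed. */
    void close() {
        if (!closed) {
            closed = true;
            onClose.run();
        }
    }
}

/** Hypothetical receiver that reacts to each close event by attaching to a new link. */
final class RecoveringReceiver {
    private final Supplier<ToyLink> linkFactory;
    private final List<String> received = new ArrayList<>();
    private ToyLink current;

    RecoveringReceiver(Supplier<ToyLink> linkFactory) {
        this.linkFactory = linkFactory;
        attach();
    }

    private void attach() {
        current = linkFactory.get();
        // The close callback is the only trigger for recovery in this model.
        current.subscribe(received::add, this::attach);
    }

    ToyLink currentLink() {
        return current;
    }

    List<String> received() {
        return received;
    }
}
```

In this toy model, recovery depends entirely on the close callback being invoked: if that signal never arrives, attach() never runs again and the receiver stays bound to a dead link.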

The expectation is that this “reactive” logic takes care of recovery, but there were edge cases (which are very hard to reproduce) where the “reactive” handlers never kick off recovery because a signal goes missing. If you go through the SDK changelog, you can find multiple signal-loss bugs that were root-caused via DEBUG logs. A signal-loss bug fix for the “reactive” recovery is coming in the mid-July release.

I need to check the historical reason why “HighLevelClient” uses a timer for a “proactive” channel check (in addition to “LowLevelClient”'s “reactive” channel check).

Interestingly, so far you don’t seem to have recovery problems with the queue entity, only with the topic entity. I’ll kick off a long-running topic test with DEBUG enabled to see if we can get some helpful logs.


Hi @mseiler90, thanks for the Camel project; I’ll use it to debug and understand how Camel wires itself to the SDK.

But your observation “It seems that this has only been happening a specific queue where we are picking up the message and then sending a REST API call to another service” gives a hint. I think what could be happening here is:

  1. A subset of the REST API calls block for a long time, eventually throwing a read timeout.
  2. Let’s say those problematic API calls block for 90 seconds, which blocks the SDK callback for that period. That is, no message is read from the queue for 90 seconds, leading to an increase in queue depth.
  3. Eventually the API calls throw a read timeout; if the app doesn’t handle it, it propagates to the SDK callback, which stops the receive-flux (refer to my previous comment on exception handling) and further increases the queue depth.

As you investigate the REST API call connection (read) timeout, check whether the app was running into the above flow.

I’ll find some time to go over the Camel sample to understand Camel’s bridging with the SDK and see if there is any issue in the bridging.
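
One way an application might guard against the flow above is to bound the downstream call with its own time budget and convert every failure into an outcome instead of letting it escape the message callback. The sketch below is a minimal illustration using only the JDK. GuardedHandler and its Outcome enum are hypothetical names, the timeout value is an assumption for the application to choose, and mapping the outcome to an actual settlement call is left to the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds a blocking call and turns its failures into an outcome. */
final class GuardedHandler implements AutoCloseable {
    enum Outcome { COMPLETE, ABANDON }

    // Daemon worker threads so a stuck downstream call cannot keep the JVM alive.
    private final ExecutorService worker = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "guarded-handler");
        t.setDaemon(true);
        return t;
    });
    private final long timeoutMillis;

    GuardedHandler(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Runs the downstream call; failures of that call are reported, not rethrown. */
    Outcome handle(Callable<Void> downstreamCall) {
        Future<Void> future = worker.submit(downstreamCall);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETE;
        } catch (TimeoutException e) {
            // Interrupt the stuck call rather than waiting for its own read timeout.
            future.cancel(true);
            return Outcome.ABANDON;
        } catch (ExecutionException e) {
            // The call itself failed, for example with a read timeout.
            return Outcome.ABANDON;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return Outcome.ABANDON;
        }
    }

    @Override
    public void close() {
        worker.shutdownNow();
    }
}
```

A message callback could call handle(...) around the REST request and then settle the message according to the returned outcome, so that a slow or failing dependency delays at most one time budget per message and never surfaces as an exception in the callback.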

2 reactions
nomisRev commented, Jun 27, 2022

Hey @anuchandy,

I can confirm that we’re not throwing an exception from our own code; all our code is wrapped in try/catch wherever it interleaves with the Azure SDK. So we have try/catch around our processing of the messages, as well as around the settlement of the messages. We have added logs everywhere in our Azure interaction and cannot see errors coming from Azure or from our own code. We’re seeing this on Topics; we don’t seem to be having issues with Queues, by the way.

As mentioned in my comments above, HighLevelClient doesn’t seem to have this issue because it implements a patch around LowLevelClient. I am curious why this patch is placed inside the HighLevelClient rather than the LowLevelClient; shouldn’t the LowLevelClient also act on changes in ServiceBusConnectionProcessor#isChannelClosed?

Currently I can also see that HighLevelClient is polling the public boolean isChannelClosed() state, but it seems this could also be implemented in a reactive manner to improve performance and reaction time.

I’m happy to discuss further or share more details if you need more information. Sadly, I cannot share our production code, but I can rewrite it as I’ve done above to share a patch. We seem to have success with this patch, but more time and battle testing in production will confirm that.
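
To make the polling approach concrete, here is a minimal sketch of a timer-driven check, written against plain JDK types. ChannelWatchdog is a hypothetical name; the isChannelClosed supplier and the restartReceiver action are assumptions standing in for whatever the application wires up, and this is not how the SDK itself implements its check.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical watchdog: polls a "channel closed" flag and restarts the receiver when set. */
final class ChannelWatchdog implements AutoCloseable {
    private final BooleanSupplier isChannelClosed;
    private final Runnable restartReceiver;
    private final AtomicInteger restarts = new AtomicInteger();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "channel-watchdog");
        t.setDaemon(true);
        return t;
    });

    ChannelWatchdog(BooleanSupplier isChannelClosed, Runnable restartReceiver) {
        this.isChannelClosed = isChannelClosed;
        this.restartReceiver = restartReceiver;
    }

    /** One proactive check; kept separate so it can be exercised without the timer. */
    void checkOnce() {
        if (isChannelClosed.getAsBoolean()) {
            restarts.incrementAndGet();
            restartReceiver.run();
        }
    }

    /** Runs checkOnce() repeatedly with the given delay between runs. */
    void start(long period, TimeUnit unit) {
        timer.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // Swallowed on purpose: an exception escaping here would cancel future runs.
            }
        }, period, period, unit);
    }

    int restarts() {
        return restarts.get();
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

The trade-off in this sketch is that a closed channel is noticed only at the next tick, so the reaction time is bounded by the polling period. A reactive variant would subscribe to close events instead, which reacts sooner but depends on those events always being delivered.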
