[BUG] ServiceBusReceiverClient stops consuming messages after some time even though messages are present in the subscription.
Describe the bug
We are using ServiceBusReceiverClient as the receiver, but we have observed that, even though the services were running and messages were present in the subscription, the client stopped consuming messages.
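For reference, a minimal sketch of the kind of receive loop in use (the connection-string environment variable, entity names, and batch sizes are placeholders, and error handling is trimmed for brevity):

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;

import java.time.Duration;

// Minimal receive loop against a topic subscription using the synchronous client.
public final class ReceiverLoop {

    public static void main(String[] args) {
        ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
                .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING"))
                .receiver()
                .topicName("my-topic")
                .subscriptionName("my-subscription")
                .buildClient();

        while (true) {
            for (ServiceBusReceivedMessage message
                    : receiver.receiveMessages(10, Duration.ofSeconds(30))) {
                // process the message, then settle it
                receiver.complete(message);
            }
        }
    }
}
```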
Exception or Stack Trace
2022-01-06 15:14:25,169 ERROR [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Error in SendLinkHandler. Disposing unconfirmed sends. The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
    at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
    at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:110)
    at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:61)
    at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:29)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:92)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2022-01-06 15:14:25,196 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Closing request/response channel.
2022-01-06 15:14:25,220 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel:$cbs - namespace[MF_7b61df_1641477262829] entityPath[$cbs]: Retry #1. Transient error occurred. Retrying after 4964 ms. com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] terminating 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] completed the termination of 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Channel already closed.
2022-01-06 15:14:25,222 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] SendLinkHandler disposed. Remaining: 1
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver], errorCondition[amqp:connection:forced] errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25]
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver] state[ACTIVE] Local link state is not closed.
2022-01-06 15:14:25,224 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] ReceiveLinkHandler disposed. Remaining: 0
To Reproduce
The issue occurs randomly; we do not have deterministic reproduction steps.
Expected behavior
ServiceBusReceiverClient should keep consuming the messages available in the subscription.
Setup (please complete the following information):
- OS: Linux
- IDE: IntelliJ
- Library/Libraries: com.azure:azure-messaging-servicebus:7.5.1
- Java version: Java 8
Additional context
Similar to issue https://github.com/Azure/azure-sdk-for-java/issues/26138
Information Checklist
Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.
- Bug Description Added
- Repro Steps Added
- Setup information Added
Hi @nomisRev, to give you some background: the messages arrive via an `amqp-link` that has the life cycle "open-event -> message-delivery-events -> close-event". The `LowLevelClient` creates a new `amqp-link` when the current `amqp-link` emits a close-event. You asked me in the past where this link recovery logic is, which I missed following up on; the logic is in the class `ServiceBusReceiveLinkProcessor` here.

An `amqp-session` owns the `amqp-link`, and multiple `amqp-session`s are owned by an `amqp-connection`. Both of these ancestors have the same life cycle and have their own recovery logic. The `amqp-connection` recovery logic is in the generic class `AmqpChannelProcessor` here.

As you can see, the underlying types "react" to close events (successful or error) by creating a new `amqp-link`/`amqp-connection` (collectively called channels). The expectation is that this "reactive" logic should take care of recovery, but there were edge cases (which are very hard to repro) where these "reactive" handlers never got to kick off recovery due to a missing signal. If you go through the SDK changelog, you can find multiple signal-loss bugs root-caused via DEBUG logs. A signal-loss bug fix for the "reactive" recovery is coming in the mid-July release (details here).
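To make the "reactive" recovery idea concrete, here is a minimal Reactor sketch (not the SDK's actual code; `openLink` is a hypothetical stand-in for an AMQP link's delivery stream):

```java
import reactor.core.publisher.Flux;

// Sketch of "reactive" recovery: each link is a finite stream that terminates
// with a close event; repeat() re-subscribes on a clean close and retry()
// re-subscribes on an error close, which is what "creating a new amqp-link"
// amounts to in Reactor terms.
public final class ReactiveLinkRecoverySketch {

    // Hypothetical stand-in for opening an amqp-link and exposing its deliveries.
    static Flux<String> openLink() {
        return Flux.just("delivery-1", "delivery-2"); // completion models the close event
    }

    public static void main(String[] args) {
        Flux.defer(ReactiveLinkRecoverySketch::openLink)
                .repeat() // clean close -> open a new link
                .retry()  // error close -> open a new link
                .take(5)  // bounded here only so the demo terminates
                .subscribe(System.out::println);
    }
}
```

The edge cases mentioned above are exactly the situations where the terminal signal that would drive `repeat()`/`retry()` never arrives, so no new channel is ever created.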
The historical reason for `HighLevelClient` using a timer for a "proactive" channel check (in addition to the `LowLevelClient`'s "reactive" channel check) is something I need to look into.
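For illustration, the "proactive" check amounts to something like the following sketch (`ChannelHandle`, `isChannelClosed`, and `recreateChannel` are hypothetical stand-ins, not SDK API):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a "proactive" timer check: poll the channel state on a schedule and
// recreate it if it silently closed, independent of whether the "reactive"
// close handlers ever fired.
final class ProactiveChannelCheckSketch {

    interface ChannelHandle {
        boolean isChannelClosed();
        void recreateChannel();
    }

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    void start(ChannelHandle channel) {
        timer.scheduleWithFixedDelay(() -> {
            if (channel.isChannelClosed()) {
                channel.recreateChannel();
            }
        }, 60, 60, TimeUnit.SECONDS);
    }
}
```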
Interestingly, so far you don't seem to have recovery problems with the queue entity, only with the topic entity. I'll kick off a long-running topic test with DEBUG enabled to see if we can get some helpful logs.
Hi @mseiler90, thanks for the Camel project; I'll use it to debug and understand how Camel does the wiring with the SDK.
But your observation "It seems that this has only been happening on a specific queue where we are picking up the message and then sending a REST API call to another service" gives a hint. I think what could be happening here is -
As you investigate the REST API call connection (read) timeout, see if the app was running into the above flow.
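For illustration, a downstream call bounded by explicit timeouts would look something like this sketch (the endpoint and timeout values are hypothetical placeholders), so a hung call cannot block the message-processing thread indefinitely:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Sketch: give the downstream REST call explicit connect/read timeouts so it
// fails fast instead of stalling the thread that is processing Service Bus
// messages.
public final class BoundedRestCall {

    static int callDownstream(String endpoint) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setConnectTimeout(5_000);  // fail fast if the service is unreachable
        conn.setReadTimeout(30_000);    // keep well under the message lock duration
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```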
I'll find some time to go over the Camel sample to understand Camel's bridging with the SDK and see if there is any issue in the bridging.
Hey @anuchandy,
I can confirm that we're not throwing an exception from our own code; all our code is wrapped in `try/catch` where it interleaves with the Azure SDK. So we have `try/catch` around our processing of the messages, as well as around the settlement of the messages. We have added logs everywhere in our Azure interaction and cannot see errors coming from Azure nor from our own code. We're seeing this on Topics, and we don't seem to be having issues with Queues, by the way.
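To illustrate the structure (with `processMessage` as a placeholder for our business logic):

```java
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;

import java.time.Duration;

// Sketch of the guarded structure: both processing and settlement sit inside
// try/catch, so no exception from our code can escape into the SDK.
final class GuardedProcessing {

    static void receiveOnce(ServiceBusReceiverClient receiver) {
        for (ServiceBusReceivedMessage message
                : receiver.receiveMessages(10, Duration.ofSeconds(30))) {
            try {
                processMessage(message);
                receiver.complete(message);
            } catch (Exception e) {
                System.err.println("processing/settlement failed: " + e);
                try {
                    receiver.abandon(message); // make the message redeliverable
                } catch (Exception abandonFailure) {
                    System.err.println("abandon failed: " + abandonFailure);
                }
            }
        }
    }

    static void processMessage(ServiceBusReceivedMessage message) {
        // business logic placeholder
    }
}
```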
As mentioned in my comments above, `HighLevelClient` doesn't seem to have this issue because it implements a patch around `LowLevelClient`. I am curious why this patch is placed inside the `HighLevelClient` rather than the `LowLevelClient`; the `LowLevelClient` should also act on changes in `ServiceBusConnectionProcessor#isChannelClosed`, no?

Currently I can also see that `HighLevelClient` is polling the `public boolean isChannelClosed()` state, but it seems this could also be implemented in a reactive manner to improve performance and reaction time.

I'm happy to discuss further, or share more details if you need more information. Sadly, I cannot share our production code, but I can rewrite the relevant parts, as I've done above, to share a patch. We seem to have success with this patch, but more time / battle testing in production will confirm that.
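As a sketch of what I mean by a reactive alternative (`ChannelWatcher` and `onChannelClosed` are hypothetical names, not SDK API):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Sketch of the reactive alternative to polling isChannelClosed(): the
// connection owner pushes a signal the moment the channel closes, and
// subscribers react immediately instead of waiting for the next timer tick.
final class ChannelWatcher {

    private final Sinks.Many<Boolean> closedSignals =
            Sinks.many().multicast().onBackpressureBuffer();

    // Called by the transport layer when it detects the channel has closed.
    void onChannelClosed() {
        closedSignals.tryEmitNext(Boolean.TRUE);
    }

    // Consumers subscribe once and recreate links as soon as a close arrives.
    Flux<Boolean> channelClosedEvents() {
        return closedSignals.asFlux();
    }
}
```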