[BUG] reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException while consuming events from EventHub
Describe the bug
I have a very simple spring-cloud-azure-starter-stream-eventhubs application; all it does is consume each message and increment a counter.
@Bean
public Consumer<Message<SimpleEvent>> consume() {
    // Count every consumed event and log its payload.
    return message -> {
        counter.incrementAndGet();
        LOG.debug("Consumed '{}'", message.getPayload());
    };
}
After updating spring-cloud-azure-dependencies to 4.0.0-beta.2, I’m getting the following error:
2021-12-14 08:42:57.108 WARN 82909 --- [ parallel-5] c.a.s.e.c.RecordCheckpointManager : Consumer group '$default' failed to checkpoint {body=***, offset=12899279960, sequenceNumber=187894, enqueuedTime=2021-12-13T07:12:44.419Z} on partition 0
reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 32
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:365) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:551) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:135) ~[reactor-netty-core-1.0.13.jar:1.0.13]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268) ~[reactor-netty-http-1.0.13.jar:1.0.13]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271) ~[reactor-netty-http-1.0.13.jar:1.0.13]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:53) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:34) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionOtherSubscriber.onNext(FluxDelaySubscription.java:131) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.12.jar:3.4.12]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.12.jar:3.4.12]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2021-12-14 08:42:57.108 ERROR 82909 --- [ parallel-5] reactor.core.publisher.Operators : Operator called default onErrorDropped
This happens in all of the BATCH, RECORD, and MANUAL modes:
spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          consume-in-0:
            consumer:
              checkpoint:
                mode: RECORD
When using TIME with an interval of 1m, however, this doesn’t happen. We need to be able to use MANUAL.
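To illustrate why frequent checkpointing can trip this limit while TIME mode does not, here is a small JDK-only toy model of a connection pool with a bounded pending-acquire queue. It is not Reactor Netty's implementation; the class and method names are hypothetical. It assumes the pending queue is sized at twice the maximum number of connections (which I believe is Reactor Netty's default), so a limit of 32 would correspond to a pool of 16 connections. It also assumes that the value of 500 mentioned in the comments below refers to the pool size, which this thread does not state explicitly.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a pool with a bounded pending-acquire queue (hypothetical, not Reactor Netty code). */
final class BoundedPoolModel {
    private final int maxConnections;
    private final int maxPending;
    private final Deque<Integer> pending = new ArrayDeque<>();
    private int inUse = 0;
    private int rejected = 0;

    BoundedPoolModel(int maxConnections) {
        this.maxConnections = maxConnections;
        // Assumption: the pending queue holds at most twice the connection count.
        this.maxPending = 2 * maxConnections;
    }

    /** Returns false when both the pool and the pending queue are full. */
    boolean acquire(int requestId) {
        if (inUse < maxConnections) {
            inUse++;                      // a free connection is available
            return true;
        }
        if (pending.size() < maxPending) {
            pending.addLast(requestId);   // wait for a connection to be released
            return true;
        }
        rejected++;                       // models PoolAcquirePendingLimitException
        return false;
    }

    /** Releases one connection, handing it to the oldest waiter if there is one. */
    void release() {
        if (!pending.isEmpty()) {
            pending.removeFirst();        // the waiter takes over the connection
        } else if (inUse > 0) {
            inUse--;
        }
    }

    int pendingSize() {
        return pending.size();
    }

    /** Issues `burst` checkpoint requests before any of them completes. */
    static int rejectedInBurst(int maxConnections, int burst) {
        BoundedPoolModel pool = new BoundedPoolModel(maxConnections);
        for (int i = 0; i < burst; i++) {
            pool.acquire(i);
        }
        return pool.rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejectedInBurst(16, 100));   // one checkpoint per event, small pool
        System.out.println(rejectedInBurst(500, 100));  // same burst, larger pool
        System.out.println(rejectedInBurst(16, 1));     // a single periodic checkpoint
    }
}
```

In this model, a burst of 100 outstanding checkpoints against a pool of 16 fills the 16 connections and the 32 pending slots, and the rest are rejected. A pool of 500 absorbs the same burst, and a single periodic checkpoint, as in TIME mode, never queues at all.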
Expected behavior
The events should be checkpointed successfully.
Setup (please complete the following information):
- OS: Mac 11.6.1 (Big Sur)
- IDE: IntelliJ
- Library/Libraries: spring-cloud-azure-dependencies:4.0.0-beta.2
- Java version: 17
- Frameworks: [e.g. Spring Boot, Micronaut, Quarkus, etc]
Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.
- Bug Description Added
- Repro Steps Added
- Setup information Added
That did the trick: once I changed the value as you suggested, things got back to normal. Thanks! I would love to hear any conclusions your investigation yields. For now, I will keep it at 500.
Hi @avpines, we will keep the default value consistent with the previous versions, which is 500. This has been fixed via #26083, and you can expect it in our next release. Thanks.