[BUG] reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException while consuming events from EventHub

See original GitHub issue

Describe the bug I have a very simple spring-cloud-azure-starter-stream-eventhubs application, all it does is consume the message and increment a counter.

@Bean
public Consumer<Message<SimpleEvent>> consume() {
    return message -> {
      counter.incrementAndGet();
      LOG.debug("Consumed '{}'", message.getPayload());
}

After updating to spring-cloud-azure-dependencies to 4.0.0-beta.2, I’m getting the following error:

021-12-14 08:42:57.108  WARN 82909 --- [     parallel-5] c.a.s.e.c.RecordCheckpointManager        : Consumer group '$default' failed to checkpoint {body=***, offset=12899279960, sequenceNumber=187894, enqueuedTime=2021-12-13T07:12:44.419Z} on partition 0

reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 32
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:365) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:551) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:135) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268) ~[reactor-netty-http-1.0.13.jar:1.0.13]
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271) ~[reactor-netty-http-1.0.13.jar:1.0.13]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:53) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:34) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionOtherSubscriber.onNext(FluxDelaySubscription.java:131) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.12.jar:3.4.12]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2021-12-14 08:42:57.108 ERROR 82909 --- [     parallel-5] reactor.core.publisher.Operators         : Operator called default onErrorDropped

This happens in both BATCH, RECORD, and MANUAL modes:

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          consume-in-0:
            consumer:
              checkpoint:
                mode: RECORD

When using TIME with an interval of 1m however, this doesn’t happen. We need to have the ability to use MANUAL.

Exception or Stack Trace

021-12-14 08:42:57.108  WARN 82909 --- [     parallel-5] c.a.s.e.c.RecordCheckpointManager        : Consumer group '$default' failed to checkpoint {body=***, offset=12899279960, sequenceNumber=187894, enqueuedTime=2021-12-13T07:12:44.419Z} on partition 0

reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 32
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:365) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:551) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:135) ~[reactor-netty-core-1.0.13.jar:1.0.13]
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268) ~[reactor-netty-http-1.0.13.jar:1.0.13]
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271) ~[reactor-netty-http-1.0.13.jar:1.0.13]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:53) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:34) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionOtherSubscriber.onNext(FluxDelaySubscription.java:131) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.12.jar:3.4.12]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2021-12-14 08:42:57.108 ERROR 82909 --- [     parallel-5] reactor.core.publisher.Operators         : Operator called default onErrorDropped

Expected behavior The events to be checkpointed successfully.

Setup (please complete the following information):

  • OS: Mac 11.6.1 (Big Sur)
  • IDE: IntelliJ
  • Library/Libraries: spring-cloud-azure-dependencies:4.0.0-beta.2
  • Java version: 17
  • Frameworks: [e.g. Spring Boot, Micronaut, Quarkus, etc]

Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Issue Analytics

  • State:closed
  • Created 2 years ago
  • Comments:6 (3 by maintainers)

github_iconTop GitHub Comments

2reactions
avpinescommented, Dec 16, 2021

That did the trick, once I’ve changed the value as you suggested things got back to normal. Thanks! I would love to hear any conclusions your investigation yields. For now, I will keep it at 500.

1reaction
yiliuTocommented, Dec 21, 2021

Hi @avpines , we will keep the default value consistent with the previous versions, which is 500. And this has been fixed via #26083 and you could expect it in our next release. Thanks.

Read more comments on GitHub >

github_iconTop Results From Across the Web

reactor.netty.internal.shaded.reactor.pool ... - GitHub
I am using spring cloud stream to connect to azure event hub, while receiving events I am getting this issue, please help me...
Read more >
Spring WebClient reactor.netty.internal.shaded.reactor.pool ...
There is no error in the server logs. I think because the client is pushing millions of messages, the Webclient is running out...
Read more >
Reactor Netty Reference Guide
This section provides a brief overview of Reactor Netty reference documentation. You do not need to read this guide in a linear fashion....
Read more >
When connecting to event hub storage AccessKey with + ...
I have a spring boot app using camel to connect to Azure event hub. I can publish events to Azure event hub but...
Read more >
reactor/reactor-netty - Gitter
1, I am noticing connection pool is getting created multiple times, even though I am only creating the http client once in my...
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found