Queue overflow error, change MAX_QUEUE_SIZE value

My code:

from binance import ThreadedWebsocketManager

# callback: invoked for every trade message
def process_message_price(msg):
    print(msg)

# websocket
bm = ThreadedWebsocketManager()
bm.start()
# listOfPairings: all pairs with USDT (over 250 items in list)
for pairing in listOfPairings:
    conn_key = bm.start_trade_socket(callback=process_message_price, symbol=pairing)

bm.join()

However, after a short time I get the following error: ‘e’: ‘error’, ‘m’: ‘Queue overflow. Message not filled’

which is caused by MAX_QUEUE_SIZE in streams.py being too small for my program.

How can I change this value from outside the streams.py file?

Thx

Issue Analytics

  • State: open
  • Created: 2 years ago
  • Reactions: 1
  • Comments: 20

Top GitHub Comments

5 reactions
Blademaster680 commented, Dec 17, 2021

I was just playing around with the documentation code and got the same error when I was manually using the asynchronous context manager.

What sorted it out for me: once you open the connection and receive the message, you should close the context manager. Here is my code:

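import asyncio

# socket, create_frame and engine are defined elsewhere in my script
async def run_listener():
    while True:
        # open the connection, read one message, then close it again
        await socket.__aenter__()
        msg = await socket.recv()
        await socket.__aexit__(None, None, None)
        try:
            frame = create_frame(msg)
            frame.to_sql('BTCUSDT', engine, if_exists='append', index=False)
            print(frame)
        except Exception:
            # error payloads carry their text under the 'm' key
            print(f'Error: {msg["m"]}')

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_listener())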

It's the line await socket.__aexit__(None, None, None) that sorted out the issue for me.
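One possible reading of why this helps, not confirmed anywhere in the thread, is that messages pile up whenever the loop spends time on slow work (such as the database write) instead of calling recv(). Under that assumption, an alternative is to keep the reader loop fast and hand messages to a separate worker task. The sketch below uses a hypothetical FakeSocket in place of the real socket and a short sleep in place of the slow step; none of these names come from the library.

```python
import asyncio

class FakeSocket:
    """Hypothetical stand-in for a websocket: yields a fixed number of messages."""
    def __init__(self, count):
        self._messages = iter(range(count))

    async def recv(self):
        await asyncio.sleep(0)              # yield control, like a network read
        return next(self._messages, None)   # None signals end of stream

async def reader(socket, work_queue):
    # Drain the socket as fast as possible; no slow work happens here.
    while True:
        msg = await socket.recv()
        await work_queue.put(msg)
        if msg is None:
            break

async def worker(work_queue, results):
    # Slow processing (for example a database write) happens here instead.
    while True:
        msg = await work_queue.get()
        if msg is None:
            break
        await asyncio.sleep(0.001)          # simulated slow step
        results.append(msg)

async def main(count=20):
    work_queue = asyncio.Queue()            # unbounded hand-off buffer
    results = []
    await asyncio.gather(reader(FakeSocket(count), work_queue),
                         worker(work_queue, results))
    return results

processed = asyncio.run(main())
print(len(processed))
```

The hand-off queue here is unbounded, so a worker that never catches up would trade the overflow error for growing memory use; a real script would probably want a limit or a drop policy.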

3 reactions
dpcwee commented, Sep 12, 2021

So... is there a way to avoid this error? What should I change?

import threading
import time

from binance import ThreadedWebsocketManager

class Stream():

    def start(self):
        self.bm = ThreadedWebsocketManager()
        self.bm.start()
        self.stream_error = False
        self.multiplex_list = list()

        # listOfPairings: all pairs with USDT (over 250 items in list)
        for pairing in listOfPairings:
            self.multiplex_list.append(pairing.lower() + '@trade')
        self.multiplex = self.bm.start_multiplex_socket(callback = self.realtime, streams = self.multiplex_list)

        # monitoring the error in a background thread
        stop_trades = threading.Thread(target = self.restart_stream, daemon = True)
        stop_trades.start()

    def realtime(self, msg):
        if 'data' in msg:
            pass  # Your code
        else:
            # anything without 'data' is treated as a stream error
            self.stream_error = True

    def restart_stream(self):
        while True:
            time.sleep(1)
            if self.stream_error:
                self.bm.stop_socket(self.multiplex)
                time.sleep(5)
                self.stream_error = False
                self.multiplex = self.bm.start_multiplex_socket(callback = self.realtime, streams = self.multiplex_list)

stream = Stream()
stream.start()
stream.bm.join()

And set MAX_QUEUE_SIZE = 10000. There is no other way to fix this error. I get no more than 10-15 restarts of the websocket across all USDT pairs per day (the restart occurs within 5-7 seconds after the error).
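If MAX_QUEUE_SIZE is defined as a class attribute (an assumption about streams.py that should be checked against the installed version), it can be changed from your own script without editing the library file, as long as the change happens before any sockets are started. The sketch below shows the mechanism on a stand-in class; the class body and the is_full method are hypothetical and only illustrate the attribute lookup.

```python
class ReconnectingWebsocket:
    """Stand-in for a library class; NOT the real implementation."""
    MAX_QUEUE_SIZE = 100

    def is_full(self, queued):
        # Instances read the limit through self, so a class-level
        # change is visible to existing and future instances alike.
        return queued >= self.MAX_QUEUE_SIZE

existing = ReconnectingWebsocket()
before = existing.is_full(500)

# Override from outside the defining module.
ReconnectingWebsocket.MAX_QUEUE_SIZE = 10000

after = existing.is_full(500)
print(before, after)
```

With the real library, and assuming the attribute lives on a class named ReconnectingWebsocket in binance/streams.py, the equivalent would be to import binance.streams and assign streams.ReconnectingWebsocket.MAX_QUEUE_SIZE = 10000 before calling bm.start(). A larger queue only delays the overflow if the callback stays slower than the incoming message rate.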

Calling the stream restart from an except block did not work for me either; it only worked from a function running in a separate thread.
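The separate-thread restart can also be sketched with threading.Event instead of a boolean flag polled once per second, so the monitor thread wakes as soon as an error is reported. This is a variation on the code above, not what the commenter ran; the Watchdog class and fake_restart callable are hypothetical, and in a real script the restart callable would stop and reopen the multiplex socket (including any delay you want between the two).

```python
import threading

class Watchdog:
    """Runs a restart callable in a background thread when an error is reported."""
    def __init__(self, restart):
        self._restart = restart            # hypothetical restart callable
        self._error = threading.Event()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def report_error(self):
        # Call this from the message callback when an error payload arrives.
        self._error.set()

    def stop(self):
        self._stop.set()
        self._error.set()                  # wake the thread so it can exit
        self._thread.join()

    def _run(self):
        while True:
            self._error.wait()             # blocks until an error or stop
            if self._stop.is_set():
                break
            self._error.clear()
            self._restart()

restarts = []
restarted = threading.Event()

def fake_restart():
    # Stand-in for stop_socket(...) followed by start_multiplex_socket(...).
    restarts.append("restart")
    restarted.set()

dog = Watchdog(fake_restart)
dog.start()
dog.report_error()
restarted.wait(timeout=2)
dog.stop()
print(restarts)
```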
