How to clear objects from the object store in ray?

See original GitHub issue

I am trying out the promising multiprocessing package ray. I have a problem I can't seem to solve. My program runs fine the first time, but on a second run this exception is raised on the ray.put() line:

ObjectStoreFullError: Failed to put object ffffffffffffffffffffffffffffffffffffffff010000000c000000 in object store because it is full. Object size is 2151680255 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.

What I want to do: In my actual code (which I'm planning to write) I need to process many big_data_objects sequentially. I want to hold one big_data_object in memory at a time and run several heavy (independent) computations on it in parallel. When these are done, I have to replace the big_data_object in the object store with a new one and start the computations (in parallel) again.

Using my test script, I simulate this by running the script again without ray.shutdown(). If I shut down Ray using ray.shutdown(), the object store is cleared, but reinitializing then takes a long time and I cannot process multiple big_data_objects sequentially as I want to.

What sources of information I have studied: I studied the Ray Design Patterns document, in particular the section 'Antipattern: Closure capture of large / unserializable object' and what the proper pattern(s) should look like. I also studied the getting started guide, which led to the following test script.

A minimal example to reproduce the problem: I created a test script for this:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0,0]+x

# Define large data
big_data_object = np.random.rand(16400,16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600,11600)  # Define an object of approx 1 GB.
# big_data_object = np.random.rand(8100,8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000,5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(item))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))


#%% Clean-up object store data - Still there is a (huge) memory leak in the object store.
for index in range(4):
    del result_refs[0]

del big_data_object_ref

Where I think it's going wrong: I think I delete all the references to the object store at the end of the script, so the objects should be cleared from the object store (as described here). Apparently something is wrong, because the big_data_object remains in the object store. The results, however, are deleted from the object store just fine.

Some debug information: I inspected the object store using the ray memory command; this is what I get:

(c:\python\cenv38rl) PS C:\WINDOWS\system32> ray memory
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=20952
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=29368
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=17388
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=24208
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=27684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=6860
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; driver pid=28684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\worker.py:put_object:277 | c:\python\cenv38rl\lib\site-packages\ray\worker.py:put:1489 | c:\python\cenv38rl\lib\site-packages\ray\_private\client_mode_hook.py:wrapper:47 | C:\Users\Stefan\Documents\Python examples\Multiprocess_Ray3_SO.py:<module>:42
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
Plasma memory usage 2052 MiB, 1 objects, 77.41% full

Some of the things I have tried: If I replace my_function with:

@ray.remote
def my_function(x):  # Later I will have multiple different my_functions to extract separate features from my big_data_objects
    time.sleep(1)
    # data_item = ray.get(big_data_object_ref)
    # return data_item[0,0]+x
    return 5

then the script successfully clears the object store, but my_function can no longer use the big_data_object, which I need it to.

My question is: how do I fix my code so that the big_data_object is removed from the object store at the end of my script, without shutting down Ray?

Note: I installed Ray using pip install ray, which gave me version ray==1.2.0, which I am using now. I use Ray on Windows and develop in Spyder v4.2.5 in a conda (actually miniconda) environment, in case it is relevant.

Issue Analytics

  • State: closed
  • Created: 2 years ago
  • Comments: 7 (3 by maintainers)

Top GitHub Comments

2 reactions
rkooo567 commented, Apr 1, 2021

The problem is that your remote function captures big_data_object_ref, and the ref from there is never removed. Note that when you do this type of thing:

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0,0]+x
    # return 5

# Define large data
big_data_object = np.random.rand(16400,16400)
big_data_object_ref = ray.put(big_data_object)

big_data_object_ref is serialized into the remote function definition (thus there's a permanent pointer until you remove this serialized function definition, which is Ray-internal).

Instead, use this type of pattern:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(big_data_object, x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    return big_data_object[0,0]+x
    # return 5

# Define large data
#big_data_object = np.random.rand(16400,16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600,11600)  # Define an object of approx 1 GB.
big_data_object = np.random.rand(8100,8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000,5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)
print("ref in a driver ", big_data_object_ref)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(big_data_object_ref, item))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))
print(result_refs)

#%% Clean-up object store data
#for index in range(4):
#    del result_refs[0]
del result_refs

del big_data_object_ref
time.sleep(1000)  # Keep the driver alive for a while, e.g. to inspect the object store with `ray memory`

(The difference is that now we pass big_data_object_ref instead of capturing it)
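Applying the same pass-by-argument pattern to the sequential workflow described in the question might look roughly like the sketch below; load_next_big_data_object() and num_objects are hypothetical placeholders for however the data is actually produced.

# Sketch of the sequential workflow: one big_data_object in the store at a time.
for _ in range(num_objects):
    big_data_object = load_next_big_data_object()
    big_data_object_ref = ray.put(big_data_object)
    del big_data_object  # drop the driver's local copy

    # Run the independent computations in parallel, passing the ref as an argument.
    result_refs = [my_function.remote(big_data_object_ref, item) for item in range(4)]
    results = ray.get(result_refs)
    print("Results: {}".format(results))

    # Drop every reference so the object store can evict this big_data_object
    # before the next one is put.
    del result_refs
    del big_data_object_ref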

1 reaction
nathanin commented, Sep 3, 2021

The comments here helped me fix an issue in a setup where I need to take results and save them to disk as soon as they are ready. I expect the results to be consumed pretty much as quickly as they're available, so I expect near-constant memory usage. All the results together would be too large to hold in memory at once, so I was getting the ready tasks with ray.wait, but I was totally confused about why my objects were remaining in the object store. I was doing deep copies and deleting everything. It turns out I wasn't deleting the actual task IDs!

This pattern keeps LOCAL_REFERENCES to every returned object, resulting in growing memory usage:

data_id = ray.put(big_data)

futures = []
for i in range(Ntasks):
  futures.append( remote_fn.remote(data_id,) )

# not deleting the original futures list was the core mistake
not_ready = futures
for i in range(Ntasks):
  ready, not_ready = ray.wait(not_ready, num_returns=1)
  big_return_data = ray.get(ready[0])

  # save big_return_data to disk

  del big_return_data

This pseudocode summarizes the setup that’s working in my case:

data_id = ray.put(big_data)

# initialize the not_ready list directly with the task IDs
not_ready = []
for i in range(Ntasks):
  not_ready.append( remote_fn.remote(data_id,) )

for i in range(Ntasks):
  ready, not_ready = ray.wait(not_ready, num_returns=1)
  big_return_data = ray.get(ready[0])

  # save big_return_data to disk

  # pretty sure this would get cleared on the next iteration, but for good measure..
  del ready
  del big_return_data
