Closed
Labels
Memory 💾 (Issues related to memory), P1 (Important tasks that we should complete soon), Ray ⚡ (Issues related to the Ray engine), question ❓ (Questions about Modin)
Description
The situation worsens when the workflow code contains many single-column inserts: they produce a large number of column partitions, each of which stores its own copy of the MultiIndex.
Possible solutions to this problem:
- reduce the number of partitions using environment variables (for example, `MODIN_NPARTITIONS`)
- when creating a new partition, store as many columns as possible in it (on the other hand, the number of column partitions must stay greater than one so that calculations can be parallelized). This requires modifying operations such as setitem, insert and so on.
- store the internal dataframes with range indexes (as placeholders) and set a valid index on them only for the duration of an operation; the valid index then has to be stored somewhere else.
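The third option can be illustrated with plain pandas. The following is only a minimal sketch under simplifying assumptions: partitions are modelled as a list of row slices of one small frame, `with_real_index` is a hypothetical helper (not a Modin API), and pickled size is used as a rough stand-in for object store usage.

```python
import pickle

import numpy as np
import pandas as pd

# Small stand-in for the real data: a two-level MultiIndex with string labels.
n_rows, n_parts = 10_000, 10
index = pd.MultiIndex.from_arrays(
    [
        [f"long_string_data{i}" for i in range(n_rows)],
        [f"cat{i}" for i in range(n_rows)],
    ]
)
df = pd.DataFrame({"a": np.arange(n_rows)}, index=index)
part_len = n_rows // n_parts

# Simplified picture of the current state: every partition carries
# its own slice of the MultiIndex.
parts_with_index = [
    df.iloc[i * part_len : (i + 1) * part_len] for i in range(n_parts)
]

# Placeholder approach: partitions keep a cheap RangeIndex,
# and the valid index is stored once, separately.
parts_placeholder = [p.reset_index(drop=True) for p in parts_with_index]
stored_index = index


def with_real_index(part, part_no):
    """Hypothetical helper: attach the valid index only for one operation."""
    labels = stored_index[part_no * part_len : (part_no + 1) * part_len]
    return part.set_axis(labels, axis=0)


size_with_index = sum(len(pickle.dumps(p)) for p in parts_with_index)
size_placeholder = sum(len(pickle.dumps(p)) for p in parts_placeholder) + len(
    pickle.dumps(stored_index)
)
print("partitions with MultiIndex:", size_with_index, "bytes")
print("placeholders + one stored index:", size_placeholder, "bytes")
```

In this toy setup the placeholder variant serializes to far fewer bytes, because the index labels are stored once instead of once per partition. The cost is that every operation that needs labels must first restore them, as `with_real_index` does.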
Code to reproduce:
import modin.pandas as pd
import numpy as np
import ray
np.random.seed(42)
count_rows = 10**6
nonunique_ratio = 0.7
base_strings = ["long_string_dataaaaaaaaaaaaaaaa{}", "cat{}"]
arrays = [None, None]
for idx in range(len(arrays)):
    nonunique_count = int(nonunique_ratio * count_rows)
    data = [base_strings[idx].format(x) for x in range(nonunique_count // 2)]
    nonunique_data = np.append(np.array(data), np.array(data))
    unique_data = np.array(
        [base_strings[idx].format(x) for x in range(nonunique_count, count_rows)]
    )
    arrays[idx] = np.append(nonunique_data, unique_data)
    np.random.shuffle(arrays[idx])
index = pd.MultiIndex.from_arrays(arrays)
print(index)
# initialize ray as Modin does
_ = pd.DataFrame([1, 2])
# `ray memory` before all explicit put operations
# --- Aggregate object store stats across all nodes ---
# Plasma memory usage 0 MiB, 4 objects,
# put in storage by part
count_cpus = 100
one_part = count_rows // count_cpus
refs = [None] * count_cpus
for idx in range(count_cpus):
    refs[idx] = ray.put(
        index[idx * one_part : (idx + 1) * one_part]
    )  # all slices together take ~ 3210 MiB in storage
# `ray memory` output after several ray.put operations
# --- Aggregate object store stats across all nodes ---
# Plasma memory usage 3210 MiB, 104 objects,
# put in storage at once
ref = ray.put(index) # it takes ~ 40 MiB in storage
# `ray memory` output after last put operation
# --- Aggregate object store stats across all nodes ---
# Plasma memory usage 3250 MiB, 105 objects,
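The same effect can be checked without a Ray cluster. The sketch below is not part of the original reproduction: it uses a much smaller index and pickled size as a rough proxy for object store usage. It relies on the pandas behaviour that a sliced MultiIndex keeps the full `levels` of its parent, so every slice is serialized together with all labels. `remove_unused_levels()` is shown only for comparison; it is not one of the solutions proposed above.

```python
import pickle

import pandas as pd

n_rows, n_parts = 10_000, 10
index = pd.MultiIndex.from_arrays(
    [
        [f"long_string_data{i}" for i in range(n_rows)],
        [f"cat{i}" for i in range(n_rows)],
    ]
)
part_len = n_rows // n_parts
slices = [index[i * part_len : (i + 1) * part_len] for i in range(n_parts)]

# A slice holds part_len rows but still references every label of the parent.
levels_kept = len(slices[0].levels[0])
levels_trimmed = len(slices[0].remove_unused_levels().levels[0])

whole_size = len(pickle.dumps(index))
sliced_size = sum(len(pickle.dumps(s)) for s in slices)
trimmed_size = sum(len(pickle.dumps(s.remove_unused_levels())) for s in slices)

print("labels per level in one slice:", levels_kept, "of", n_rows)
print("labels per level after trimming:", levels_trimmed)
print("whole index:", whole_size, "bytes")
print("sum of slices:", sliced_size, "bytes")
print("sum of trimmed slices:", trimmed_size, "bytes")
```

The sum of the slices is several times larger than the whole index here, which mirrors the gap between the two `ray memory` readings above, although the exact ratio depends on the data and on how Ray serializes the objects.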