Skip to content

Commit 7eab714

Browse files
committed
[FIX] spreadsheet: batch process spreadsheet_revision.commands
Some databases have `spreadsheet_revision` records with over 10 million characters in `commands`. If the number of such records is high, fetching them all at once leads to memory errors. We distribute the records into buckets of `memory_cap` maximum size, and use a named cursor to process them one bucket at a time. Each command larger than `memory_cap` gets a bucket of its own.
1 parent 86409e5 commit 7eab714

File tree

1 file changed

+57
-24
lines changed

1 file changed

+57
-24
lines changed

src/util/spreadsheet/misc.py

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,65 @@
1-
from .. import json
1+
from .. import json, pg
2+
3+
MEMORY_CAP = 2 * 10**8 # 200MB
24

35

46
def iter_commands(cr, like_all=(), like_any=()):
57
if not (bool(like_all) ^ bool(like_any)):
68
raise ValueError("Please specify `like_all` or `like_any`, not both")
7-
cr.execute(
8-
"""
9-
SELECT id,
10-
commands
11-
FROM spreadsheet_revision
12-
WHERE commands LIKE {}(%s::text[])
13-
""".format("ALL" if like_all else "ANY"),
14-
[list(like_all or like_any)],
15-
)
16-
for revision_id, data in cr.fetchall():
17-
data_loaded = json.loads(data)
18-
if "commands" not in data_loaded:
19-
continue
20-
data_old = json.dumps(data_loaded, sort_keys=True)
21-
22-
changed = yield data_loaded["commands"]
23-
if changed is None:
24-
changed = data_old != json.dumps(data_loaded, sort_keys=True)
25-
26-
if changed:
27-
cr.execute(
28-
"UPDATE spreadsheet_revision SET commands=%s WHERE id=%s", [json.dumps(data_loaded), revision_id]
29-
)
9+
10+
with pg.named_cursor(cr, itersize=1) as ncr:
11+
ncr.execute(
12+
pg.format_query(
13+
cr,
14+
"""
15+
WITH filtered AS (
16+
SELECT id,
17+
commands,
18+
LENGTH(commands) AS commands_length
19+
FROM spreadsheet_revision
20+
WHERE commands LIKE {condition} (%s::text[])
21+
), smaller AS (
22+
SELECT id,
23+
commands,
24+
sum(commands_length) OVER (ORDER BY id) / %s AS num
25+
FROM filtered
26+
WHERE commands_length <= %s
27+
)
28+
SELECT array_agg(id ORDER BY id),
29+
array_agg(commands ORDER BY id),
30+
min(id) AS sort_key
31+
FROM smaller
32+
GROUP BY num
33+
34+
UNION ALL
35+
36+
SELECT ARRAY[id],
37+
ARRAY[commands],
38+
id AS sort_key
39+
FROM filtered
40+
WHERE commands_length > %s
41+
ORDER BY sort_key
42+
""",
43+
condition=pg.SQLStr("ALL" if like_all else "ANY"),
44+
),
45+
[list(like_any or like_all), MEMORY_CAP, MEMORY_CAP, MEMORY_CAP],
46+
)
47+
for ids, commands, _ in ncr:
48+
for revision_id, data in zip(ids, commands):
49+
data_loaded = json.loads(data)
50+
if "commands" not in data_loaded:
51+
continue
52+
data_old = json.dumps(data_loaded, sort_keys=True)
53+
54+
changed = yield data_loaded["commands"]
55+
if changed is None:
56+
changed = data_old != json.dumps(data_loaded, sort_keys=True)
57+
58+
if changed:
59+
cr.execute(
60+
"UPDATE spreadsheet_revision SET commands=%s WHERE id=%s",
61+
[json.dumps(data_loaded), revision_id],
62+
)
3063

3164

3265
def process_commands(cr, callback, *args, **kwargs):

0 commit comments

Comments
 (0)