Skip to content

Commit b8ca99d

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.__attr__()
In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts or it is clear that updates will be disjunct, such method calls can be done in parallel.
1 parent 578982c commit b8ca99d

File tree

1 file changed

+94
-9
lines changed

1 file changed

+94
-9
lines changed

src/util/orm.py

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,23 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import multiprocessing
15+
import os
1316
import re
17+
import sys
1418
import uuid
1519
from contextlib import contextmanager
1620
from functools import wraps
17-
from itertools import chain
21+
from itertools import chain, repeat
1822
from textwrap import dedent
1923

24+
try:
25+
from concurrent.futures import ProcessPoolExecutor
26+
except ImportError:
27+
ProcessPoolExecutor = None
28+
2029
try:
2130
from unittest.mock import patch
2231
except ImportError:
@@ -28,9 +37,9 @@
2837
except ImportError:
2938
from odoo import SUPERUSER_ID
3039
from odoo import fields as ofields
31-
from odoo import modules, release
40+
from odoo import modules, release, sql_db
3241
except ImportError:
33-
from openerp import SUPERUSER_ID, modules, release
42+
from openerp import SUPERUSER_ID, modules, release, sql_db
3443

3544
try:
3645
from openerp import fields as ofields
@@ -42,8 +51,8 @@
4251
from .const import BIG_TABLE_THRESHOLD
4352
from .exceptions import MigrationError
4453
from .helpers import table_of_model
45-
from .misc import chunks, log_progress, version_between, version_gte
46-
from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor
54+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
55+
from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor
4756

4857
# python3 shims
4958
try:
@@ -53,6 +62,10 @@
5362

5463
_logger = logging.getLogger(__name__)
5564

65+
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
66+
# FIXME: for CI! Remove before merge
67+
UPG_PARALLEL_ITER_BROWSE = True
68+
5669

5770
def env(cr):
5871
"""
@@ -342,6 +355,21 @@ def get_ids():
342355
cr.execute("DROP TABLE IF EXISTS _upgrade_rf")
343356

344357

358+
def _mp_iter_browse_cb(ids_or_values, params):
359+
me = _mp_iter_browse_cb
360+
# init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
361+
if not hasattr(me, "env"):
362+
sql_db._Pool = None # children cannot borrow from copies of the same pool, it will cause protocol error
363+
me.env = env(sql_db.db_connect(params["dbname"]).cursor())
364+
me.env.clear()
365+
# process
366+
if params["mode"] == "browse":
367+
getattr(
368+
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
369+
)(*params["args"], **params["kwargs"])
370+
me.env.cr.commit()
371+
372+
345373
class iter_browse(object):
346374
"""
347375
Iterate over recordsets.
@@ -389,7 +417,18 @@ class iter_browse(object):
389417
See also :func:`~odoo.upgrade.util.orm.env`
390418
"""
391419

392-
__slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_size", "_strategy")
420+
__slots__ = (
421+
"_chunk_size",
422+
"_cr_uid",
423+
"_ids",
424+
"_it",
425+
"_logger",
426+
"_model",
427+
"_patch",
428+
"_size",
429+
"_strategy",
430+
"_task_size",
431+
)
393432

394433
def __init__(self, model, *args, **kw):
395434
assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,)
@@ -400,7 +439,28 @@ def __init__(self, model, *args, **kw):
400439
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
401440
self._logger = kw.pop("logger", _logger)
402441
self._strategy = kw.pop("strategy", "flush")
403-
assert self._strategy in {"flush", "commit"}
442+
assert self._strategy in {"flush", "commit", "multiprocessing"}
443+
if self._strategy == "multiprocessing":
444+
if not ProcessPoolExecutor:
445+
raise ValueError("multiprocessing strategy can not be used in scripts run by python2")
446+
if UPG_PARALLEL_ITER_BROWSE:
447+
self._task_size = self._chunk_size
448+
self._chunk_size = min(get_max_workers() * 10 * self._task_size, 1000000)
449+
else:
450+
self._strategy = "commit" # downgrade
451+
if self._size > 100000:
452+
_logger.warning(
453+
"Browsing %d %s, which may take a long time. "
454+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
455+
"If you do, be sure to examine the results carefully.",
456+
self._size,
457+
self._model._name,
458+
)
459+
else:
460+
_logger.info(
461+
"Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. "
462+
"Downgrading strategy to commit.",
463+
)
404464
if kw:
405465
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
406466

@@ -447,7 +507,7 @@ def _browse(self, ids):
447507
return self._model.browse(*args)
448508

449509
def _end(self):
450-
if self._strategy == "commit":
510+
if self._strategy in ["commit", "multiprocessing"]:
451511
self._model.env.cr.commit()
452512
else:
453513
flush(self._model)
@@ -482,7 +542,32 @@ def __getattr__(self, attr):
482542

483543
def caller(*args, **kwargs):
484544
args = self._cr_uid + args
485-
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
545+
if self._strategy != "multiprocessing":
546+
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
547+
params = {
548+
"dbname": self._model.env.cr.dbname,
549+
"model_name": self._model._name,
550+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
551+
"context": dict(self._model.env.context),
552+
"attr_name": attr,
553+
"args": args,
554+
"kwargs": kwargs,
555+
"mode": "browse",
556+
}
557+
self._model.env.cr.commit()
558+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
559+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
560+
for chunk in it:
561+
collections.deque(
562+
executor.map(
563+
_mp_iter_browse_cb, chunks(chunk._ids, self._task_size, fmt=tuple), repeat(params)
564+
),
565+
maxlen=0,
566+
)
567+
next(self._end(), None)
568+
# do not return results in // mode, we expect it to be used for huge numbers of
569+
# records and thus would risk MemoryError, also we cannot know if what attr returns is pickleable
570+
return None
486571

487572
self._it = None
488573
return caller

0 commit comments

Comments
 (0)