Skip to content

Commit ef79dc6

Browse files
committed
[IMP] orm: add optional // attr call to iter_browse
In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts, or if it is clear that the updates will be disjoint, such method calls can be done in parallel.
1 parent 5d515f7 commit ef79dc6

File tree

1 file changed

+95
-8
lines changed

1 file changed

+95
-8
lines changed

src/util/orm.py

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,23 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import multiprocessing
15+
import os
1316
import re
17+
import sys
1418
import uuid
1519
from contextlib import contextmanager
1620
from functools import wraps
17-
from itertools import chain
21+
from itertools import chain, repeat
1822
from textwrap import dedent
1923

24+
try:
25+
from concurrent.futures import ProcessPoolExecutor
26+
except ImportError:
27+
ProcessPoolExecutor = None
28+
2029
try:
2130
from unittest.mock import patch
2231
except ImportError:
@@ -28,9 +37,9 @@
2837
except ImportError:
2938
from odoo import SUPERUSER_ID
3039
from odoo import fields as ofields
31-
from odoo import modules, release
40+
from odoo import modules, release, sql_db
3241
except ImportError:
33-
from openerp import SUPERUSER_ID, modules, release
42+
from openerp import SUPERUSER_ID, modules, release, sql_db
3443

3544
try:
3645
from openerp import fields as ofields
@@ -42,8 +51,8 @@
4251
from .const import BIG_TABLE_THRESHOLD
4352
from .exceptions import MigrationError
4453
from .helpers import table_of_model
45-
from .misc import chunks, log_progress, version_between, version_gte
46-
from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor
54+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
55+
from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor
4756

4857
# python3 shims
4958
try:
@@ -53,6 +62,10 @@
5362

5463
_logger = logging.getLogger(__name__)
5564

# Opt-in flag for the "multiprocessing" strategy of `iter_browse`.
# Parallel execution can reorder/interleave writes across worker processes,
# so callers must ensure the triggered operations are disjoint (see the
# strategy handling in `iter_browse.__init__`, which downgrades to "commit"
# when this flag is not set).
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
# NOTE(review): removed the temporary `UPG_PARALLEL_ITER_BROWSE = True`
# override that was marked "FIXME: for CI! Remove before merge" — it defeated
# the env-var opt-in and forced parallel mode for every run.
5669

5770
def env(cr):
5871
"""
@@ -342,6 +355,23 @@ def get_ids():
342355
cr.execute("DROP TABLE IF EXISTS _upgrade_rf")
343356

344357

358+
def _mp_iter_browse_cb(ids_or_values, params):
    """Worker-side callback for the multiprocessing strategy of `iter_browse`.

    Runs one task in a forked child process: either a method call on a browsed
    recordset (``mode == "browse"``) or a ``create`` (``mode == "create"``),
    then commits the worker's own cursor.

    :param ids_or_values: record ids to browse, or values to create
    :param dict params: pickled task description (dbname, model_name, context,
                        attr_name, args, kwargs, mode)
    """
    fn = _mp_iter_browse_cb
    # Build one environment per worker process, lazily on the first task.
    # Done here rather than via a pool initializer callback because
    # Python 3.6's ProcessPoolExecutor does not support initializers.
    if not hasattr(fn, "env"):
        # Forked children must not borrow from copies of the parent's
        # connection pool; doing so causes protocol errors.
        sql_db._Pool = None
        fn.env = env(sql_db.db_connect(params["dbname"]).cursor())
        fn.env.clear()

    model = fn.env[params["model_name"]].with_context(params["context"])
    mode = params["mode"]
    if mode == "browse":
        records = model.browse(ids_or_values)
        method = getattr(records, params["attr_name"])
        method(*params["args"], **params["kwargs"])
    if mode == "create":
        model.create(ids_or_values)
    fn.env.cr.commit()
373+
374+
345375
class iter_browse(object):
346376
"""
347377
Iterate over recordsets.
@@ -389,7 +419,18 @@ class iter_browse(object):
389419
See also :func:`~odoo.upgrade.util.orm.env`
390420
"""
391421

392-
__slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_size", "_strategy")
422+
__slots__ = (
423+
"_chunk_size",
424+
"_cr_uid",
425+
"_ids",
426+
"_it",
427+
"_logger",
428+
"_model",
429+
"_patch",
430+
"_size",
431+
"_strategy",
432+
"_task_size",
433+
)
393434

394435
def __init__(self, model, *args, **kw):
395436
assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,)
@@ -400,7 +441,28 @@ def __init__(self, model, *args, **kw):
400441
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
401442
self._logger = kw.pop("logger", _logger)
402443
self._strategy = kw.pop("strategy", "flush")
403-
assert self._strategy in {"flush", "commit"}
444+
assert self._strategy in {"flush", "commit", "multiprocessing"}
445+
if self._strategy == "multiprocessing":
446+
if not ProcessPoolExecutor:
447+
raise ValueError("multiprocessing mode can not be used in scripts run by python2")
448+
if UPG_PARALLEL_ITER_BROWSE:
449+
self._task_size = self._chunk_size
450+
self._chunk_size = min(get_max_workers() * 10 * self._task_size, 1000000)
451+
else:
452+
self._strategy = "commit" # downgrade
453+
if self._size > 100000:
454+
_logger.warning(
455+
"Browsing %d %s, which may take a long time. "
456+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
457+
"If you do, be sure to examine the results carefully.",
458+
self._size,
459+
self._model._name,
460+
)
461+
else:
462+
_logger.info(
463+
"Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. "
464+
"Downgrading strategy to commit.",
465+
)
404466
if kw:
405467
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
406468

@@ -447,7 +509,7 @@ def _browse(self, ids):
447509
return self._model.browse(*args)
448510

449511
def _end(self):
450-
if self._strategy == "commit":
512+
if self._strategy in ["commit", "multiprocessing"]:
451513
self._model.env.cr.commit()
452514
else:
453515
flush(self._model)
@@ -482,6 +544,31 @@ def __getattr__(self, attr):
482544

483545
def caller(*args, **kwargs):
484546
args = self._cr_uid + args
547+
if self._strategy == "multiprocessing":
548+
params = {
549+
"dbname": self._model.env.cr.dbname,
550+
"model_name": self._model._name,
551+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
552+
"context": dict(self._model.env.context),
553+
"attr_name": attr,
554+
"args": args,
555+
"kwargs": kwargs,
556+
"mode": "browse",
557+
}
558+
self._model.env.cr.commit()
559+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
560+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
561+
for chunk in it:
562+
collections.deque(
563+
executor.map(
564+
_mp_iter_browse_cb, chunks(chunk._ids, self._task_size, fmt=tuple), repeat(params)
565+
),
566+
maxlen=0,
567+
)
568+
next(self._end(), None)
569+
# do not return results in // mode, we expect it to be used for huge numbers of
570+
# records and thus would risk MemoryError
571+
return None
485572
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
486573

487574
self._it = None

0 commit comments

Comments
 (0)