Skip to content

Commit efa83ee

Browse files
committed
[IMP] orm: add optional // attr call to iter_browse
In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts or it is clear that the updates will be disjoint, such method calls can be done in parallel.
1 parent 909274f commit efa83ee

File tree

1 file changed

+63
-9
lines changed

1 file changed

+63
-9
lines changed

src/util/orm.py

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import os
1315
import re
16+
from concurrent.futures import ProcessPoolExecutor
1417
from contextlib import contextmanager
1518
from functools import wraps
1619
from itertools import chain
@@ -27,9 +30,9 @@
2730
except ImportError:
2831
from odoo import SUPERUSER_ID
2932
from odoo import fields as ofields
30-
from odoo import modules, release
33+
from odoo import modules, release, sql_db
3134
except ImportError:
32-
from openerp import SUPERUSER_ID, modules, release
35+
from openerp import SUPERUSER_ID, modules, release, sql_db
3336

3437
try:
3538
from openerp import fields as ofields
@@ -41,8 +44,8 @@
4144
from .const import BIG_TABLE_THRESHOLD
4245
from .exceptions import MigrationError
4346
from .helpers import table_of_model
44-
from .misc import chunks, log_progress, version_between, version_gte
45-
from .pg import column_exists, format_query, get_columns, named_cursor
47+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
48+
from .pg import column_exists, format_query, get_columns, get_max_workers, named_cursor
4649

4750
# python3 shims
4851
try:
@@ -52,6 +55,8 @@
5255

5356
_logger = logging.getLogger(__name__)
5457

58+
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
59+
5560

5661
def env(cr):
5762
"""
@@ -338,6 +343,23 @@ def get_ids():
338343
invalidate(records)
339344

340345

346+
def _mp_iter_browse_cb(ids_or_values):
347+
me = _mp_iter_browse_cb
348+
# init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
349+
if not hasattr(me, "env"):
350+
sql_db._Pool = None # children cannot borrow from copies of the same pool, it will cause protocol error
351+
me.env = env(sql_db.db_connect(me.params["dbname"]).cursor())
352+
me.env.clear()
353+
# process
354+
if me.params["mode"] == "browse":
355+
getattr(me.env[me.params["model_name"]].browse(ids_or_values), me.params["attr_name"])(
356+
*me.params["args"], **me.params["kwargs"]
357+
)
358+
if me.params["mode"] == "create":
359+
me.env[me.params["model_name"]].create(ids_or_values)
360+
me.env.cr.commit()
361+
362+
341363
class iter_browse(object):
342364
"""
343365
Iterate over recordsets.
@@ -372,12 +394,11 @@ class iter_browse(object):
372394
:param model: the model to iterate
373395
:type model: :class:`odoo.model.Model`
374396
:param iterable(int) ids: iterable of IDs of the records to iterate
375-
:param int chunk_size: number of records to load in each iteration chunk, `200` by
376-
default
397+
:param int chunk_size: number of records to load in each iteration chunk, `200` by default
377398
:param logger: logger used to report the progress, by default
378399
:data:`~odoo.upgrade.util.orm._logger`
379400
:type logger: :class:`logging.Logger`
380-
:param str strategy: whether to `flush` or `commit` on each chunk, default is `flush`
401+
:param str strategy: whether to `flush` or `commit` on each chunk, or use `multiprocessing` to process chunks in parallel; default is `flush`
381402
:return: the object returned by this class can be used to iterate, or call any model
382403
method, safely on millions of records.
383404
@@ -400,7 +421,19 @@ def __init__(self, model, *args, **kw):
400421
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
401422
self._logger = kw.pop("logger", _logger)
402423
self._strategy = kw.pop("strategy", "flush")
403-
assert self._strategy in {"flush", "commit"}
424+
assert self._strategy in {"flush", "commit", "multiprocessing"}
425+
if self._strategy == "multiprocessing":
426+
if UPG_PARALLEL_ITER_BROWSE:
427+
self._task_size = self._chunk_size
428+
self._chunk_size = max(get_max_workers() * 10 * self._task_size, 1000000)
429+
elif self._size > 100000:
430+
_logger.warning(
431+
"Browsing %d %s, which may take a long time. "
432+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
433+
"If you do, be sure to examine the results carefully.",
434+
self._size,
435+
self._model._name,
436+
)
404437
if kw:
405438
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
406439

@@ -417,8 +450,10 @@ def _browse(self, ids):
417450
return self._model.browse(*args)
418451

419452
def _end(self):
420-
if self._strategy == "commit":
453+
if self._strategy in ["commit", "multiprocessing"]:
421454
self._model.env.cr.commit()
455+
if self._strategy == "multiprocessing" and UPG_PARALLEL_ITER_BROWSE:
456+
delattr(_mp_iter_browse_cb, "params")
422457
else:
423458
flush(self._model)
424459
invalidate(self._model, *self._cr_uid)
@@ -452,6 +487,25 @@ def __getattr__(self, attr):
452487

453488
def caller(*args, **kwargs):
454489
args = self._cr_uid + args
490+
if self._strategy == "multiprocessing" and UPG_PARALLEL_ITER_BROWSE:
491+
_mp_iter_browse_cb.params = {
492+
"dbname": self._model.env.cr.dbname,
493+
"model_name": self._model._name,
494+
"attr_name": attr,
495+
"args": args,
496+
"kwargs": kwargs,
497+
"mode": "browse",
498+
}
499+
with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
500+
for chunk in it:
501+
collections.deque(
502+
executor.map(_mp_iter_browse_cb, chunks(chunk._ids, self._task_size, fmt=tuple)),
503+
maxlen=0,
504+
)
505+
next(self._end(), None)
506+
# do not return results in // mode, we expect it to be used for huge numbers of
507+
# records and thus would risk MemoryError
508+
return None
455509
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
456510

457511
self._it = None

0 commit comments

Comments
 (0)