Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions connector_oxigesti/common/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright NuoBiT Solutions - Eric Antones <eantones@nuobit.com>
# Copyright NuoBiT Solutions - Frank Cespedes <fcespedes@nuobit.com>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import hashlib

from odoo import _
from odoo.exceptions import ValidationError
from odoo.osv.expression import normalize_domain

OP_MAP = {
"&": "AND",
"|": "OR",
"!": "NOT",
}


def domain_prefix_to_infix(domain):
stack = []
i = len(domain) - 1
while i >= 0:
item = domain[i]
if item in OP_MAP:
if item == "!":
stack.append((item, stack.pop()))
else:
stack.append((stack.pop(), item, stack.pop()))
else:
if not isinstance(item, (tuple, list)):
raise ValidationError(_("Unexpected domain clause %s") % item)
stack.append(item)
i -= 1
return stack.pop()


def domain_infix_to_where(domain):
def _convert_operator(operator, value):
if value is None:
if operator == "=":
operator = "is"
elif operator == "!=":
operator = "is not"
else:
raise ValidationError(
_("Operator '%s' is not implemented on NULL values") % operator
)
return operator

def _domain_infix_to_where_raw(domain, values):
if not isinstance(domain, (list, tuple)):
raise ValidationError(_("Invalid domain format %s") % domain)
if len(domain) == 2:
operator, expr = domain
if operator not in OP_MAP:
raise ValidationError(
_("Invalid format, operator not supported %s on domain %s")
% (operator, domain)
)
values_r, right = _domain_infix_to_where_raw(expr, values)
return values_r, f"{OP_MAP[operator]} ({right})"
elif len(domain) == 3:
expr_l, operator, expr_r = domain
if operator in OP_MAP:
values_l, left = _domain_infix_to_where_raw(expr_l, values)
values_r, right = _domain_infix_to_where_raw(expr_r, values)
return {**values_l, **values_r}, f"({left} {OP_MAP[operator]} {right})"
field, operator, value = domain
# field and values
if field not in values:
values[field] = {"next": 1, "values": {}}
field_n = f"{field}{values[field]['next']}"
if field_n in values[field]["values"]:
raise ValidationError(_("Unexpected!! Field %s already used") % field)
values[field]["values"][field_n] = value
values[field]["next"] += 1
# operator and nulls values
operator = _convert_operator(operator, value)
return values, f"{field} {operator} %({field_n})s"
else:
raise ValidationError(_("Invalid domain format %s") % domain)

values, where = _domain_infix_to_where_raw(domain, {})
values_norm = {}
for _k, v in values.items():
values_norm.update(v["values"])
return values_norm, where


def domain_to_where(domain):
domain_norm = normalize_domain(domain)
domain_infix = domain_prefix_to_infix(domain_norm)
return domain_infix_to_where(domain_infix)


def idhash(external_id):
if not isinstance(external_id, (tuple, list)):
raise ValidationError(_("external id must be list or tuple"))
external_id_hash = hashlib.sha256()
for e in external_id:
if isinstance(e, int):
e9 = str(e)
if int(e9) != e:
raise Exception("Unexpected")
elif isinstance(e, str):
e9 = e
elif e is None:
pass
else:
raise Exception("Unexpected type for a key: type %s" % type(e))

external_id_hash.update(e9.encode("utf8"))

return external_id_hash.hexdigest()
1 change: 1 addition & 0 deletions connector_oxigesti/components/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
from . import import_mapper
from . import export_mapper
from . import listener
from . import deleter
53 changes: 19 additions & 34 deletions connector_oxigesti/components/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from odoo.addons.component.core import AbstractComponent
from odoo.addons.connector.exception import NetworkRetryableError

from ..common.tools import domain_to_where

try:
import pymssql
except ImportError:
Expand Down Expand Up @@ -153,7 +155,7 @@ def _escape(self, s):

def _exec_sql(self, sql, params, as_dict=False, commit=False):
# Convert params
params = self._convert_tuple(params, to_backend=True)
params = self._convert_dict(params, to_backend=True)
# Execute sql
conn = self.conn()
cr = conn.cursor(as_dict=as_dict)
Expand All @@ -179,15 +181,15 @@ def _exec_query(self, filters=None, fields=None, as_dict=True):
filters = []
# check if schema exists to avoid injection
schema_exists = self._exec_sql(
"select 1 from sys.schemas where name=%s", (self.schema,)
"select 1 from sys.schemas where name=%(schema)s", dict(schema=self.schema)
)
if not schema_exists:
raise pymssql.InternalError("The schema %s does not exist" % self.schema)

# prepare the sql and execute
sql = self._sql % dict(schema=self.schema)

values = []
values = {}
if filters or fields:
sql_l = ["with t as (%s)" % sql]

Expand All @@ -201,26 +203,12 @@ def _exec_query(self, filters=None, fields=None, as_dict=True):
sql_l.append("select %s from t" % (", ".join(fields_l),))

if filters:
where = []
for k, operator, v in filters:
if v is None:
if operator == "=":
operator = "is"
elif operator == "!=":
operator = "is not"
else:
raise Exception(
"Operator '%s' is not implemented on NULL values"
% operator
)

where.append("%s %s %%s" % (k, operator))
values.append(v)
sql_l.append("where %s" % (" and ".join(where),))
values, where = domain_to_where(filters)
sql_l.append("where %s" % where)

sql = " ".join(sql_l)

res = self._exec_sql(sql, tuple(values), as_dict=as_dict)
res = self._exec_sql(sql, values, as_dict=as_dict)

filter_keys_s = {e[0] for e in filters}
if self._id and set(self._id).issubset(filter_keys_s):
Expand Down Expand Up @@ -291,7 +279,7 @@ def write(self, _id, values_d): # pylint: disable=W8106

# check if schema exists to avoid injection
schema_exists = self._exec_sql(
"select 1 from sys.schemas where name=%s", (self.schema,)
"select 1 from sys.schemas where name=%(schema)s", dict(schema=self.schema)
)
if not schema_exists:
raise pymssql.InternalError("The schema %s does not exist" % self.schema)
Expand Down Expand Up @@ -357,22 +345,16 @@ def create(self, values_d): # pylint: disable=W8106

# check if schema exists to avoid injection
schema_exists = self._exec_sql(
"select 1 from sys.schemas where name=%s", (self.schema,)
"select 1 from sys.schemas where name=%(schema)s", dict(schema=self.schema)
)
if not schema_exists:
raise pymssql.InternalError("The schema %s does not exist" % self.schema)

# build the sql parts
fields, params, phvalues = [], [], []
for k, v in values_d.items():
fields, phvalues = [], []
for k in values_d.keys():
fields.append(k)
params.append(v)
if v is None or isinstance(v, (str, datetime.date, datetime.datetime)):
phvalues.append("%s")
elif isinstance(v, (int, float)):
phvalues.append("%d")
else:
raise NotImplementedError("Type %s" % type(v))
phvalues.append(f"%({k})s")

# build retvalues
retvalues = ["inserted.%s" % x for x in self._id]
Expand All @@ -386,9 +368,8 @@ def create(self, values_d): # pylint: disable=W8106
)

# executem la insercio
res = None
try:
res = self._exec_sql(sql, tuple(params), commit=True)
res = self._exec_sql(sql, values_d, commit=True)
except pymssql.IntegrityError as e:
# Workaround: Because of Microsoft SQL Server
# removes the spaces on varchars on comparisions
Expand Down Expand Up @@ -431,12 +412,16 @@ def delete(self, _id):

# check if schema exists to avoid injection
schema_exists = self._exec_sql(
"select 1 from sys.schemas where name=%s", (self.schema,)
"select 1 from sys.schemas where name=%(schema)s", dict(schema=self.schema)
)
if not schema_exists:
raise pymssql.InternalError("The schema %s does not exist" % self.schema)

# prepare the sql with base strucrture
if not hasattr(self, "_sql_delete"):
raise ValidationError(
_("The model %s does not have a delete SQL defined") % (self._name,)
)
sql = self._sql_delete % dict(schema=self.schema)

# get id fieldnames and values
Expand Down
58 changes: 58 additions & 0 deletions connector_oxigesti/components/deleter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright NuoBiT Solutions - Eric Antones <eantones@nuobit.com>
# Copyright NuoBiT Solutions - Frank Cespedes <fcespedes@nuobit.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl)

import logging

from odoo.addons.component.core import AbstractComponent

_logger = logging.getLogger(__name__)


class OxigestiExportDeleter(AbstractComponent):
"""Base Export Deleter for Oxigesti"""

_name = "oxigesti.export.deleter"
_inherit = ["base.deleter", "base.oxigesti.connector"]

_usage = "record.export.deleter"

def run(self, external_id):
adapter = self.component(usage="backend.adapter")
adapter.delete(external_id)


class OxigestiBatchExportDeleter(AbstractComponent):
_name = "oxigesti.batch.export.deleter"
_inherit = ["base.exporter", "base.oxigesti.connector"]

def run(self, external_ids=None):
if not external_ids:
return
# Run the synchronization
for ext_id in external_ids:
self._export_delete_record(ext_id)

def _export_delete_record(self, external_id):
raise NotImplementedError


class OxigestiDirectBatchExportDeleter(AbstractComponent):
_name = "oxigesti.direct.batch.export.deleter"
_inherit = "oxigesti.batch.export.deleter"

_usage = "direct.batch.export.deleter"

def _export_delete_record(self, external_id):
self.model.export_delete_record(self.backend_record, external_id)


class OxigestiDelayedBatchExportDeleter(AbstractComponent):
_name = "oxigesti.delayed.batch.export.deleter"
_inherit = "oxigesti.batch.export.deleter"

_usage = "delayed.batch.export.deleter"

def _export_delete_record(self, external_id, job_options=None):
delayable = self.model.with_delay(**job_options or {})
delayable.export_delete_record(self.backend_record, external_id)
62 changes: 62 additions & 0 deletions connector_oxigesti/components/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def run(self, domain=None):
):
self._export_record(relation)

def _export_chunk(self, domain, job_options=None, **kwargs):
raise NotImplementedError

def _export_record(self, external_id):
"""Export a record directly or delay the export of the record.

Expand All @@ -153,6 +156,9 @@ class OxigestiDirectBatchExporter(AbstractComponent):

_usage = "direct.batch.exporter"

def _export_chunk(self, domain, job_options=None, **kwargs):
self.model.export_chunk(self.backend_record, domain, **kwargs)

def _export_record(self, relation):
"""export the record directly"""
self.model.export_record(self.backend_record, relation)
Expand All @@ -166,6 +172,62 @@ class OxigestiDelayedBatchExporter(AbstractComponent):

_usage = "delayed.batch.exporter"

def _export_chunk(self, domain, job_options=None, **kwargs):
delayable = self.model.with_delay(**job_options or {})
delayable.export_chunk(self.backend_record, domain, **kwargs)

def _export_record(self, relation, job_options=None):
"""Delay the export of the records"""
delayable = self.model.with_delay(**job_options or {})
delayable.export_record(self.backend_record, relation)


class OxigestiChunkExporter(AbstractComponent):
_name = "oxigesti.chunk.exporter"
_inherit = ["base.exporter", "base.oxigesti.connector"]

def run(self, domain, **kwargs):
"""Run the synchronization"""
raise NotImplementedError

def get_batch_exporter(self):
raise NotImplementedError

def _export_record(self, external_id):
"""Export a record directly or delay the export of the record.

Method to implement in sub-classes.
"""
raise NotImplementedError


class OxigestiChunkDirectExporter(AbstractComponent):
"""Export the records directly, without delaying the jobs."""

_name = "oxigesti.chunk.direct.exporter"
_inherit = "oxigesti.chunk.exporter"

_usage = "chunk.direct.exporter"

def get_batch_exporter(self):
return self.component(usage="batch.direct.exporter")

def _export_record(self, relation):
"""export the record directly"""
self.model.export_record(self.backend_record, relation)


class OxigestiChunkDelayedExporter(AbstractComponent):
"""Delay export of the records"""

_name = "oxigesti.chunk.delayed.exporter"
_inherit = "oxigesti.chunk.exporter"

_usage = "chunk.delayed.exporter"

def get_batch_exporter(self):
return self.component(usage="batch.delayed.exporter")

def _export_record(self, relation, job_options=None):
"""Delay the export of the records"""
delayable = self.model.with_delay(**job_options or {})
Expand Down
4 changes: 4 additions & 0 deletions connector_oxigesti/data/queue_data.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl)-->
<field name="name">oxigesti_batch</field>
<field name="parent_id" ref="queue_job.channel_root" />
</record>
<record id="channel_oxigesti_chunk" model="queue.job.channel">
<field name="name">oxigesti_chunk</field>
<field name="parent_id" ref="queue_job.channel_root" />
</record>
</odoo>
Loading
Loading