diff --git a/.editorconfig b/.editorconfig deleted file mode 100644 index 0c386765e..000000000 --- a/.editorconfig +++ /dev/null @@ -1,22 +0,0 @@ -# install the editorconfig plugin for your editor: http://editorconfig.org/#download - -root = true - -[*] -charset = utf-8 -end_of_line = lf -insert_final_newline = true -indent_style = space -indent_size = 4 -trim_trailing_whitespace = true - -[*.py] -# 120 + 10% -# See error B950 https://github.com/PyCQA/flake8-bugbear#opinionated-warnings -max_line_length = 132 - -[*.xml] -indent_size = 2 - -[*.md] -trim_trailing_whitespace = false diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 8b6ad99c6..000000000 --- a/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -# dotfiles -.* -!.gitignore -# compiled python files -*.py[co] -# setup.py egg_info -*.egg-info -# emacs backup files -*~ -# hg stuff -*.orig -status - -# artefacts -src/_version.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml deleted file mode 100644 index cdd5356e1..000000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,36 +0,0 @@ -repos: - - repo: local - hooks: - - id: compile23 - name: Compile python files using the expected runtime version - entry: ./tools/compile23.py - language: script - require_serial: true - verbose: true - - id: bad-import-000 - name: Incompatible import with old versions in tests and `0.0.0` scripts - language: pygrep - entry: '(from|import) odoo.upgrade\b' - files: '^src/\w+/(tests|0\.0\.0)/.*\.py$' - - - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.14.0 - hooks: - - id: ruff - name: Check code with Ruff, apply automatic fixes - args: [ --exit-non-zero-on-fix ] - - id: ruff-format - name: Format code with Ruff - - repo: https://github.com/crate-ci/typos - rev: v1.38.1 - hooks: - - id: typos - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v6.0.0 - hooks: - - id: check-xml - - id: check-yaml - - id: end-of-file-fixer - - id: trailing-whitespace - - id: check-symlinks - - 
id: debug-statements diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 0a041280b..000000000 --- a/LICENSE +++ /dev/null @@ -1,165 +0,0 @@ - GNU LESSER GENERAL PUBLIC LICENSE - Version 3, 29 June 2007 - - Copyright (C) 2007 Free Software Foundation, Inc. - Everyone is permitted to copy and distribute verbatim copies - of this license document, but changing it is not allowed. - - - This version of the GNU Lesser General Public License incorporates -the terms and conditions of version 3 of the GNU General Public -License, supplemented by the additional permissions listed below. - - 0. Additional Definitions. - - As used herein, "this License" refers to version 3 of the GNU Lesser -General Public License, and the "GNU GPL" refers to version 3 of the GNU -General Public License. - - "The Library" refers to a covered work governed by this License, -other than an Application or a Combined Work as defined below. - - An "Application" is any work that makes use of an interface provided -by the Library, but which is not otherwise based on the Library. -Defining a subclass of a class defined by the Library is deemed a mode -of using an interface provided by the Library. - - A "Combined Work" is a work produced by combining or linking an -Application with the Library. The particular version of the Library -with which the Combined Work was made is also called the "Linked -Version". - - The "Minimal Corresponding Source" for a Combined Work means the -Corresponding Source for the Combined Work, excluding any source code -for portions of the Combined Work that, considered in isolation, are -based on the Application, and not on the Linked Version. - - The "Corresponding Application Code" for a Combined Work means the -object code and/or source code for the Application, including any data -and utility programs needed for reproducing the Combined Work from the -Application, but excluding the System Libraries of the Combined Work. - - 1. 
Exception to Section 3 of the GNU GPL. - - You may convey a covered work under sections 3 and 4 of this License -without being bound by section 3 of the GNU GPL. - - 2. Conveying Modified Versions. - - If you modify a copy of the Library, and, in your modifications, a -facility refers to a function or data to be supplied by an Application -that uses the facility (other than as an argument passed when the -facility is invoked), then you may convey a copy of the modified -version: - - a) under this License, provided that you make a good faith effort to - ensure that, in the event an Application does not supply the - function or data, the facility still operates, and performs - whatever part of its purpose remains meaningful, or - - b) under the GNU GPL, with none of the additional permissions of - this License applicable to that copy. - - 3. Object Code Incorporating Material from Library Header Files. - - The object code form of an Application may incorporate material from -a header file that is part of the Library. You may convey such object -code under terms of your choice, provided that, if the incorporated -material is not limited to numerical parameters, data structure -layouts and accessors, or small macros, inline functions and templates -(ten or fewer lines in length), you do both of the following: - - a) Give prominent notice with each copy of the object code that the - Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the object code with a copy of the GNU GPL and this license - document. - - 4. Combined Works. 
- - You may convey a Combined Work under terms of your choice that, -taken together, effectively do not restrict modification of the -portions of the Library contained in the Combined Work and reverse -engineering for debugging such modifications, if you also do each of -the following: - - a) Give prominent notice with each copy of the Combined Work that - the Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the Combined Work with a copy of the GNU GPL and this license - document. - - c) For a Combined Work that displays copyright notices during - execution, include the copyright notice for the Library among - these notices, as well as a reference directing the user to the - copies of the GNU GPL and this license document. - - d) Do one of the following: - - 0) Convey the Minimal Corresponding Source under the terms of this - License, and the Corresponding Application Code in a form - suitable for, and under terms that permit, the user to - recombine or relink the Application with a modified version of - the Linked Version to produce a modified Combined Work, in the - manner specified by section 6 of the GNU GPL for conveying - Corresponding Source. - - 1) Use a suitable shared library mechanism for linking with the - Library. A suitable mechanism is one that (a) uses at run time - a copy of the Library already present on the user's computer - system, and (b) will operate properly with a modified version - of the Library that is interface-compatible with the Linked - Version. - - e) Provide Installation Information, but only if you would otherwise - be required to provide such information under section 6 of the - GNU GPL, and only to the extent that such information is - necessary to install and execute a modified version of the - Combined Work produced by recombining or relinking the - Application with a modified version of the Linked Version. 
(If - you use option 4d0, the Installation Information must accompany - the Minimal Corresponding Source and Corresponding Application - Code. If you use option 4d1, you must provide the Installation - Information in the manner specified by section 6 of the GNU GPL - for conveying Corresponding Source.) - - 5. Combined Libraries. - - You may place library facilities that are a work based on the -Library side by side in a single library together with other library -facilities that are not Applications and are not covered by this -License, and convey such a combined library under terms of your -choice, if you do both of the following: - - a) Accompany the combined library with a copy of the same work based - on the Library, uncombined with any other library facilities, - conveyed under the terms of this License. - - b) Give prominent notice with the combined library that part of it - is a work based on the Library, and explaining where to find the - accompanying uncombined form of the same work. - - 6. Revised Versions of the GNU Lesser General Public License. - - The Free Software Foundation may publish revised and/or new versions -of the GNU Lesser General Public License from time to time. Such new -versions will be similar in spirit to the present version, but may -differ in detail to address new problems or concerns. - - Each version is given a distinguishing version number. If the -Library as you received it specifies that a certain numbered version -of the GNU Lesser General Public License "or any later version" -applies to it, you have the option of following the terms and -conditions either of that published version or of any later version -published by the Free Software Foundation. If the Library as you -received it does not specify a version number of the GNU Lesser -General Public License, you may choose any version of the GNU Lesser -General Public License ever published by the Free Software Foundation. 
- - If the Library as you received it specifies that a proxy can decide -whether future versions of the GNU Lesser General Public License shall -apply, that proxy's public statement of acceptance of any version is -permanent authorization for you to choose that version for the -Library. diff --git a/README.md b/README.md index 3147ccc1a..e8e338226 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,5 @@ -# 🧰 Upgrade Utils +# Internal R&D repository -This repository contains helper functions[^1] to facilitate the writing of upgrade scripts. +This is not the repository you are looking for! -The functions in this repo are meant to work (sometimes just not fail) from Odoo 7.0 up to latest version. -Thus the only supported version of this repo is `master` head. - -## Installation - -### Recommended - -Once you have cloned this repository locally, start `odoo` with the `src` directory prepended to the `--upgrade-path` option. -```shell-session -$ ./odoo-bin --upgrade-path=/path/to/upgrade-util/src,/path/to/other/upgrade/script/directory [...] -``` - -### Alternative - -On platforms where you don't manage Odoo yourself, you can install this package via pip: -```shell-session -$ python3 -m pip install git+https://github.com/odoo/upgrade-util@master -``` - -You can freeze the hash version when installing in this fashion. Just replace `master` by the hash of the commit you want to target. - -On [Odoo.sh](https://www.odoo.sh/) it is recommended to add it to the `requirements.txt` of your repository: -``` -odoo_upgrade @ git+https://github.com/odoo/upgrade-util@master -``` - -## How to use the helper functions? - -Once installed, the following packages are available - - `odoo.upgrade.util`: the helper functions. 
- - `odoo.upgrade.testing`: base `TestCase` classes - -## Documentation - -- [Basic guide on how to write upgrade scripts](https://www.odoo.com/documentation/master/developer/reference/upgrades/upgrade_scripts.html) -- [The reference documentation](https://www.odoo.com/documentation/master/developer/reference/upgrades/upgrade_utils.html) - -[^1]: We call them "utils". +You probably want to fork/checkout https://github.com/odoo/upgrade-util instead. diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 05639c304..000000000 --- a/pyproject.toml +++ /dev/null @@ -1,143 +0,0 @@ -[project] -name = "odoo_upgrade" -authors = [ - { name = "Odoo Upgrade Team", email = "upgrade@odoo.com" } -] -dynamic = ["version"] -dependencies = ["markdown"] - -[build-system] -requires = ["hatchling", "hatch-vcs"] -build-backend = "hatchling.build" - -[tool.hatch.build] -only-include = ["src"] - -[tool.hatch.build.sources] -"src" = "odoo/upgrade" - -[tool.hatch.version] -source = "vcs" -raw-options.version_scheme = "calver-by-date" - -[tool.hatch.build.hooks.vcs] -version-file = "src/_version.py" - -[tool.ruff] -required-version = ">=0.10.0" -fix = true -show-fixes = true -output-format = "full" -line-length = 120 -target-version = "py37" - -[tool.ruff.lint] -ignore = [ - "B904", # raise-without-from-inside-except; not python2 compatible - "B905", # zip-without-explicit-strict; not python2 compatible - "D1", # undocumented-* - "E501", # line-too-long; handled by auto-formatting - "E731", # lambda-assignment - "PERF203", # try-except-in-loop - "PLR09", # too-many-*; unwanted code complexity checks - "RUF012", # mutable-class-default; we know about the risk - - "PLR2004", # magic-value-comparison; not all comparisons to int or str are magic - "TRY003", # raise-vanilla-args; we can live without it - "RET505", # only true for simple if/elif branches (like in the ruff doc example). 
if/elif blocks are easier to read in most cases - - "ISC001", # avoid incompatibility with the ruff formatter - # not (yet) supported rules - # "E301", - # "E302", - # "E265", - # "E241", - # "W503", - # "E203", - # "B907", -] -select = [ - # full rule-sets - "A", # flake8-builtins - "B", # flake8-bugbear - "C4", # flake8-comprehensions - "D", # pydocstyle - "E", # pycodestyle - "ERA", # eradicate - "F", # Pyflakes - "G", # flake8-logging-format - "I", # isort - "ISC", # flake8-implicit-str-concat - "PERF",# perflint - "PIE", # flake8-pie - "PL", # pylint - "RET", # flake8-return - "RUF", # ruff specific rules - "SIM", # flake8-simplify - "TRY", # tryceratops - "T20", # flake8-print - "W", # pycodestyle - - # explicit rules - "COM818", # trailing-comma-on-bare-tuple; other rules handled by autoformatter - "FBT003", # boolean-positional-value-in-call; other rules not python2 compatible - "UP005", # deprecated-unittest-alias - "S704", # unsafe-markup-use; replaces RUF035 - -] - -[tool.ruff.lint.flake8-builtins] -ignorelist = ["format", "id", "type"] -allowed-modules = ["json"] - -[tool.ruff.lint.isort] -section-order = ["future", "standard-library", "third-party", "first-party", "odoo-addons", "local-folder"] -known-first-party = ["odoo", "openerp"] -known-local-folder = ["odoo.upgrade", "odoo.addons.base.maintenance.migrations", "openerp.addons.base.maintenance.migrations"] - -[tool.ruff.lint.isort.sections] -odoo-addons = ["odoo.addons", "openerp.addons"] - -[tool.ruff.lint.pydocstyle] -convention = "pep257" - -[tool.ruff.lint.per-file-ignores] -"*/__init__.py" = [ - "F401", - "F403", -] -"src/util/*.py" = [ - # python3 only rules - "RUF005", - "RUF007", -] -# ignore docstring lint for tests files -"src/*/tests/*.py" = ["D"] -# and for upgrade scripts -"src/*/*/{pre,post,end}-*.py" = ["D"] - -[tool.ruff.per-file-target-version] -"tools/fetch-release-notes-video-id.py" = "py312" - -[tool.typos.files] -extend-exclude = [ - # auto-generated file - 
"src/util/_inherit.py", - - # Use weird words. And it's just a test file, typos can be tolerated. - "src/spreadsheet/tests/test_spreadsheet_tokenizer.py", -] - -[tool.typos.type.py] -extend-ignore-re = [ - "\\brelease\\.serie\\b", - # ignore `datas` as the whole string - '"datas"', -] - -[tool.typos.default.extend-identifiers] -inh = "inh" -_inh = "_inh" -ressource_type_id = "ressource_type_id" -# Used as alias in SQL queries. -fpt = "fpt" diff --git a/requirements-dev.txt b/requirements-dev.txt deleted file mode 100644 index 416634f52..000000000 --- a/requirements-dev.txt +++ /dev/null @@ -1 +0,0 @@ -pre-commit diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 0918c9768..000000000 --- a/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -markdown diff --git a/src/base/0.0.0/end-clean-__upgrade__-crons.py b/src/base/0.0.0/end-clean-__upgrade__-crons.py deleted file mode 100644 index 06ba6b50e..000000000 --- a/src/base/0.0.0/end-clean-__upgrade__-crons.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo.addons.base.maintenance.migrations import util - - -def migrate(cr, version): - server_act_array = ( - "array_agg(c.ir_actions_server_id)" if util.column_exists(cr, "ir_cron", "ir_actions_server_id") else "NULL" - ) - xid_date = ( - ", x.date_init" if util.column_exists(cr, "ir_model_data", "date_init") else "" - ) # field gone in saas-13.4 - cr.execute( - """ - SELECT array_agg(c.id), {server_act_array} - FROM ir_cron c - JOIN ir_model_data x ON x.model = 'ir.cron' AND x.res_id = c.id - WHERE x.module = '__upgrade__' - AND now() - COALESCE(c.create_date, x.create_date {xid_date}) > interval '1 month' - """.format( - server_act_array=server_act_array, - xid_date=xid_date, - ) - ) - cron_ids, server_act_ids = cr.fetchone() - util.remove_records(cr, "ir.cron", cron_ids) - util.remove_records(cr, "ir.actions.server", server_act_ids) diff --git a/src/base/0.0.0/end-moved0.py b/src/base/0.0.0/end-moved0.py deleted file mode 100644 
index 633091d3f..000000000 --- a/src/base/0.0.0/end-moved0.py +++ /dev/null @@ -1,19 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo.addons.base.maintenance.migrations import util - - -def migrate(cr, version): - if not util.ENVIRON.get("manual_moved0"): - # let the test verify the invariant. - return - - pre = util.import_script("base/0.0.0/pre-moved0.py") - - cr.execute("SELECT value FROM upgrade_test_data WHERE key = %s", [pre.KEY]) - expected = [tuple(i) for i in cr.fetchone()[0]] if cr.rowcount else [] - moved_fields = set(pre.get_moved0_columns(cr)) - set(expected) - if moved_fields: - raise util.UpgradeError( - "New `moved0` field. It happen when the ORM cannot change a column type by itself.\n%s" - % "\n".join("\t- %s.%s" % m for m in sorted(moved_fields)) - ) diff --git a/src/base/0.0.0/end-no-respawn-fields.py b/src/base/0.0.0/end-no-respawn-fields.py deleted file mode 100644 index c8a080037..000000000 --- a/src/base/0.0.0/end-no-respawn-fields.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- -import logging -import os - -from psycopg2.extras import execute_values - -from odoo.addons.base.maintenance.migrations import util - -_logger = logging.getLogger("odoo.addons.base.maintenance.migrations.base.000.no_respawn") - - -def migrate(cr, version): - # Ensure that we didn't `remove_field` that shouldnt' - cr.execute( - """ - CREATE TEMPORARY TABLE no_respawn( - model varchar, - field varchar - ) - """ - ) - execute_values( - cr._obj, - "INSERT INTO no_respawn(model, field) VALUES %s", - [ - (model, field) - for model, fields in util.ENVIRON["__renamed_fields"].items() - for field, new_name in fields.items() - if new_name is None # means removed :p - ], - ) - cr.execute( - """ - SELECT m.model, f.name, m.transient, f.store - FROM ir_model_fields f - JOIN ir_model m ON m.id = f.model_id - JOIN no_respawn r ON (m.model = r.model AND f.name = r.field) - ORDER BY m.model, f.name - """ - ) - - key = "field_respawn:" - ignored_fields_respawn = { - e[len(key) 
:] - for e in os.environ.get("suppress_upgrade_warnings", "").split(",") # noqa: SIM112 - if e.startswith(key) - } - - for model, field, transient, store in cr.fetchall(): - qualifier = "field" if store else "non-stored field" - if transient: - qualifier = "transient " + qualifier - lvl = util.NEARLYWARN if transient or not store else logging.CRITICAL - action = "" - - if "{}/{}".format(model, field) in ignored_fields_respawn: - lvl = util.NEARLYWARN - action = "; explicitly ignored" - - _logger.log(lvl, "%s %s/%s has respawn%s.", qualifier, model, field, action) diff --git a/src/base/0.0.0/end-user_groups_view.py b/src/base/0.0.0/end-user_groups_view.py deleted file mode 100644 index 49f13716c..000000000 --- a/src/base/0.0.0/end-user_groups_view.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo.addons.base.maintenance.migrations import util - - -def migrate(cr, version): - if util.version_gte("saas~18.2"): - return - util.env(cr)["res.groups"]._update_user_groups_view() diff --git a/src/base/0.0.0/post-01-modules-auto-discovery.py b/src/base/0.0.0/post-01-modules-auto-discovery.py deleted file mode 100644 index 8607d4ad8..000000000 --- a/src/base/0.0.0/post-01-modules-auto-discovery.py +++ /dev/null @@ -1,9 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo.addons.base.maintenance.migrations import util -from odoo.addons.base.maintenance.migrations.util.modules import _trigger_auto_discovery - - -def migrate(cr, version): - if util.version_gte("saas~14.5"): - _trigger_auto_discovery(cr) - util.ENVIRON["AUTO_DISCOVERY_UPGRADE"] = True diff --git a/src/base/0.0.0/post-02-force-upgrade-installed-modules.py b/src/base/0.0.0/post-02-force-upgrade-installed-modules.py deleted file mode 100644 index 582997c8b..000000000 --- a/src/base/0.0.0/post-02-force-upgrade-installed-modules.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo.addons.base.maintenance.migrations import util - - -def migrate(cr, version): - # Short-circuit the state change 
made by Odoo during the loading process (at STEP 2). - # In Odoo, it's done by calling the method `button_upgrade` on all modules passed - # in the command line option `-u`. This method drills down through all downstream - # dependencies. That's why it works when upgrading the `base` module. - # This technique works well during updates (keep the same major version) where the - # modules' dependencies don't change. - # However, during upgrades (to the next version), it may happen that modules (A) got new - # dependencies (B) that are not installed yet (being a new module or not). - # As `button_update` won't update the state of non installed modules, if the modules (A) - # only dependencies are the new ones (B), their state will remain `installed`. Still, the - # corresponding packages (in the graph) will have the `update` flag, meaning the modules - # will still be upgraded. - # But partially. Due to their initial `installed` state, the `end-` scripts won't be - # applied, leading to an incomplete upgrade. - # This is the case for the `account_asset` module in `saas~12.3`. - # This can be observed at https://upgradeci.odoo.com/upgradeci/run/3665 - # NOTE: This behavior has been fixed by https://github.com/odoo/odoo/pull/85516 - # but we need to keep this for older versions. - query = "UPDATE ir_module_module SET state = 'to upgrade' WHERE state = 'installed'" - if util.column_exists(cr, "ir_module_module", "imported"): - query += " AND COALESCE(imported, false) = false" - - cr.execute(query) diff --git a/src/base/0.0.0/post-commercial_partner_id.py b/src/base/0.0.0/post-commercial_partner_id.py deleted file mode 100644 index dd7d54bb9..000000000 --- a/src/base/0.0.0/post-commercial_partner_id.py +++ /dev/null @@ -1,10 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo.addons.base.maintenance.migrations import util - - -def migrate(cr, version): - # The `commercial_partner_id` field is expected to always be set. Although the column is not marked as `NOT NULL`. 
- # Fight the Murphy's Law, and recompute the value on partners with a NULL value. - cr.execute("SELECT id FROM res_partner WHERE commercial_partner_id IS NULL") - if cr.rowcount: - util.recompute_fields(cr, "res.partner", ["commercial_partner_id"], ids=[id_ for (id_,) in cr.fetchall()]) diff --git a/src/base/0.0.0/pre-00-upgrade-start.py b/src/base/0.0.0/pre-00-upgrade-start.py deleted file mode 100644 index eef329ab7..000000000 --- a/src/base/0.0.0/pre-00-upgrade-start.py +++ /dev/null @@ -1,15 +0,0 @@ -# -*- coding: utf-8 -*- - - -def migrate(cr, version): - cr.execute( - """ - INSERT - INTO ir_config_parameter(key, value) - VALUES ('upgrade.start.time', now() at time zone 'utc') - ON CONFLICT (key) - DO UPDATE - SET value = (now() at time zone 'utc') - WHERE EXCLUDED.value::timestamp - ir_config_parameter.value::timestamp > interval '72 hours' - """ - ) diff --git a/src/base/0.0.0/pre-base_version.py b/src/base/0.0.0/pre-base_version.py deleted file mode 100644 index 8468d2e7c..000000000 --- a/src/base/0.0.0/pre-base_version.py +++ /dev/null @@ -1,10 +0,0 @@ -# -*- coding: utf-8 -*- -try: - from odoo.addons.base.maintenance.migrations import util -except ImportError: - # for symlinked versions - from openerp.addons.base.maintenance.migrations import util - - -def migrate(cr, version): - util.inherit._get_base_version(cr) diff --git a/src/base/0.0.0/pre-models-ir_model_relation.py b/src/base/0.0.0/pre-models-ir_model_relation.py deleted file mode 100644 index 572ba02e4..000000000 --- a/src/base/0.0.0/pre-models-ir_model_relation.py +++ /dev/null @@ -1,49 +0,0 @@ -# -*- coding: utf-8 -*- -from odoo import api, models - -from odoo.addons.base.maintenance.migrations import util - -try: - from odoo.addons.base.models import ir_model as _ignore -except ImportError: - # version 10 - from odoo.addons.base.ir import ir_model as _ignore # noqa - - -def migrate(cr, version): - pass - - -class ModelRelation(models.Model): - _name = "ir.model.relation" - _inherit = 
["ir.model.relation"] - _module = "base" - - @api.model - def _register_hook(self): - super(ModelRelation, self)._register_hook() - - query = """ - DELETE FROM ir_model_relation WHERE id IN ( - SELECT r.id - FROM ir_model_relation r - JOIN ir_module_module m ON m.id = r.module - LEFT JOIN information_schema.tables t ON t.table_name = r.name - WHERE m.state = 'installed' - AND t.table_name IS NULL - ) - """ - - self.env.cr.execute(query) - - gone_m2m = util.ENVIRON.get("_gone_m2m") - if gone_m2m: - query = """ - SELECT table_name - FROM information_schema.tables - WHERE table_name IN %s - """ - self.env.cr.execute(query, [tuple(gone_m2m)]) - back_m2m = "\n".join(" - %s via %s" % (tn, gone_m2m[tn]) for (tn,) in self.env.cr.fetchall()) - if back_m2m: - raise util.MigrationError("The following m2m relations have respawn:\n%s" % back_m2m) diff --git a/src/base/0.0.0/pre-moved0.py b/src/base/0.0.0/pre-moved0.py deleted file mode 100644 index 9ced75a6b..000000000 --- a/src/base/0.0.0/pre-moved0.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- -from psycopg2.extras import Json - -from odoo.addons.base.maintenance.migrations import util - -KEY = "base.tests.test_moved0.TestMoved0" - - -def get_moved0_columns(cr): - cr.execute( - """ - SELECT table_name, column_name - FROM information_schema.columns - WHERE column_name ~ '_moved[0-9]+' - ORDER BY table_name, column_name - """ - ) - return cr.fetchall() - - -def migrate(cr, version): - if util.version_gte("16.0"): - # Starting Odoo 16, no more `moved0` columns are created - # See https://github.com/odoo/odoo/commit/50767ef90eadeca2ed05b9400238af8bdbe77fb3 - return - - if util.table_exists(cr, "upgrade_test_data"): - cr.execute("SELECT 1 FROM upgrade_test_data WHERE key = %s", [KEY]) - if cr.rowcount: - # Already ran as test. 
ignore - return - else: - # Test not run or not a version that support upgrade tests (<= 12) - cr.execute( - """ - CREATE TABLE upgrade_test_data ( - key VARCHAR(255) PRIMARY KEY, - value JSONB NOT NULL - ) - """ - ) - - util.ENVIRON["manual_moved0"] = True - - value = get_moved0_columns(cr) - if value: - cr.execute( - "INSERT INTO upgrade_test_data(key, value) VALUES (%s, %s)", - [KEY, Json(value)], - ) diff --git a/src/base/17.0.1.3/attr_domains2expr.py b/src/base/17.0.1.3/attr_domains2expr.py deleted file mode 100644 index 9d7d00fc4..000000000 --- a/src/base/17.0.1.3/attr_domains2expr.py +++ /dev/null @@ -1,609 +0,0 @@ -""" -This file contains tools for making views compatible with Odoo 17. - -Two main changes are performed here: -1. Convert `attrs` attributes of view elements from domains into Python expressions. -2. Remove `states` attribute, merge its logic into `invisible` attribute. - -The main entry point is `fix_attrs(cr, model, arch, comb_arch)` -- see docstring for -details about its parameters. One important consideration when converting views is that -we must convert views in the right order to ensure the arch is built correctly. Thus we -should convert root views first, then continue with children view, respecting also the -sequence of the views. - -The script can be imported directly or via `util.import_script`. The model of the view -should be fully loaded before attempting to convert it -- thus an end- script is the -best place to use this tool. See adapt_view function in this file for an example usage -of the utilities in this script. - -Example usage from another script: -``` -from odoo.upgrade import util - -script = util.import_script("base/17.0.1.3/attr_domains2expr.py") - -def migrate(cr, version): - # script.adapt_view(...) - # script.fix_attrs(...) 
- pass -``` -""" - -import ast -import logging -import re -import uuid - -from lxml import etree - -from odoo.tools.safe_eval import safe_eval - -from odoo.upgrade import util -from odoo.upgrade.util.domains import DOMAIN_OPERATORS, normalize_domain - - -def adapt_view(cr, view_xmlid): - """ - Adapt one view. - - Example usage of the utilities in this file. - - We use `util.edit_view` because it handles the propagation of the changes to all - languages while updating the whole arch. Alternatively you could just update specific - elements. - - Note if a view needs to be adapted for a specific languages use the `lang` parameter - in the context: - ``` - IrUiView = util.env(cr)["ir.ui.view"].with_context(lang=lang) - ``` - """ - vid = util.ref(cr, view_xmlid) - IrUiView = util.env(cr)["ir.ui.view"] - view = IrUiView.browse(vid) - - # disable view to avoid it being applied to the parent arch - view.active = False - # get combined arch of the parent view - comb_arch = view.inherit_id._get_combined_arch() if view.inherit_id else None - - # update current view arch - new_arch = etree.fromstring(view.arch_db) - fix_attrs(cr, view.model, new_arch, comb_arch) - - # `new_arch` is now already transformed, we can now copy its value - # note: we re-activate the view - with util.edit_view(cr, view_id=vid, active=True) as arch: - arch.clear() - arch.attrib.update(new_arch.attrib) - arch.text = new_arch.text - arch.extend(new_arch) - - -_logger = logging.getLogger(__name__) - -MODS = ["invisible", "readonly", "required", "column_invisible"] -DEFAULT_CONTEXT_REPLACE = {"active_id": "id"} -LIST_HEADER_CONTEXT_REPLACE = {"active_id": "context.get('active_id')"} - - -class InvalidDomainError(Exception): - pass - - -class Ast2StrVisitor(ast._Unparser): - """Extend standard unparser to allow specific names to be replaced.""" - - def __init__(self, replace_names=None): - self._replace_names = replace_names if replace_names else DEFAULT_CONTEXT_REPLACE - super().__init__() - - def 
visit_Name(self, node): - return self.write(self._replace_names.get(node.id, node.id)) - - -def mod2bool_str(s): - """ - Convert yes/no/true/false/on/off into True/False strings. - - Otherwise returns the input unchanged. - The checked values would raise an error instead if used in a Python expression. - Note that 0 and 1 are left unchanged since they have the same True/False meaning in Python. - """ - ss = s.lower() - if ss in ["yes", "true", "on"]: - return "True" - if ss in ["no", "false", "off"]: - return "False" - return s - - -def _clean_bool(s): - """Minimal simplification of trivial boolean expressions.""" - return { - "(1)": "1", - "(0)": "0", - "(True)": "True", - "(False)": "False", - "not (True)": "False", - "not (False)": "True", - }.get(s, s) - - -def target_elem_and_view_type(elem, comb_arch): - """ - Find the target of an element. - - If there is no `comb_arch` or the element doesn't look like - targeting anything (no position attributes) assume the input `elem` is the target and return it. - Along with the target we also return the view type of the elem, plus the field path from the - arch root. 
- """ - - def find_target(elem): - # as in standard: github.com/odoo/odoo/blob/4fec6300/odoo/tools/template_inheritance.py#L73-L94 - if comb_arch is not None and elem.get("position"): - if elem.tag == "xpath": - it = iter(comb_arch.xpath(elem.get("expr"))) - elif elem.tag == "field": - it = (x for x in comb_arch.iter("field") if x.get("name") == elem.get("name")) - else: - it = ( - x - for x in comb_arch.iter("field") - if all(x.get(k) == elem.get(k) for k in elem.attrib if k != "position") - ) - return next(it, elem) - return elem - - field_path = [] - view_type = None - telem = find_target(elem) - pelem = telem.getparent() - while pelem is not None: - # the parent may be a targeting element (xpath or field tag with position attribute) - # thus we need to ensure we got the parent's target - pelem_target_position = pelem.get("position") - pelem = find_target(pelem) - if view_type is None and pelem.tag in ( - "kanban", - "tree", - "form", - "calendar", - "setting", - "search", - "templates", - "groupby", - ): - view_type = pelem.tag - if pelem.tag == "field" and (not pelem_target_position or pelem_target_position == "inside"): - # if element is a normal or a targeting element with position="inside" - field_path.append(pelem.get("name")) - pelem = pelem.getparent() - field_path.reverse() - return telem, view_type, field_path - - -def is_simple_pred(expr): - if expr in ("0", "1", "True", "False"): - return True - return bool(re.match(r"""context\.get\((['"])\w+\1\)""", expr)) - - -def fix_elem(cr, model, elem, comb_arch): - success = True - telem, inner_view_type, field_path = target_elem_and_view_type(elem, comb_arch) - - if elem.get("position") != "replace": - telem = None # do not take default attributes from the target element - - # Build the dict of attrs attributes: - # 1. Take the values from the target element if any - # 2. If current element has attrs, override the values. - # All keys in target not in current element are overridden as empty value. 
def fix_elem(cr, model, elem, comb_arch):
    """
    Inline the `attrs`/`states` modifiers of a view element into plain attributes.

    :param cr: database cursor
    :param model: model name of the view being converted
    :param elem: the etree element to fix, modified in place
    :param comb_arch: combined parent arch (or None), used to resolve the element's target
    :return: True on success, False when at least one domain could not be converted
             (the offending domain is then kept in a `data-upgrade-invalid-domain-*`
             attribute for manual fixing)
    """
    success = True
    telem, inner_view_type, field_path = target_elem_and_view_type(elem, comb_arch)

    if elem.get("position") != "replace":
        telem = None  # do not take default attributes from the target element

    # Build the dict of attrs attributes:
    # 1. Take the values from the target element if any
    # 2. If current element has attrs, override the values.
    #    All keys in target not in current element are overridden as empty value.
    attrs = {}
    if telem is not None and "attrs" in telem.attrib:
        ast_attrs = ast_parse(telem.get("attrs"))
        if isinstance(ast_attrs, ast.Dict):
            attrs = {k.value: v for k, v in zip(ast_attrs.keys, ast_attrs.values)}
        else:
            _logger.log(
                util.NEARLYWARN if util.on_CI() else logging.ERROR,
                "Removing invalid `attrs` value %r from\n%s",
                telem.get("attrs"),
                etree.tostring(telem).decode(),
            )

    if elem.get("attrs"):
        attrs_val = elem.get("attrs")
        ast_attrs = ast_parse(attrs_val)
        if isinstance(ast_attrs, ast.Dict):
            elem_attrs = {k.value: v for k, v in zip(ast_attrs.keys, ast_attrs.values)}
            attrs.update(elem_attrs)
            for k in attrs:
                if k not in elem_attrs:
                    attrs[k] = ast.Constant("")  # clear previous values
        else:
            _logger.log(
                util.NEARLYWARN if util.on_CI() else logging.ERROR,
                "Removing invalid `attrs` value %r from\n%s",
                attrs_val,
                etree.tostring(elem).decode(),
            )
    elem.attrib.pop("attrs", "")

    for mod in MODS:
        if mod not in elem.attrib and mod not in attrs:
            continue
        if inner_view_type == "kanban" and elem.tag == "field":
            # in kanban view, fields outside the templates element should not
            # have modifiers
            elem.attrib.pop(mod, None)
            continue
        # if mod is not in the blend of attrs from current element and target, then we don't
        # need to take the default value from target element since we can assume an override
        default_val = telem.get(mod, "") if telem is not None and mod in attrs else ""
        orig_mod = mod2bool_str(elem.get(mod, default_val).strip())
        try:
            attr_mod = (
                mod2bool_str(_clean_bool(convert_attrs_val(cr, model, field_path, attrs.get(mod))))
                if mod in attrs
                else ""
            )
        except InvalidDomainError as e:
            domain = e.args[0]
            _logger.error("Invalid domain `%s`, saved as data-upgrade-invalid-domain attribute", domain)  # noqa: TRY400
            hex_hash = uuid.uuid4().hex[:6]
            elem.attrib[f"data-upgrade-invalid-domain-{mod}-{hex_hash}"] = domain
            attr_mod = ""
            success = False
        # in list view we can switch the inline invisible into column_invisible
        # in case only the attrs invisible is present we can also use column_invisible
        if (
            mod == "invisible"
            and inner_view_type == "tree"
            and "column_invisible" not in elem.attrib
            and "column_invisible" not in attrs
        ):
            if is_simple_pred(orig_mod):
                elem.attrib.pop("invisible")
                elem.set("column_invisible", orig_mod)
                orig_mod = ""
            elif not orig_mod and is_simple_pred(attr_mod):
                elem.set("column_invisible", attr_mod)
                continue  # we know orig_mod is empty!

        # combine attributes
        if orig_mod and attr_mod:
            # short circuits for final_mod = (orig_mod or attr_mod)
            # FIX(review): the previous code had `elif attr_mod == ("False", "0")` — a
            # string compared to a tuple, i.e. an always-dead branch — and collapsed
            # `(x) or (True)` to `x`, which is wrong when `x` is falsy. The table below
            # implements the correct truthiness-preserving simplifications.
            if orig_mod in ("True", "1") or attr_mod in ("False", "0"):
                # True or x == True ; x or False == x
                final_mod = orig_mod
            elif orig_mod in ("False", "0") or attr_mod in ("True", "1"):
                # False or x == x ; x or True is always truthy
                final_mod = attr_mod
            else:
                final_mod = f"({orig_mod}) or ({attr_mod})"
        else:
            final_mod = orig_mod or attr_mod

        # set attribute if anything to set, or force empty if mod was present
        if final_mod or mod in attrs:
            elem.set(mod, final_mod)

    # special case: merge `states` into invisible
    if "states" in elem.attrib:
        states = elem.attrib.pop("states")
        expr = "state not in [{}]".format(",".join(repr(x.strip()) for x in states.split(",")))
        invisible = elem.get("invisible")
        if invisible:
            elem.set("invisible", f"({invisible}) or ({expr})")
        else:
            elem.set("invisible", expr)

    for mod in MODS:
        attrs.pop(mod, None)
    # keys in attrs should be only one of MODS list, we inline here any "extra" value with a warning
    if attrs:
        extra = [key for key in attrs if key not in elem.attrib]
        _logger.log(
            util.NEARLYWARN if util.on_CI() else logging.WARN,
            "Extra values %s in `attrs` attribute will be inlined for element\n%s",
            extra,
            etree.tostring(elem).decode(),
        )
        extra_invalid = [key for key in attrs if key in elem.attrib]
        if extra_invalid:
            _logger.log(
                util.NEARLYWARN if util.on_CI() else logging.ERROR,
                "Attributes %s in `attrs` cannot be inlined because the inline attributes already exists",
                extra_invalid,
            )
        for key in extra:
            value = ast.unparse(attrs[key])
            try:
                etree.QName(key)  # validate the name is usable as an XML attribute
            except ValueError as e:
                _logger.error("Skipping invalid attribute name %r in `attrs` with value: %s: %r", key, value, e)  # noqa: TRY400
            else:
                elem.set(key, value)
                _logger.info("Inlined %s=%r", key, value)

    return success


def ast_parse(val):
    """Parse `val` as a single Python expression and return its AST body node."""
    try:
        return ast.parse(val.strip(), mode="eval").body
    except SyntaxError:
        _logger.exception("Error for invalid code:\n%s", val)
        raise
def fix_attrs(cr, model, arch, comb_arch):
    """
    Update `arch` etree transforming all attrs elements from domains to Python expressions.

    `model`: the model name of the view
    `arch`: etree instance of the view's arch, transformed in-place
    `comb_arch`: combined arch of the parent of current view, ignored if `None`. Used to merge
                 parent attributes into current `arch` when necessary.

    Returns True on success. Transforms `arch` in-place.
    For an example usage refer to this file's docstring and `adapt_view`
    """
    success = True
    # normalize yes/no/on/off values of modifier <attribute> nodes into True/False
    for elem in arch.xpath(
        "//attribute[@name='invisible' or @name='required' or @name='readonly' or @name='column_invisible']"
    ):
        if "value" in elem.attrib:
            elem.set("value", mod2bool_str(elem.get("value").strip()))
        elif elem.text:
            elem.text = mod2bool_str(elem.text.strip())

    # inline all attrs combined with already inline values
    for elem in arch.xpath("//*[@attrs or @states or @invisible or @required or @readonly or @column_invisible]"):
        success &= fix_elem(cr, model, elem, comb_arch)

    # rewrite `context` attributes that reference `active_id`; list-view header
    # buttons get the dedicated replacement table
    for elem in arch.xpath("//tree/header/*[contains(@context, 'active_id')]"):
        elem.set(
            "context",
            Ast2StrVisitor(LIST_HEADER_CONTEXT_REPLACE).visit(ast_parse(elem.get("context"))),
        )
    for elem in arch.xpath("//*[contains(@context, 'active_id')]"):
        elem.set("context", Ast2StrVisitor().visit(ast_parse(elem.get("context"))))

    # Handle position="attributes" overrides: fold the attrs/states/modifier
    # <attribute> children onto a fake element (reusing fix_elem's conversion)
    # and re-emit them as individual <attribute> nodes.
    # NOTE(review): the original example XML in this comment was stripped by the
    # diff extraction; description reconstructed from the code below.
    for parent in arch.xpath("//*[@position='attributes']"):
        attrs_data = {}  # save the attributes from the children
        for elem in parent.findall("./attribute"):
            name = elem.get("name")
            if name in ["attrs", "states", *MODS]:
                attrs_data[name] = elem.get("value", elem.text or "").strip()
                parent.remove(elem)
                if name == "attrs" and not attrs_data["attrs"]:
                    attrs_data["attrs"] = "{}"
        # keep track of extra keys in `attrs` if any
        extra_mods = [k.value for k in ast_parse(attrs_data.get("attrs", "{}")).keys if k.value not in MODS]
        fake_elem = etree.Element(parent.tag, {**parent.attrib, **attrs_data}, position="replace")
        success &= fix_elem(cr, model, fake_elem, comb_arch)
        for mod in MODS + extra_mods:
            if mod not in fake_elem.attrib:
                continue
            new_elem = etree.Element("attribute", name=mod)
            new_elem.text = fake_elem.get(mod)
            parent.append(new_elem)
        # drop the override element entirely if nothing remains to apply
        if len(parent) == 0 and parent.getparent() is not None:
            parent.getparent().remove(parent)

    return success


def check_true_false(lv, ov, rv_ast):
    """
    Return True/False if the leaf (lv, ov, rv_ast) can be considered a constant True/False leaf.

    `lv` is the (literal) left value, `ov` the operator string, `rv_ast` the AST
    node of the right value. Otherwise returns None.
    """
    ov = {"=": "==", "<>": "!="}.get(ov, ov)
    if ov not in ["==", "!="]:
        return None
    # Note: from JS implementation (None,=,xxx) is always False, same for (True/False,=,xxx)
    # conversely if op is `!=` then this is considered True ¯\_(ツ)_/¯
    if isinstance(lv, bool) or lv is None:
        return ov == "!="
    if isinstance(lv, (int, float)) and isinstance(rv_ast, ast.Constant) and isinstance(rv_ast.value, (int, float)):
        # both sides are numeric literals: evaluate the comparison right away
        return safe_eval(f"{lv} {ov} {rv_ast.value}")
    return None


def ast_term2domain_term(term):
    """
    Convert an AST node of a domain term into a plain domain term.

    Returns either a domain operator string or a (left, op, right_ast) triple;
    raises SyntaxError on anything that is not a valid domain term.
    """
    if isinstance(term, ast.Constant) and term.value in DOMAIN_OPERATORS:
        return term.value
    if isinstance(term, (ast.Tuple, ast.List)):
        try:
            left, op, right = term.elts
        except Exception:
            _logger.error("Invalid domain leaf %s", ast.unparse(term))  # noqa: TRY400
            raise SyntaxError() from None
        else:
            # NOTE(review): assumes left and op are ast.Constant nodes; anything
            # else raises AttributeError, caught by the caller and turned into
            # an InvalidDomainError — confirm this is intended.
            return (left.value, op.value, right)
    _logger.error("Domain terms must be a domain operator or a three-elements tuple, got %s", ast.unparse(term))
    raise SyntaxError() from None
def convert_attrs_val(cr, model, field_path, val):
    """
    Convert an `attrs` value into a python formula.

    We need to use the AST representation because values representing domains could be:
     * an if, or boolean, expression returning alternative domains
     * a string constant with the domain
     * a list representing the domain directly

    :param field_path: field path from the arch root to the element, used to resolve
                       the type of the fields referenced in domain leaves
    :raises InvalidDomainError: when a domain list cannot be converted
    """
    ast2str = Ast2StrVisitor().visit

    if isinstance(val, ast.IfExp):
        return "({} if {} else {})".format(
            convert_attrs_val(cr, model, field_path, val.body),
            ast2str(val.test),
            convert_attrs_val(cr, model, field_path, val.orelse),
        )
    if isinstance(val, ast.BoolOp):
        return "({})".format(
            (" and " if type(val.op) is ast.And else " or ").join(
                convert_attrs_val(cr, model, field_path, v) for v in val.values
            )
        )

    if isinstance(val, ast.Constant):  # {'readonly': '0'} or {'invisible': 'name'}
        val = str(val.value).strip()  # we process the right side as a string
        # a string should be interpreted as a field name unless it is a domain!!
        if val and val[0] == "[" and val[-1] == "]":
            val = ast_parse(val)
            return convert_attrs_val(cr, model, field_path, val)
        return mod2bool_str(val)

    if isinstance(val, ast.List):  # val is a domain
        orig_ast = val
        val = val.elts
        if not val:
            return "True"  # all records match the empty domain
        # make an ast domain look like a domain, to be able to use normalize_domain
        try:
            val = [ast_term2domain_term(term) for term in val]
            norm_domain = normalize_domain(val)
        except Exception:
            raise InvalidDomainError(ast.unparse(orig_ast)) from None
        # convert the (prefix-notation) domain into a python expression via a stack
        stack = []
        for item in reversed(norm_domain):
            if item == "!":
                top = stack.pop()
                stack.append(f"(not {top})")
            elif item in ("&", "|"):
                right = stack.pop()
                left = stack.pop()
                op = {"&": "and", "|": "or"}[item]
                stack.append(f"({left} {op} {right})")
            else:
                try:
                    stack.append(convert_domain_leaf(cr, model, field_path, item))
                except Exception:
                    raise InvalidDomainError(ast.unparse(orig_ast)) from None
        assert len(stack) == 1
        res = stack.pop()
        assert res[0] == "(" and res[-1] == ")", res
        return res[1:-1]

    return ast2str(val)


def target_field_type(cr, model, path):
    """
    Resolve the type of the field reached by walking `path` from `model`.

    Returns the `ttype` of the last field in the path, or None as soon as a
    step cannot be resolved.
    """
    ttype = None
    for fname in path:
        cr.execute(
            """
            SELECT relation, ttype
              FROM ir_model_fields
             WHERE model = %s
               AND name = %s
          ORDER BY id
             LIMIT 1
            """,
            [model, fname],
        )
        model, ttype = cr.fetchone() if cr.rowcount else (None, None)
        if model is None:
            break
    return ttype


def convert_domain_leaf(cr, model, field_path, leaf):
    """
    Convert a domain leaf (tuple) into a python expression.

    It always return the expression surrounded by parenthesis such that it's safe to use it as a sub-expression.

    :raises ValueError: when the leaf cannot be expressed in Python (e.g. complex
                        `=like` patterns)
    """
    if isinstance(leaf, bool):
        # JS allows almost everything in a domain, boolean fields have a clear meaning and they are
        # interpreted in JS side as their boolean value, we do the same here.
        return f"({leaf})"
    left, op, right_ast = leaf
    tf = check_true_false(left, op, right_ast)
    if tf is not None:
        return f"({tf})"

    # see warnings from osv.expression.normalize_leaf
    # https://github.com/odoo/odoo/blob/7ff1dac42fe24d1070c569f99ae7a67fe66eda2b/odoo/osv/expression.py#L353-L358
    if op in ("in", "not in") and isinstance(right_ast, ast.Constant) and isinstance(right_ast.value, bool):
        op = "=" if op == "in" else "!="
    elif op in ("=", "!=") and isinstance(right_ast, (ast.List, ast.Tuple)):
        op = "in" if op == "=" else "not in"

    right = Ast2StrVisitor().visit(right_ast)
    if op == "=?":
        return f"({right} is False or {right} is None or ({left} == {right}))"
    if op in ("=", "=="):
        return f"({left} == {right})"
    if op in ("!=", "<>"):
        return f"({left} != {right})"
    if op in ("<", "<=", ">", ">="):
        return f"({left} {op} {right})"
    if op == "like":
        return f"({right} in ({left} or ''))"
    if op == "ilike":
        return f"({right}.lower() in ({left} or '').lower())"
    if op == "not like":
        return f"({right} not in ({left} or ''))"
    if op == "not ilike":
        return f"({right}.lower() not in ({left} or '').lower())"
    if op in ("in", "not in"):
        # this is a complex case:
        #  (user_ids, 'in', [])  -> empty result
        #  (user_ids, 'in', [2]) -> the result cannot be evaluated as `users_ids == [2]` :/
        # from domain.js:
        # ```
        # const val = Array.isArray(value) ? value : [value];
        # const fieldVal = Array.isArray(fieldValue) ? fieldValue : [fieldValue];
        # return fieldVal.some((fv) => val.includes(fv));
        # ```
        rv = f"{right}" if isinstance(right_ast, (ast.List, ast.Tuple)) else f"[{right}]"
        lv = str(left)
        # FIX(review): only resolve the field type for string left operands; previously
        # `left.split(".")` ran before the isinstance check and raised AttributeError
        # for numeric left values, turning a convertible leaf into an InvalidDomainError.
        ttype = target_field_type(cr, model, field_path + left.split(".")) if isinstance(left, str) else None
        if isinstance(left, str) and ttype in ("one2many", "many2many"):  # array of ids
            res = f"set({lv}).intersection({rv})"  # odoo/odoo#139827, odoo/odoo#139451
            return f"(not {res})" if op == "not in" else f"({res})"
        else:
            # consider the left-hand side to be a single value
            # ex. ('team_id', 'in', [val1, val2, val3, ...]) => team_id in [val1, val2, val3, ...]
            return f"({lv} {op} {rv})"
    if op in ("=like", "=ilike") and isinstance(right_ast, ast.Constant) and isinstance(right_ast.value, str):
        # this cannot be handled in Python for all cases with the limited support of what
        # can be evaluated in an inline attribute expression, we try to deal with some cases
        # a pattern like 'aaa%bbb%ccc' is impossible to deal with
        pattern = right[1:-1]  # chop the quotes
        lower = ""
        if op == "=ilike":
            pattern = pattern.lower()
            # FIX(review): was "lower()" (no dot), generating invalid Python like
            # `name lower() == 'x'`
            lower = ".lower()"
        if "%" not in pattern:
            return f"({left}{lower} == {pattern!r})"
        if pattern.count("%") == 1:  # pattern=aaa%bbbb
            start, end = pattern.split("%")
            return f"({left}{lower}.startswith({start!r}) and {left}{lower}.endswith({end!r}))"
        if pattern.count("%") == 2 and pattern[0] == "%" and pattern[-1] == "%":
            # pattern=%aaa%, same as `like` op with aaa
            pattern = pattern[1:-1]  # chop the %
            return f"({pattern!r} in {left}{lower})"
    # let it fail otherwise
    # FIX(review): the message was previously passed %-style args that were never
    # interpolated (`ValueError("... (%s, %s, %s)", left, op, right)`)
    raise ValueError(f"Cannot convert leaf to Python ({left}, {op}, {right})")
# NOTE(review): this span of the collapsed diff also records the deletion of four
# symlinks, no code content:
#   src/base/8.0.1.3/pre-00-base_version.py  -> ../0.0.0/pre-base_version.py
#   src/base/8.0.1.3/pre-00-upgrade-start.py -> ../0.0.0/pre-00-upgrade-start.py
#   src/base/9.0.1.3/pre-00-base_version.py  -> ../0.0.0/pre-base_version.py
#   src/base/9.0.1.3/pre-00-upgrade-start.py -> ../0.0.0/pre-00-upgrade-start.py

# --- src/base/tests/__init__.py ---
from . import test_ensure_has_pk, test_moved0, test_util

# --- src/base/tests/test_ensure_has_pk.py ---
import logging

from odoo.addons.base.maintenance.migrations import util
from odoo.addons.base.maintenance.migrations.testing import IntegrityCase

_logger = logging.getLogger("odoo.upgrade.base.tests.test_ensure_has_pk")


class TestTablesHavePK(IntegrityCase):
    # Integrity check: every (non-partitioned and partitioned) table in the
    # current schema must have a primary key.
    def invariant(self):
        if not util.version_gte("14.0"):
            # Older versions generated m2m tables without PK
            return

        # Verify that all tables have a PK
        cr = self.env.cr
        # NOTE(review): exact internal whitespace of this SQL string is not
        # recoverable from the collapsed diff; semantics preserved.
        query = """
            SELECT c.relname
              FROM pg_class c
              JOIN pg_namespace ns on ns.oid = c.relnamespace
              LEFT JOIN pg_constraint p on p.conrelid = c.oid and p.contype = 'p'
             WHERE c.relkind IN ('r', 'p')
               AND ns.nspname = current_schema
               AND p.oid IS NULL
          ORDER BY c.relname
        """

        cr.execute(query)
        if cr.rowcount:
            tables = "\n".join(" - " + t for (t,) in cr.fetchall())
            msg = "Some tables do not have any primary key:\n{}".format(tables)
            _logger.critical(msg)
            # only hard-fail on CI; on customer upgrades this is logged only
            if util.on_CI():
                raise AssertionError(msg)

# --- src/base/tests/test_moved0.py ---
from odoo.addons.base.maintenance.migrations import util
from odoo.addons.base.maintenance.migrations.testing import IntegrityCase

# reuse the detection logic of the 0.0.0 pre-script
impl = util.import_script("base/0.0.0/pre-moved0.py")


class TestMoved0(IntegrityCase):
    key = impl.KEY
    message = "New `moved0` field. It happen when the ORM cannot change a column type by itself."

    def invariant(self):
        if util.version_gte("16.0"):
            # See https://github.com/odoo/odoo/commit/50767ef90eadeca2ed05b9400238af8bdbe77fb3
            self.skipTest("Starting Odoo 16, no more `moved0` columns are created")
            return None

        return impl.get_moved0_columns(self.env.cr)

# --- src/base/tests/test_util.py (module header) ---
import ast
import operator
import re
import sys
import threading
import unittest
import uuid
from ast import literal_eval
from contextlib import contextmanager

from lxml import etree

try:
    from unittest import mock
except ImportError:
    import mock

from odoo import modules
from odoo.tools import mute_logger

from odoo.addons.base.maintenance.migrations import util
from odoo.addons.base.maintenance.migrations.testing import UnitTestCase, parametrize
from odoo.addons.base.maintenance.migrations.util import snippets
from odoo.addons.base.maintenance.migrations.util.domains import (
    FALSE_LEAF,
    TRUE_LEAF,
    _adapt_one_domain,
    _model_of_path,
)
from odoo.addons.base.maintenance.migrations.util.exceptions import MigrationError

# starting saas~18.2 domains are normalized by the ORM; some expectations differ
USE_ORM_DOMAIN = util.misc.version_gte("saas~18.2")
NOTNOT = () if USE_ORM_DOMAIN else ("!", "!")


@contextmanager
def without_testing():
    """Temporarily clear Odoo's testing flags, restoring them on exit."""
    thread = threading.current_thread()
    testing = getattr(modules.module, "current_test", False) or getattr(thread, "testing", False)
    try:
        modules.module.current_test = False
        thread.testing = False
        yield
    finally:
        thread.testing = testing
        modules.module.current_test = testing
# NOTE(review): the `class TestAdaptOneDomain(UnitTestCase):` statement opens at
# the very end of the previous chunk segment; reproduced here for completeness.
class TestAdaptOneDomain(UnitTestCase):
    """Tests for `_adapt_one_domain` and `_model_of_path`."""

    def setUp(self):
        super(TestAdaptOneDomain, self).setUp()
        self.mock_adapter = mock.Mock()

    def test_adapt_renamed_field(self):
        term = ("user_ids.partner_id.user_ids.partner_id", "=", False)
        match_term = ("renamed_user_ids.partner_id.renamed_user_ids.partner_id", "=", False)

        Filter = self.env["ir.filters"]
        filter1 = Filter.create(
            {"name": "Test filter for adapt domain", "model_id": "res.partner", "domain": str([term])}
        )
        assert [term] == ast.literal_eval(filter1.domain)

        base_exp = "context.get('context_value') in (1, 2) and [{0}] or ['!', {0}]"
        base_exp_fallback = "(((context.get('context_value') in (1, 2)) and [{0}]) or ['!', {0}])"
        filter2 = Filter.create(
            {"name": "Test filter for adapt domain2", "model_id": "res.partner", "domain": base_exp.format(term)}
        )

        util.invalidate(Filter)
        util.rename_field(self.cr, "res.partner", "user_ids", "renamed_user_ids")

        new_domain = ast.literal_eval(filter1.domain)
        self.assertEqual([match_term], new_domain)

        self.assertIn(filter2.domain, [base_exp.format(match_term), base_exp_fallback.format(match_term)])

    @parametrize(
        [
            ("res.currency", [], "res.currency"),
            ("res.currency", ["rate_ids"], "res.currency.rate"),
            ("res.currency", ("rate_ids", "company_id"), "res.company"),
            ("res.currency", ["rate_ids", "company_id", "user_ids"], "res.users"),
            ("res.currency", ("rate_ids", "company_id", "user_ids", "partner_id"), "res.partner"),
            ("res.users", ["partner_id"], "res.partner"),
            ("res.users", ["nonexistent_field"], None),
            ("res.users", ("partner_id", "active"), None),
            ("res.users", ("partner_id", "active", "name"), None),
            ("res.users", ("partner_id", "removed_field"), None),
        ]
    )
    def test_model_of_path(self, model, path, expected):
        cr = self.env.cr
        self.assertEqual(_model_of_path(cr, model, path), expected)

    def test_change_no_leaf(self):
        # testing plan: update path of a domain where the last element is not changed

        # no adapter
        domain = [("partner_id.user_id.partner_id.user_id.partner_id", "=", False)]
        match_domain = [("partner_id.friend_id.partner_id.friend_id.partner_id", "=", False)]
        new_domain = _adapt_one_domain(self.cr, "res.partner", "user_id", "friend_id", "res.users", domain)
        self.assertEqual(match_domain, new_domain)

        # with adapter, verify it's not called
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.assertEqual(match_domain, new_domain)
        self.mock_adapter.assert_not_called()

    def test_change_leaf(self):
        # testing plan: update path of a domain where the last element is changed

        # no adapter
        domain = [("partner_id.user_id.partner_id.user_id", "=", False)]
        match_domain = [("partner_id.friend_id.partner_id.friend_id", "=", False)]

        new_domain = _adapt_one_domain(self.cr, "res.partner", "user_id", "friend_id", "res.users", domain)
        self.assertEqual(match_domain, new_domain)

        # with adapter, verify it's called even if nothing was changed on the path
        self.mock_adapter.return_value = domain  # adapter won't update anything
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "user_id", "res.users", domain, adapter=self.mock_adapter
        )  # even if new==old the adapter must be called
        self.mock_adapter.assert_called_once()
        # Ignore `boolean-positional-value-in-call` lint violations in the whole file
        # ruff: noqa: FBT003
        self.mock_adapter.assert_called_with(domain[0], False, False)
        self.assertEqual(None, new_domain)

        # path is changed even if adapter doesn't touch it
        self.mock_adapter.reset_mock()
        match_domain = [("partner_id.friend_id.partner_id.friend_id", "=", False)]
        self.mock_adapter.return_value = domain  # adapter won't update anything
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_once()
        self.mock_adapter.assert_called_with(domain[0], False, False)
        self.assertEqual(match_domain, new_domain)  # updated path even if adapter didn't

    def test_adapter_calls(self):
        # testing plan: ensure the adapter is called with the right parameters

        self.mock_adapter.return_value = [("partner_id.friend_id", "=", 2)]

        # '&' domain
        domain = ["&", ("partner_id.user_id", "=", 1), ("name", "=", False)]
        match_domain = ["&", ("partner_id.friend_id", "=", 2), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(domain[1], False, False)
        self.assertEqual(match_domain, new_domain)

        # '|' domain
        self.mock_adapter.reset_mock()
        domain = ["|", ("partner_id.user_id", "=", 1), ("name", "=", False)]
        match_domain = ["|", ("partner_id.friend_id", "=", 2), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(domain[1], True, False)
        self.assertEqual(match_domain, new_domain)

        # '!' domain
        self.mock_adapter.reset_mock()
        domain = ["!", ("partner_id.user_id", "=", 1)]
        match_domain = ["!", ("partner_id.friend_id", "=", 2)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(domain[1], False, True)
        self.assertEqual(match_domain, new_domain)

        # '&' '!' domain
        self.mock_adapter.reset_mock()
        domain = ["|", "!", ("partner_id.user_id", "=", 1), ("name", "=", False)]
        match_domain = ["|", "!", ("partner_id.friend_id", "=", 2), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(domain[2], True, True)
        self.assertEqual(match_domain, new_domain)

        # '|' '!' domain
        self.mock_adapter.reset_mock()
        domain = ["|", "!", ("partner_id.user_id", "=", 1), ("name", "=", False)]
        match_domain = ["|", "!", ("partner_id.friend_id", "=", 2), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(domain[2], True, True)
        self.assertEqual(match_domain, new_domain)

    def test_adapter_more_domains(self):
        # testing plan: check more complex domains

        self.mock_adapter.return_value = [("partner_id.friend_id", "=", 2)]
        term = ("partner_id.user_id", "=", 1)

        # double '!'
        self.mock_adapter.reset_mock()
        domain = ["!", "!", ("partner_id.user_id", "=", 1)]
        match_domain = [*NOTNOT, ("partner_id.friend_id", "=", 2)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, False, False)
        self.assertEqual(match_domain, new_domain)

        # triple '!'
        self.mock_adapter.reset_mock()
        domain = ["!", "!", "!", ("partner_id.user_id", "=", 1)]
        match_domain = [*NOTNOT, "!", ("partner_id.friend_id", "=", 2)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, False, True)
        self.assertEqual(match_domain, new_domain)

        # '|' double '!'
        self.mock_adapter.reset_mock()
        domain = ["|", "!", "!", ("partner_id.user_id", "=", 1), ("name", "=", False)]
        match_domain = ["|", *NOTNOT, ("partner_id.friend_id", "=", 2), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, True, False)
        self.assertEqual(match_domain, new_domain)

        # '&' double '!'
        self.mock_adapter.reset_mock()
        domain = ["&", "!", "!", ("partner_id.user_id", "=", 1), ("name", "=", False)]
        match_domain = ["&", *NOTNOT, ("partner_id.friend_id", "=", 2), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, False, False)
        self.assertEqual(match_domain, new_domain)

        # mixed domains
        self.mock_adapter.reset_mock()
        domain = ["|", "&", ("partner_id.user_id", "=", 1), ("name", "=", False), ("name", "=", False)]
        match_domain = ["|", "&", ("partner_id.friend_id", "=", 2), ("name", "=", False), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, False, False)
        self.assertEqual(match_domain, new_domain)

        self.mock_adapter.reset_mock()
        domain = ["&", "|", ("partner_id.user_id", "=", 1), ("name", "=", False), ("name", "=", False)]
        match_domain = ["&", "|", ("partner_id.friend_id", "=", 2), ("name", "=", False), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, True, False)
        self.assertEqual(match_domain, new_domain)

        self.mock_adapter.reset_mock()
        domain = ["|", "&", "!", ("partner_id.user_id", "=", 1), ("name", "=", False), ("name", "=", False)]
        match_domain = ["|", "&", "!", ("partner_id.friend_id", "=", 2), ("name", "=", False), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, False, True)
        self.assertEqual(match_domain, new_domain)

        self.mock_adapter.reset_mock()
        domain = ["&", "|", "!", ("partner_id.user_id", "=", 1), ("name", "=", False), ("name", "=", False)]
        match_domain = ["&", "|", "!", ("partner_id.friend_id", "=", 2), ("name", "=", False), ("name", "=", False)]
        new_domain = _adapt_one_domain(
            self.cr, "res.partner", "user_id", "friend_id", "res.users", domain, adapter=self.mock_adapter
        )
        self.mock_adapter.assert_called_with(term, True, True)
        self.assertEqual(match_domain, new_domain)

    @parametrize(
        [
            # first and last position in path at the same time
            ("partner_id", "res.users"),
            # first and last position in path
            ("partner_id.user_id.partner_id", "res.users"),
            # last position
            ("user_id.partner_id", "res.partner"),
            # middle
            ("user_id.partner_id.user.id", "res.partner"),
            # last position, longer domain
            ("company_id.partner_id.user_id.partner_id", "res.partner"),
        ]
    )
    def test_force_adapt(self, left, model, target_model="res.users", old="partner_id"):
        # simulate the adapter used for removal of a field
        # this is the main use case for force_adapt=True
        self.mock_adapter.return_value = [TRUE_LEAF]
        domain = [(left, "=", False)]
        res = _adapt_one_domain(
            self.cr, target_model, old, "ignored", model, domain, adapter=self.mock_adapter, force_adapt=True
        )
        self.mock_adapter.assert_called_once()
        self.assertEqual(res, self.mock_adapter.return_value)

    @parametrize(
        [
            ("partner_id.old", "new"),
            ("partner_id.user_id.partner_id.old", "partner_id.user_id.new"),
            ("partner_id.old.foo", "new.foo"),
            # from another model
            ("user_id.partner_id.old", "user_id.new", "res.partner"),
            # no change expected
            ("old", None),
            ("partner_id", None),
            ("partner_id.name", None),
        ]
    )
    def test_dotted_old(self, left, expected, model="res.users"):
        domain = [(left, "=", "test")]
        new_domain = _adapt_one_domain(self.cr, "res.users", "partner_id.old", "new", model, domain)
        if expected is not None:
            self.assertEqual(new_domain, [(expected, "=", "test")])
        else:
            self.assertIsNone(new_domain)

    @unittest.skipUnless(util.version_gte("17.0"), "`any` operator only supported from Odoo 17")
    def test_any_operator(self):
        domain = [("partner_id", "any", [("complete_name", "=", "Odoo")])]
        expected = [("partner_id", "any", [("full_name", "=", "Odoo")])]

        new_domain = _adapt_one_domain(self.cr, "res.partner", "complete_name", "full_name", "res.company", domain)
        self.assertEqual(new_domain, expected)

        # test it also works recursively
        domain = [("partner_id", "any", [("bank_ids", "not any", [("acc_number", "like", "S.A.")])])]
        expected = [("partner_id", "any", [("bank_ids", "not any", [("acc_nbr", "like", "S.A.")])])]

        new_domain = _adapt_one_domain(self.cr, "res.partner.bank", "acc_number", "acc_nbr", "res.company", domain)
        self.assertEqual(new_domain, expected)


class TestAdaptDomainView(UnitTestCase):
    """End-to-end check that `adapt_domains` rewrites fields inside view archs."""

    def test_adapt_domain_view(self):
        tag = "list" if util.version_gte("saas~17.5") else "tree"
        view_form = self.env["ir.ui.view"].create(
            {
                "name": "test_adapt_domain_view_form",
                "model": "res.currency",
                # NOTE(review): the element markup of this arch (and of the two
                # search archs below) was stripped by the diff extraction — only
                # the f-string skeleton and the `<{tag}>` reference survive. The
                # original archs contained fields with domains referencing
                # `email` on res.partner / res.company; not reconstructable here.
                "arch": f"""\
                    <{tag}>
                    """,
            }
        )

        view_search_1 = self.env["ir.ui.view"].create(
            {
                "name": "test_adapt_domain_view_search",
                "model": "res.company",
                # NOTE(review): arch markup stripped by extraction, see above.
                "arch": """\
                    """,
            }
        )

        view_search_2 = self.env["ir.ui.view"].create(
            {
                "name": "test_adapt_domain_view_search",
                "model": "res.company",
                # NOTE(review): arch markup stripped by extraction, see above.
                "arch": """\
                    """,
            }
        )

        util.adapt_domains(self.env.cr, "res.partner", "email", "courriel")
        util.invalidate(view_form | view_search_1 | view_search_2)

        # renaming on res.partner must not touch the res.company occurrences
        self.assertIn("email", view_form.arch)
        self.assertIn("email", view_search_1.arch)
        self.assertIn("email", view_search_2.arch)

        util.adapt_domains(self.env.cr, "res.company", "email", "courriel")
        util.invalidate(view_form | view_search_1 | view_search_2)

        self.assertIn("courriel", view_form.arch)
        self.assertIn("courriel", view_search_1.arch)
        self.assertIn("courriel", view_search_2.arch)


@unittest.skipUnless(
    util.version_gte("13.0"), "This test is incompatible with old style odoo.addons.base.maintenance.migrations.util"
)
class TestReplaceReferences(UnitTestCase):
    """Fixtures for `replace_record_references_batch` over an indirect-reference table."""

    def setUp(self):
        super().setUp()
        # NOTE(review): exact internal whitespace of this SQL string is not
        # recoverable from the collapsed diff; statements preserved.
        self.env.cr.execute(
            """
            CREATE TABLE dummy_model(
                id serial PRIMARY KEY,
                res_id int,
                res_model varchar,
                extra varchar,
                CONSTRAINT uniq_constr UNIQUE(res_id, res_model, extra)
            );

            INSERT INTO dummy_model(res_model, res_id, extra)
                 VALUES -- the target is there with same res_id
                        ('res.users', 1, 'x'),
                        ('res.partner', 1, 'x'),

                        -- two with same target and the target is there
                        ('res.users', 2, 'x'),
                        ('res.users', 3, 'x'),
                        ('res.partner', 2, 'x'),

                        -- two with same target and the target is not there
                        ('res.users', 4, 'x'),
                        ('res.users', 5, 'x'),

                        -- target is there different res_id
                        ('res.users', 6, 'x'),
                        ('res.partner', 4, 'x')
            """
        )

    def _ir_dummy(self, cr, bound_only=True):
        yield util.IndirectReference("dummy_model", "res_model", "res_id")

    # NOTE(review): the method `test_replace_record_references_batch__full_unique`
    # starts here in the original file but runs past the end of the visible
    # chunk; its body is not reproducible from this excerpt.
self.env.cr - mapping = {1: 1, 2: 2, 3: 2, 4: 3, 5: 3, 6: 4} - with mock.patch("odoo.upgrade.util.records.indirect_references", self._ir_dummy): - util.replace_record_references_batch(cr, mapping, "res.users", "res.partner") - - cr.execute("SELECT res_model, res_id, extra FROM dummy_model ORDER BY res_id, res_model") - data = cr.fetchall() - expected = [ - ("res.partner", 1, "x"), - ("res.partner", 2, "x"), - ("res.partner", 3, "x"), - ("res.partner", 4, "x"), - ] - self.assertEqual(data, expected) - - -class TestRemoveFieldDomains(UnitTestCase): - @parametrize( - [ - ([("updated", "=", 0)], [TRUE_LEAF]), - # operator is not relevant - ([("updated", "!=", 0)], [TRUE_LEAF]), - # if negate we should end with "not false" - (["!", ("updated", "!=", 0)], [TRUE_LEAF] if USE_ORM_DOMAIN else ["!", FALSE_LEAF]), - # multiple !, we should still end with a true leaf - (["!", "!", ("updated", ">", 0)], [*NOTNOT, TRUE_LEAF]), - # with operator - ([("updated", "=", 0), ("state", "=", "done")], ["&", TRUE_LEAF, ("state", "=", "done")]), - (["&", ("updated", "=", 0), ("state", "=", "done")], ["&", TRUE_LEAF, ("state", "=", "done")]), - (["|", ("updated", "=", 0), ("state", "=", "done")], ["|", FALSE_LEAF, ("state", "=", "done")]), - # in second operand - (["&", ("state", "=", "done"), ("updated", "=", 0)], ["&", ("state", "=", "done"), TRUE_LEAF]), - (["|", ("state", "=", "done"), ("updated", "=", 0)], ["|", ("state", "=", "done"), FALSE_LEAF]), - # combination with ! - ( - ["&", "!", ("updated", "=", 0), ("state", "=", "done")], - ["&", TRUE_LEAF, ("state", "=", "done")] - if USE_ORM_DOMAIN - else ["&", "!", FALSE_LEAF, ("state", "=", "done")], - ), - ( - ["|", "!", ("updated", "=", 0), ("state", "=", "done")], - ["|", FALSE_LEAF, ("state", "=", "done")] - if USE_ORM_DOMAIN - else ["|", "!", TRUE_LEAF, ("state", "=", "done")], - ), - # here, the ! 
apply on the whole &/| and should not invert the replaced leaf - ( - ["!", "&", ("updated", "=", 0), ("state", "=", "done")], - ["|", FALSE_LEAF, ("state", "!=", "done")] - if USE_ORM_DOMAIN - else ["!", "&", TRUE_LEAF, ("state", "=", "done")], - ), - ( - ["!", "|", ("updated", "=", 0), ("state", "=", "done")], - ["&", TRUE_LEAF, ("state", "!=", "done")] - if USE_ORM_DOMAIN - else ["!", "|", FALSE_LEAF, ("state", "=", "done")], - ), - ] - ) - def test_remove_field(self, domain, expected): - cr = self.env.cr - cr.execute( - "INSERT INTO ir_filters(name, model_id, domain, context, sort)" - " VALUES ('test', 'base.module.update', %s, '{}', '[]') RETURNING id", - [str(domain)], - ) - (filter_id,) = cr.fetchone() - - util.remove_field(cr, "base.module.update", "updated") - - cr.execute("SELECT domain FROM ir_filters WHERE id = %s", [filter_id]) - altered_domain = literal_eval(cr.fetchone()[0]) - - self.assertEqual(altered_domain, expected) - - -class TestIrExports(UnitTestCase): - def setUp(self): - super().setUp() - self.export = self.env["ir.exports"].create( - [ - { - "name": "Test currency export", - "resource": "res.currency", - "export_fields": [ - (0, 0, {"name": "full_name"}), - (0, 0, {"name": "rate_ids/company_id/user_ids/name"}), - (0, 0, {"name": "rate_ids/company_id/user_ids/partner_id/user_ids/name"}), - (0, 0, {"name": "rate_ids/name"}), - (0, 0, {"name": "rate_ids/company_id/user_ids/partner_id/user_ids/.id"}), - ], - } - ] - ) - util.flush(self.export) - - def _invalidate(self): - util.invalidate(self.export.export_fields) - util.invalidate(self.export) - - def test_rename_field(self): - util.rename_field(self.cr, "res.partner", "user_ids", "renamed_user_ids") - self._invalidate() - self.assertEqual( - self.export.export_fields[2].name, "rate_ids/company_id/user_ids/partner_id/renamed_user_ids/name" - ) - self.assertEqual( - self.export.export_fields[4].name, "rate_ids/company_id/user_ids/partner_id/renamed_user_ids/.id" - ) - - 
util.rename_field(self.cr, "res.users", "name", "new_name") - self._invalidate() - self.assertEqual(self.export.export_fields[1].name, "rate_ids/company_id/user_ids/new_name") - - def test_remove_field(self): - util.remove_field(self.cr, "res.currency.rate", "company_id") - self._invalidate() - self.assertEqual(len(self.export.export_fields), 2) - self.assertEqual(self.export.export_fields[0].name, "full_name") - self.assertEqual(self.export.export_fields[1].name, "rate_ids/name") - - @mute_logger(util.pg._logger.name) - def test_rename_model(self): - util.rename_model(self.cr, "res.currency", "res.currency2") - self._invalidate() - self.assertEqual(self.export.resource, "res.currency2") - - def test_remove_model(self): - util.remove_model(self.cr, "res.currency.rate") - self._invalidate() - self.assertEqual(len(self.export.export_fields), 1) - self.assertEqual(self.export.export_fields[0].name, "full_name") - - util.remove_model(self.cr, "res.currency") - self.cr.execute("SELECT * FROM ir_exports WHERE id = %s", [self.export.id]) - self.assertFalse(self.cr.fetchall()) - - -class TestBaseImportMappings(UnitTestCase): - def setUp(self): - super().setUp() - self.import_mapping = self.env["base_import.mapping"].create( - [ - {"res_model": "res.currency", "column_name": "Column name", "field_name": path} - for path in [ - "full_name", - "rate_ids/company_id/user_ids/name", - "rate_ids/company_id/user_ids/partner_id/user_ids/name", - "rate_ids/name", - ] - ] - ) - - util.flush(self.import_mapping) - - def test_rename_field(self): - util.rename_field(self.cr, "res.partner", "user_ids", "renamed_user_ids") - util.invalidate(self.import_mapping) - - self.assertEqual( - self.import_mapping[2].field_name, "rate_ids/company_id/user_ids/partner_id/renamed_user_ids/name" - ) - - util.rename_field(self.cr, "res.users", "name", "new_name") - util.invalidate(self.import_mapping) - - self.assertEqual(self.import_mapping[1].field_name, "rate_ids/company_id/user_ids/new_name") - - 
def test_remove_field(self): - prev_mappings = self.env["base_import.mapping"].search([]) - - util.remove_field(self.cr, "res.currency.rate", "company_id") - util.invalidate(self.import_mapping) - - removed_mappings = prev_mappings - self.env["base_import.mapping"].search([]) - remaining_mappings = self.import_mapping - removed_mappings - - self.assertEqual(len(removed_mappings), 2) - self.assertEqual(remaining_mappings[0].field_name, "full_name") - self.assertEqual(remaining_mappings[1].field_name, "rate_ids/name") - - @mute_logger(util.pg._logger.name) - def test_rename_model(self): - util.rename_model(self.cr, "res.currency", "res.currency2") - util.invalidate(self.import_mapping) - - self.assertEqual(self.import_mapping[0].res_model, "res.currency2") - - def test_remove_model(self): - prev_mappings = self.env["base_import.mapping"].search([]) - - util.remove_model(self.cr, "res.currency.rate") - util.invalidate(self.import_mapping) - - removed_mappings = prev_mappings - self.env["base_import.mapping"].search([]) - remaining_mappings = self.import_mapping - removed_mappings - - self.assertEqual(len(removed_mappings), 3) - self.assertEqual(remaining_mappings[0].field_name, "full_name") - - util.remove_model(self.cr, "res.currency") - self.cr.execute("SELECT * FROM base_import_mapping WHERE id = %s", [remaining_mappings.id]) - self.assertFalse(self.cr.fetchall()) - - -class TestIterBrowse(UnitTestCase): - def test_iter_browse_iter(self): - cr = self.env.cr - cr.execute("SELECT id FROM res_country") - ids = [c for (c,) in cr.fetchall()] - chunk_size = 10 - - Country = type(self.env["res.country"]) - func = "fetch" if util.version_gte("saas~16.2") else "_read" if util.version_gte("saas~12.5") else "read" - with mock.patch.object(Country, func, autospec=True, side_effect=getattr(Country, func)) as read: - for c in util.iter_browse(self.env["res.country"], ids, logger=None, chunk_size=chunk_size): - c.name # noqa: B018 - expected = (len(ids) + chunk_size - 1) // 
chunk_size - self.assertEqual(read.call_count, expected) - - def test_iter_browse_call(self): - cr = self.env.cr - cr.execute("SELECT id FROM res_country") - ids = [c for (c,) in cr.fetchall()] - chunk_size = 10 - - Country = type(self.env["res.country"]) - with mock.patch.object(Country, "write", autospec=True, side_effect=Country.write) as write: - ib = util.iter_browse(self.env["res.country"], ids, logger=None, chunk_size=chunk_size) - ib.write({"vat_label": "VAT"}) - - expected = (len(ids) + chunk_size - 1) // chunk_size - self.assertEqual(write.call_count, expected) - - def test_iter_browse_create_non_empty(self): - RP = self.env["res.partner"] - with self.assertRaises(ValueError): - util.iter_browse(RP, [42]).create([{}]) - - @parametrize([(True,), (False,)]) - def test_iter_browse_create(self, multi): - chunk_size = 2 - RP = self.env["res.partner"] - - names = [f"Name {i}" for i in range(7)] - ib = util.iter_browse(RP, [], chunk_size=chunk_size) - records = ib.create([{"name": name} for name in names], multi=multi) - self.assertEqual([t.name for t in records], names) - - def test_iter_browse_iter_twice(self): - cr = self.env.cr - cr.execute("SELECT id FROM res_country") - ids = [c for (c,) in cr.fetchall()] - chunk_size = 10 - - ib = util.iter_browse(self.env["res.country"], ids, logger=None, chunk_size=chunk_size) - for c in ib: - c.name # noqa: B018 - - with self.assertRaises(RuntimeError): - for c in ib: - c.name # noqa: B018 - - def test_iter_browse_call_twice(self): - cr = self.env.cr - cr.execute("SELECT id FROM res_country") - ids = [c for (c,) in cr.fetchall()] - chunk_size = 10 - - ib = util.iter_browse(self.env["res.country"], ids, logger=None, chunk_size=chunk_size) - ib.write({"vat_label": "VAT"}) - - with self.assertRaises(RuntimeError): - ib.write({"name": "FAIL"}) - - -class TestPG(UnitTestCase): - @parametrize( - [ - # explicit conversions - ("boolean", "bool"), - ("smallint", "int2"), - ("integer", "int4"), - ("bigint", "int8"), - ("real", 
"float4"), - ("double precision", "float8"), - ("character varying", "varchar"), - ("timestamp with time zone", "timestamptz"), - ("timestamp without time zone", "timestamp"), - # noop for existing types - ("bool", "bool"), - ("int4", "int4"), - ("varchar", "varchar"), - # and unspecified/unknown types - ("jsonb", "jsonb"), - ("foo", "foo"), - # keep suffix (for arrays and sized limited varchar) - ("int4[]", "int4[]"), - ("varchar(2)", "varchar(2)"), - # but also convert types - ("integer[]", "int4[]"), - ("character varying(16)", "varchar(16)"), - ] - ) - def test__normalize_pg_type(self, type_, expected): - self.assertEqual(util.pg._normalize_pg_type(type_), expected) - - @parametrize( - [ - ("res_country", "name", False, "jsonb" if util.version_gte("16.0") else "varchar"), # translated field - ("res_country", "code", False, "varchar"), - ("res_country", "code", True, "varchar(2)"), - ("res_currency", "active", False, "bool"), - ("res_currency", "active", True, "bool"), - ("res_country", "create_date", False, "timestamp"), - ("res_currency", "create_uid", False, "int4"), - ("res_country", "name_position", False, "varchar"), - ("res_country", "name_position", True, "varchar"), - ("res_country", "address_format", False, "text"), - ("res_partner", "does_not_exists", False, None), - ] - ) - def test_column_type(self, table, column, sized, expected): - value = util.column_type(self.env.cr, table, column, sized=sized) - if expected is None: - self.assertIsNone(value) - else: - self.assertEqual(value, expected) - - def test_alter_column_type(self): - cr = self.env.cr - cr.execute( - """ - ALTER TABLE res_partner_bank ADD COLUMN x bool; - ALTER TABLE res_partner_bank ADD COLUMN y varchar(4); - - UPDATE res_partner_bank - SET x = CASE id % 3 - WHEN 1 THEN NULL - WHEN 2 THEN True - ELSE False - END - """ - ) - self.assertEqual(util.column_type(cr, "res_partner_bank", "x"), "bool") - util.alter_column_type(cr, "res_partner_bank", "x", "int", using="CASE {0} WHEN True THEN 2 
ELSE 1 END") - self.assertEqual(util.column_type(cr, "res_partner_bank", "x"), "int4") - cr.execute("SELECT id, x FROM res_partner_bank") - data = cr.fetchall() - self.assertTrue( - all(x == 1 or (x == 2 and id_ % 3 == 2) for id_, x in data), - "Some values where not casted correctly via USING", - ) - - self.assertEqual(util.column_type(cr, "res_partner_bank", "y"), "varchar") - self.assertEqual(util.column_type(cr, "res_partner_bank", "y", sized=True), "varchar(4)") - util.alter_column_type(cr, "res_partner_bank", "y", "varchar") - self.assertEqual(util.column_type(cr, "res_partner_bank", "y"), "varchar") - self.assertEqual(util.column_type(cr, "res_partner_bank", "y", sized=True), "varchar") - util.alter_column_type(cr, "res_partner_bank", "y", "varchar(12)") - self.assertEqual(util.column_type(cr, "res_partner_bank", "y"), "varchar") - self.assertEqual(util.column_type(cr, "res_partner_bank", "y", sized=True), "varchar(12)") - - @parametrize( - [ - ("test", "

test

"), - ("

test

", "

test

"), - ("
test
", "
test
"), - # escapings - ("r&d", "

r&d

"), - ("!<(^_^)>!", "

!<(^_^)>!

"), - ("'quoted'", "

'quoted'

"), - # and with links - ( - "Go to https://upgrade.odoo.com/?debug=1&version=14.0 and follow the instructions.", - '

Go to https://upgrade.odoo.com/?debug=1&version=14.0 and' - " follow the instructions.

", - ), - ] - ) - def test_pg_text2html(self, value, expected): - cr = self.env.cr - uid = self.env.user.id - cr.execute("UPDATE res_users SET signature=%s WHERE id=%s", [value, uid]) - cr.execute("SELECT {} FROM res_users WHERE id=%s".format(util.pg_text2html("signature")), [uid]) - result = cr.fetchone()[0] - self.assertEqual(result, expected) - - @parametrize( - [ - ("{parallel_filter}", "…"), - ("{{parallel_filter}}", "{parallel_filter}"), - ("{}", "{}"), - ("{0}", "{0}"), - ("{{0}}", "{0}"), - ("{x}", "{x}"), - ("{{x}}", "{x}"), - ("{{}}", "{}"), - ("{{", "{"), - ("test", "test"), - ("", ""), - ("WHERE {parallel_filter} AND true", "WHERE … AND true"), - ("WHERE {parallel_filter} AND {other}", "WHERE … AND {other}"), - ("WHERE {parallel_filter} AND {other!r}", "WHERE … AND {other!r}"), - ("WHERE {parallel_filter} AND {{other}}", "WHERE … AND {other}"), - ("WHERE {parallel_filter} AND {}", "WHERE … AND {}"), - ("WHERE {parallel_filter} AND {{}}", "WHERE … AND {}"), - ("WHERE {parallel_filter} AND {parallel_filter}", "WHERE … AND …"), - ("using { with other things inside } and {parallel_filter}", "using { with other things inside } and …"), - ] - ) - def test_ExplodeFormatter(self, value, expected): - formatted = util.pg._ExplodeFormatter().format(value, parallel_filter="…") - self.assertEqual(formatted, expected) - # retro-compatibility test - try: - std_formatted = value.format(parallel_filter="…") - except (IndexError, KeyError): - # ignore string that weren't valid - pass - else: - # assert that the new formatted output match the old one. 
- self.assertEqual(formatted, std_formatted) - - def _get_cr(self): - cr = self.registry.cursor() - self.addCleanup(cr.close) - return cr - - def test_explode_mult_filters(self): - cr = self._get_cr() - queries = util.explode_query_range( - cr, - """ - WITH cte1 AS ( - SELECT id, - login - FROM res_users - WHERE {parallel_filter} - ), cte2 AS ( - SELECT id, - login - FROM res_users - WHERE {parallel_filter} - ) SELECT u.login = cte1.login AND u.login = cte2.login - FROM cte1 - LEFT JOIN cte2 - ON cte2.id = cte1.id - JOIN res_users u - ON u.id = cte1.id - """, - table="res_users", - bucket_size=4, - ) - for q in queries: - cr.execute(q) - self.assertTrue(all(x for (x,) in cr.fetchall())) - - @mute_logger(util.pg._logger.getChild("explode_query_range").name) - def test_explode_query_range(self): - cr = self.env.cr - - cr.execute("SELECT count(id) FROM res_partner_category") - count = cr.fetchone()[0] - # ensure there start with at least 10 records - for _ in range(10 - count): - count += 1 - self.env["res.partner.category"].create({"name": "x"}) - - # set one record with very high id - tid = self.env["res.partner.category"].create({"name": "x"}).id - count += 1 - cr.execute("UPDATE res_partner_category SET id = 10000000 WHERE id = %s", [tid]) - - qs = util.explode_query_range(cr, "SELECT 1", table="res_partner_category", bucket_size=count) - self.assertEqual(len(qs), 1) # one bucket should be enough for all records - - qs = util.explode_query_range(cr, "SELECT 1", table="res_partner_category", bucket_size=count - 1) - self.assertEqual(len(qs), 1) # 10% rule for second bucket, 1 <= 0.1(count - 1) since count >= 11 - - def test_parallel_rowcount(self): - cr = self._get_cr() - cr.execute("SELECT count(*) FROM res_lang") - [expected] = cr.fetchone() - - # util.parallel_execute will `commit` the cursor and create new ones - # as we are in a test, we should not commit as we are in a subtransaction - with mock.patch.object(cr, "commit", lambda: ...): - query = "UPDATE 
res_lang SET name = name" - rowcount = util.explode_execute(cr, query, table="res_lang", bucket_size=10) - self.assertEqual(rowcount, expected) - - def test_parallel_rowcount_threaded(self): - with without_testing(): - self.test_parallel_rowcount() - - def test_parallel_execute_retry_on_serialization_failure(self): - TEST_TABLE_NAME = "_upgrade_serialization_failure_test_table" - N_ROWS = 10 - - cr = self._get_cr() - - cr.execute( - util.format_query( - cr, - """ - DROP TABLE IF EXISTS {table}; - - CREATE TABLE {table} ( - id SERIAL PRIMARY KEY, - other_id INTEGER, - FOREIGN KEY (other_id) REFERENCES {table} ON DELETE CASCADE - ); - - INSERT INTO {table} SELECT GENERATE_SERIES(1, %s); - - -- map odd numbers `n` to `n + 1` and viceversa (`n + 1` to `n`) - UPDATE {table} SET other_id = id + (MOD(id, 2) - 0.5)*2; - """ - % N_ROWS, - table=TEST_TABLE_NAME, - ) - ) - - # exploded queries will generate a SerializationFailed error, causing some of the queries to be retried - with without_testing(), mute_logger(util.pg._logger.name, "odoo.sql_db"): - util.explode_execute( - cr, util.format_query(cr, "DELETE FROM {}", TEST_TABLE_NAME), TEST_TABLE_NAME, bucket_size=1 - ) - - if hasattr(self, "_savepoint_id"): - # `explode_execute` causes the cursor to be committed, losing the automatic checkpoint - # Force a new one to avoid issues when cleaning up - self.addCleanup(cr.execute, f"SAVEPOINT test_{self._savepoint_id}") - self.addCleanup(cr.execute, util.format_query(cr, "DROP TABLE IF EXISTS {}", TEST_TABLE_NAME)) - - cr.execute(util.format_query(cr, "SELECT 1 FROM {}", TEST_TABLE_NAME)) - self.assertFalse(cr.rowcount) - - def test_update_one_col_from_dict(self): - TEST_TABLE_NAME = "_upgrade_bulk_update_one_col_test_table" - N_ROWS = 10 - - cr = self._get_cr() - - cr.execute( - util.format_query( - cr, - """ - DROP TABLE IF EXISTS {table}; - - CREATE TABLE {table} ( - id SERIAL PRIMARY KEY, - col1 INTEGER, - col2 INTEGER - ); - - INSERT INTO {table} (col1, col2) SELECT v, v 
FROM GENERATE_SERIES(1, %s) as v; - """, - table=TEST_TABLE_NAME, - ), - [N_ROWS], - ) - mapping = {id: id * 2 for id in range(1, N_ROWS + 1, 2)} - util.bulk_update_table(cr, TEST_TABLE_NAME, "col1", mapping) - - cr.execute( - util.format_query( - cr, - "SELECT id FROM {table} WHERE col2 != id", - table=TEST_TABLE_NAME, - ) - ) - self.assertFalse(cr.rowcount, "unintended column 'col2' is affected") - - cr.execute( - util.format_query( - cr, - "SELECT id FROM {table} WHERE col1 != id AND MOD(id, 2) = 0", - table=TEST_TABLE_NAME, - ) - ) - self.assertFalse(cr.rowcount, "unintended rows are affected") - - cr.execute( - util.format_query( - cr, - "SELECT id FROM {table} WHERE col1 != 2 * id AND MOD(id, 2) = 1", - table=TEST_TABLE_NAME, - ) - ) - self.assertFalse(cr.rowcount, "partial/incorrect updates are performed") - - def test_update_multiple_cols_from_dict(self): - TEST_TABLE_NAME = "_upgrade_bulk_update_multiple_cols_test_table" - N_ROWS = 10 - - cr = self._get_cr() - - cr.execute( - util.format_query( - cr, - """ - DROP TABLE IF EXISTS {table}; - - CREATE TABLE {table} ( - id SERIAL PRIMARY KEY, - col1 INTEGER, - col2 INTEGER, - col3 INTEGER - ); - - INSERT INTO {table} (col1, col2, col3) SELECT v, v, v FROM GENERATE_SERIES(1, %s) as v; - """, - table=TEST_TABLE_NAME, - ), - [N_ROWS], - ) - mapping = {id: [id * 2, id * 3] for id in range(1, N_ROWS + 1, 2)} - util.bulk_update_table(cr, TEST_TABLE_NAME, ["col1", "col2"], mapping) - - cr.execute( - util.format_query( - cr, - "SELECT id FROM {table} WHERE col3 != id", - table=TEST_TABLE_NAME, - ) - ) - self.assertFalse(cr.rowcount, "unintended column 'col3' is affected") - - cr.execute( - util.format_query( - cr, - "SELECT id FROM {table} WHERE col1 != id AND MOD(id, 2) = 0", - table=TEST_TABLE_NAME, - ) - ) - self.assertFalse(cr.rowcount, "unintended rows are affected") - - cr.execute( - util.format_query( - cr, - "SELECT id FROM {table} WHERE (col1 != 2 * id OR col2 != 3 * id) AND MOD(id, 2) = 1", - 
table=TEST_TABLE_NAME, - ) - ) - self.assertFalse(cr.rowcount, "partial/incorrect updates are performed") - - def test_create_column_with_fk(self): - cr = self.env.cr - self.assertFalse(util.column_exists(cr, "res_partner", "_test_lang_id")) - - with self.assertRaises(ValueError): - util.create_column(cr, "res_partner", "_test_lang_id", "int4", on_delete_action="http://github.com/SET%20NULL") - - with self.assertRaises(ValueError): - util.create_column( - cr, "res_partner", "_test_lang_id", "int4", fk_table="res_lang", on_delete_action="http://github.com/INVALID" - ) - - # this one should works - util.create_column(cr, "res_partner", "_test_lang_id", "int4", fk_table="res_lang", on_delete_action="http://github.com/SET%20NULL") - - target = util.target_of(cr, "res_partner", "_test_lang_id") - self.assertEqual(target, ("res_lang", "id", "res_partner__test_lang_id_fkey")) - - # code should be reentrant - util.create_column(cr, "res_partner", "_test_lang_id", "int4", fk_table="res_lang", on_delete_action="http://github.com/SET%20NULL") - - target = util.target_of(cr, "res_partner", "_test_lang_id") - self.assertEqual(target, ("res_lang", "id", "res_partner__test_lang_id_fkey")) - - def test_ColumnList(self): - cr = self.env.cr - - s = lambda c: c.as_string(cr._cnx) - - columns = util.ColumnList(["a", "A"], ['"a"', '"A"']) - self.assertEqual(len(columns), 2) - - columns2 = util.ColumnList.from_unquoted(cr, ["a", "A"]) - self.assertEqual(columns2, columns) - - # iterating it yield quoted columns - self.assertEqual(list(iter(columns)), ['"a"', '"A"']) - - self.assertEqual(list(columns.iter_unquoted()), ["a", "A"]) - - self.assertEqual(s(columns), '"a", "A"') - - self.assertEqual(s(columns.using(alias="t")), '"t"."a", "t"."A"') - self.assertEqual(s(columns.using(leading_comma=True)), ', "a", "A"') - self.assertEqual(s(columns.using(trailing_comma=True)), '"a", "A",') - self.assertEqual(s(columns.using(leading_comma=True, 
trailing_comma=True)), ', "a", "A",') - - self.assertIs(columns.using(), columns) - - ulc = columns.using(leading_comma=True) - self.assertTrue(s(ulc.using(alias="x")), ', "x"."a", "x"."A"') - self.assertIs(ulc, ulc.using(leading_comma=True)) - - def test_create_m2m(self): - cr = self.env.cr - - m2m_name = "random_table_name" - created_m2m = util.create_m2m(cr, m2m_name, "res_users", "res_groups") - self.assertEqual(m2m_name, created_m2m) - self.assertTrue(util.table_exists(cr, created_m2m)) - - auto_generated_m2m_table_name = util.create_m2m(cr, util.AUTO, "res_users", "res_groups") - self.assertEqual("res_groups_res_users_rel", auto_generated_m2m_table_name) - self.assertTrue(util.table_exists(cr, auto_generated_m2m_table_name)) - - def test_rename_m2m(self): - cr = self.env.cr - - self.env["ir.model"].create({"model": "x_new.model", "name": "Custom test model"}) - manual_model_id = self.env["ir.model"].create({"model": "x_manual.model", "name": "Manual model"}).id - - field_regular = self.env["ir.model.fields"].create( - { - "name": "x_m2m_field_regular", - "ttype": "many2many", - "model_id": manual_model_id, - "relation": "x_new.model", - "relation_table": "x_x_manual_model_x_new_model_rel", - } - ) - field_custom = self.env["ir.model.fields"].create( - { - "name": "x_m2m_field_custom", - "ttype": "many2many", - "model_id": manual_model_id, - "relation": "x_new.model", - "relation_table": "x_x_manual_model_x_new_model_rel_2", - } - ) - old_regular_table = field_regular.relation_table - old_custom_table = field_custom.relation_table - - util.pg_rename_table(cr, "x_new_model", "new_special_model") - util.update_m2m_tables(cr, "x_new_model", "new_special_model") - util.invalidate(field_regular) - - new_regular_table = field_regular.relation_table - self.assertEqual(new_regular_table, "x_new_special_model_x_manual_model_rel") - self.assertEqual(field_custom.relation_table, old_custom_table) - self.assertEqual(field_regular.column2, "new_special_model_id") - 
self.assertEqual(field_custom.column2, "new_special_model_id") - self.assertTrue(util.table_exists(cr, new_regular_table)) - self.assertTrue(util.table_exists(cr, old_custom_table)) - self.assertFalse(util.table_exists(cr, old_regular_table)) - - -class TestORM(UnitTestCase): - def test_create_cron(self): - cr = self.env.cr - util.create_cron(cr, "Test cron creation don't fail", "res.partner", "answer = 42") - - cron_id = util.ref(cr, "__upgrade__.cron_post_upgrade_test_cron_creation_don_t_fail") - self.assertIsNotNone(cron_id) - cron = self.env["ir.cron"].browse(cron_id) - self.assertEqual(cron.code, "answer = 42") - - -class TestField(UnitTestCase): - def test_invert_boolean_field(self): - cr = self.env.cr - - with self.assertRaises(ValueError): - util.invert_boolean_field(cr, "res.partner", "name", "nom") - - model, old_name, new_name = "ir.model.access", "perm_unlink", "perm_delete" - table = util.table_of_model(cr, model) - - fltr = self.env["ir.filters"].create( - {"name": "test", "model_id": model, "domain": str([(old_name, "=", True)])} - ) - - query = """ - SELECT {1}, count(*) - FROM {0} - GROUP BY {1} - """ - - cr.execute(util.format_query(cr, query, table, old_name)) - initial_repartition = dict(cr.fetchall()) - - # util.parallel_execute will `commit` the cursor and create new ones - # as we are in a test, we should not commit as we are in a subtransaction - with mock.patch.object(cr, "commit", lambda: ...): - util.invert_boolean_field(cr, model, old_name, new_name) - - util.invalidate(fltr) - expected = ["!", (new_name, "=", True)] - self.assertEqual(literal_eval(fltr.domain), expected) - - cr.execute(util.format_query(cr, query, table, new_name)) - inverted_repartition = dict(cr.fetchall()) - - self.assertEqual(inverted_repartition[False], initial_repartition[True]) - self.assertEqual(inverted_repartition[True], initial_repartition[False] + initial_repartition.get(None, 0)) - self.assertEqual(inverted_repartition.get(None, 0), 0) - - # rename back - 
with mock.patch.object(cr, "commit", lambda: ...): - util.rename_field(cr, model, new_name, old_name) - - util.invalidate(fltr) - expected = [(old_name, "!=", True)] if USE_ORM_DOMAIN else ["!", (old_name, "=", True)] - self.assertEqual(literal_eval(fltr.domain), expected) - - # invert with same name; will invert domains and data - with mock.patch.object(cr, "commit", lambda: ...): - util.invert_boolean_field(cr, model, old_name, old_name) - - util.invalidate(fltr) - expected = ["!", (old_name, "!=", True)] if USE_ORM_DOMAIN else ["!", "!", (old_name, "=", True)] - self.assertEqual(literal_eval(fltr.domain), expected) - - cr.execute(util.format_query(cr, query, table, old_name)) - back_repartition = dict(cr.fetchall()) - - # merge None into False in the initial repartition - initial_repartition[False] += initial_repartition.pop(None, 0) - self.assertEqual(back_repartition, initial_repartition) - - def test_change_field_selection_with_default(self): - cr = self.env.cr - lang = self.env["res.lang"].create({"name": "Elvish", "code": "el_VISH", "active": True}) - if util.table_exists(cr, "ir_default"): - self.env["ir.default"].set("res.partner", "lang", "el_VISH") - else: - self.env["ir.values"].set_default("res.partner", "lang", "el_VISH") - util.flush(lang) - partner = self.env["res.partner"].create({"name": "Gandalf"}) - self.assertEqual(partner.lang, "el_VISH") - - util.invalidate(partner) - util.change_field_selection_values(cr, "res.partner", "lang", {"el_VISH": "en_US"}) - - self.assertEqual(partner.lang, "en_US") - - if util.table_exists(cr, "ir_default"): - new_default = (getattr(self.env["ir.default"], "get", None) or self.env["ir.default"]._get)( - "res.partner", "lang" - ) - else: - new_default = self.env["ir.values"].get_default("res.partner", "lang") - - self.assertEqual(new_default, "en_US") - - @unittest.skipIf(not util.version_gte("saas~17.5"), "Company dependent fields are stored as jsonb since saas~17.5") - def 
test_convert_field_to_company_dependent(self): - cr = self.env.cr - - partner_model = self.env["ir.model"].search([("model", "=", "res.partner")]) - self.env["ir.model.fields"].create( - [ - { - "name": "x_test_cd_1", - "ttype": "char", - "model_id": partner_model.id, - }, - { - "name": "x_test_cd_2", - "ttype": "char", - "model_id": partner_model.id, - }, - ] - ) - - c1 = self.env["res.company"].create({"name": "Flancrest"}) - c2 = self.env["res.company"].create({"name": "Flancrest2"}) - - test_partners = self.env["res.partner"].create( - [ - {"name": "Homer", "x_test_cd_1": "A", "x_test_cd_2": "A", "company_id": c1.id}, - {"name": "Marjorie", "x_test_cd_1": "B", "x_test_cd_2": "B"}, - {"name": "Bartholomew"}, - ] - ) - test_partners.invalidate_recordset(["x_test_cd_1", "x_test_cd_2"]) - - # Using company_id as default, only records with company set are updated - util.make_field_company_dependent(cr, "res.partner", "x_test_cd_1", "char") - util.make_field_company_dependent(cr, "res.partner", "x_test_cd_2", "char", company_field=False) - - # make the ORM re-read the info about these manual fields from the DB - setup_models = ( - self.registry.setup_models if hasattr(self.registry, "setup_models") else self.registry._setup_models__ - ) - args = (["res.partner"],) if util.version_gte("saas~18.4") else () - setup_models(cr, *args) - - test_partners_c1 = test_partners.with_company(c1.id) - self.assertEqual(test_partners_c1[0].x_test_cd_1, "A") - self.assertFalse(test_partners_c1[1].x_test_cd_1) - self.assertFalse(test_partners_c1[2].x_test_cd_1) - self.assertEqual(test_partners_c1[0].x_test_cd_2, "A") - self.assertEqual(test_partners_c1[1].x_test_cd_2, "B") - self.assertFalse(test_partners_c1[2].x_test_cd_2) - - test_partners_c2 = test_partners.with_company(c2.id) - self.assertFalse(test_partners_c2[0].x_test_cd_1) - self.assertFalse(test_partners_c2[1].x_test_cd_1) - self.assertFalse(test_partners_c2[2].x_test_cd_1) - self.assertEqual(test_partners_c2[0].x_test_cd_2, 
"A") - self.assertEqual(test_partners_c2[1].x_test_cd_2, "B") - self.assertFalse(test_partners_c2[2].x_test_cd_2) - - -class TestHelpers(UnitTestCase): - def test_model_table_conversion(self): - cr = self.env.cr - for model in self.env.registry: - if model in ("ir.actions.act_window_close",): - continue - table = util.table_of_model(cr, model) - self.assertEqual(table, self.env[model]._table) - self.assertEqual(util.model_of_table(cr, table), model) - - def test_resolve_model_fields_path(self): - cr = self.env.cr - - # test with provided paths - model, path = "res.currency", ["rate_ids", "company_id", "user_ids", "partner_id"] - expected_result = [ - util.FieldsPathPart("res.currency", "rate_ids", "res.currency.rate"), - util.FieldsPathPart("res.currency.rate", "company_id", "res.company"), - util.FieldsPathPart("res.company", "user_ids", "res.users"), - util.FieldsPathPart("res.users", "partner_id", "res.partner"), - ] - result = util.resolve_model_fields_path(cr, model, path) - self.assertEqual(result, expected_result) - - model, path = "res.users", ("partner_id", "removed_field", "user_id") - expected_result = [util.FieldsPathPart("res.users", "partner_id", "res.partner")] - result = util.resolve_model_fields_path(cr, model, path) - self.assertEqual(result, expected_result) - - -@unittest.skipIf( - util.version_gte("saas~17.1"), - "Starting Odoo 17, the info being stored in the database, the test can't lie about its base version", -) -class TestInherit(UnitTestCase): - @classmethod - def setUpClass(cls): - bv = util.ENVIRON.get("__base_version") - util.ENVIRON["__base_version"] = util.parse_version("12.0.1.3") - if bv: - cls.addClassCleanup(operator.setitem, util.ENVIRON, "__base_version", bv) - return super().setUpClass() - - @parametrize( - [ - # simple tests - ("do.not.exits", []), - ("account.common.journal.report", ["account.common.report"]), - # avoid duplicates - ( - "product.product", - [ - "mail.activity.mixin", - "mail.thread", - "product.template", - 
"rating.mixin", - "website.published.multi.mixin", - "website.seo.metadata", - ], - ), - # version boundaries - # ... born after 12.0, should not include it. - ("report.paperformat", []), - # ... dead before 12.0. should not be included - ("delivery.carrier", ["website.published.multi.mixin"]), - # ... dead between 12.0 and CURRENT_VERSION - ("crm.lead.convert2task", ["crm.partner.binding"]), - ] - ) - def test_inherit_parents(self, model, expected): - cr = self.env.cr - result = sorted(util.inherit_parents(cr, model)) - self.assertEqual(result, sorted(expected)) - - def test_direct_inherit_parents(self): - cr = self.env.cr - result = sorted(util.direct_inherit_parents(cr, "product.product")) - self.assertEqual(len(result), 3) - parents, inhs = zip(*result) - self.assertEqual(parents, ("mail.activity.mixin", "mail.thread", "product.template")) - self.assertTrue(all(inh.model == "product.product" for inh in inhs)) - self.assertEqual([inh.via for inh in inhs], [None, None, "product_tmpl_id"]) - - -class TestNamedCursors(UnitTestCase): - @staticmethod - def exec(cr, which="", args=()): - cr.execute("SELECT * FROM ir_ui_view") - if which: - return getattr(cr, which)(*args) - return None - - @parametrize( - [ - (None, "dictfetchone"), - (None, "dictfetchmany", [10]), - (None, "dictfetchall"), - (1, "dictfetchone"), - (1, "dictfetchmany", [10]), - (1, "dictfetchall"), - (None, "fetchone"), - (None, "fetchmany", [10]), - (None, "fetchall"), - (1, "fetchone"), - (1, "fetchmany", [10]), - (1, "fetchall"), - ] - ) - def test_dictfetch(self, itersize, which, args=()): - expected = self.exec(self.env.cr, which, args) - with util.named_cursor(self.env.cr, itersize=itersize) as ncr: - result = self.exec(ncr, which, args) - self.assertEqual(result, expected) - - def test_iterdict(self): - expected = self.exec(self.env.cr, "dictfetchall") - with util.named_cursor(self.env.cr) as ncr: - result = list(self.exec(ncr, "iterdict")) - self.assertEqual(result, expected) - - def 
test_iter(self): - expected = self.exec(self.env.cr, "fetchall") - with util.named_cursor(self.env.cr) as ncr: - result = list(self.exec(ncr, "__iter__")) - self.assertEqual(result, expected) - - -class TestRecords(UnitTestCase): - def test_rename_xmlid(self): - cr = self.env.cr - - old = self.env["res.currency"].create({"name": "TX1", "symbol": "TX1"}) - new = self.env["res.currency"].create({"name": "TX2", "symbol": "TX2"}) - self.env["ir.model.data"].create({"name": "TX1", "module": "base", "model": "res.currency", "res_id": old.id}) - self.env["ir.model.data"].create({"name": "TX2", "module": "base", "model": "res.currency", "res_id": new.id}) - - rate = self.env["res.currency.rate"].create({"currency_id": old.id}) - self.env["ir.model.data"].create( - {"name": "test_rate_tx1", "module": "base", "model": "res.currency.rate", "res_id": rate.id} - ) - - if hasattr(self, "_savepoint_id"): - # As the `rename_xmlid` method uses `parallel_execute`, the cursor is committed; which kill - # the savepoint created by the test setup (since saas~14.1 with the merge of SavepointCase - # into TransactionCase in odoo/odoo@7f2e168c02a7aea666d34510ed2ed8efacd5654b). - # Force a new one to avoid this issue. - # Incidentally, we should also explicitly remove the created records. 
- self.addCleanup(cr.execute, f"SAVEPOINT test_{self._savepoint_id}") - self.addCleanup(old.unlink) - self.addCleanup(new.unlink) - self.addCleanup(rate.unlink) - - # Wrong model - with self.assertRaises(MigrationError): - util.rename_xmlid(cr, "base.TX1", "base.test_rate_tx1", on_collision="merge") - - # Collision - with self.assertRaises(MigrationError): - util.rename_xmlid(cr, "base.TX1", "base.TX2", on_collision="fail") - - # As TX2 is not free, TX1 is merged with TX2 - with mute_logger(util.helpers._logger.name): - res = util.rename_xmlid(cr, "base.TX1", "base.TX2", on_collision="merge") - self.assertEqual(res, new.id) - self.assertEqual(util.ref(cr, "base.TX1"), None) - - # TX1 references moved to TX2 - cr.execute("SELECT currency_id FROM res_currency_rate WHERE id = %s", [rate.id]) - self.assertEqual(cr.fetchall(), [(new.id,)]) - - # Nothing left to rename in TX1 - res = util.rename_xmlid(cr, "base.TX1", "base.TX3", on_collision="merge") - self.assertEqual(res, None) - - # Can rename to empty TX3 without need for merge - res = util.rename_xmlid(cr, "base.TX2", "base.TX3", on_collision="merge") - self.assertEqual(res, new.id) - - # Normal rename - res = util.rename_xmlid(cr, "base.TX3", "base.TX4") - self.assertEqual(res, new.id) - - def test_update_record_from_xml(self): - # reset all fields on a - xmlid = "base.res_partner_industry_A" - data_after = {"name": "42", "full_name": "Fortytwo"} - record = self.env.ref(xmlid) - data_before = {key: record[key] for key in data_after} - for key, value in data_after.items(): - record.write({key: value}) - self.assertEqual(record[key], value) - - util.update_record_from_xml(self.env.cr, xmlid, reset_translations=True) - if util.version_gte("16.0"): - record.invalidate_recordset(["name"]) - else: - record.invalidate_cache(["name"], record.ids) - for key, value in data_before.items(): - self.assertEqual(record[key], value) - - def test_update_record_from_xml_recursive_menuitem(self): - # reset all fields on a - xmlid = 
"base.menu_security" - data_after = {"name": "ATotallyValidSecurityMenu", "sequence": 112, "parent_id": self.env["ir.ui.menu"]} - record = self.env.ref(xmlid) - data_before = {key: record[key] for key in data_after} - for key, value in data_after.items(): - record.write({key: value}) - self.assertEqual(record[key], value) - - util.update_record_from_xml(self.env.cr, xmlid) - if util.version_gte("16.0"): - record.invalidate_recordset(["name"]) - else: - record.invalidate_cache(["name"], record.ids) - for key, value in data_before.items(): - self.assertEqual(record[key], value) - - def test_upgrade_record_from_xml_ensure_references(self): - def change(xmlid): - cat = self.env.ref(xmlid) - result = cat.name - cat.write({"name": str(uuid.uuid4())}) - util.flush(cat) - util.invalidate(cat) - return result - - if util.version_gte("saas~13.5"): - xmlid_tree = [ - "base.module_category_accounting_localizations_account_charts", - "base.module_category_accounting_localizations", - "base.module_category_accounting", - ] - else: - xmlid_tree = [ - "base.module_category_localization_account_charts", - "base.module_category_localization", - ] - - old_names = [change(xmlid) for xmlid in xmlid_tree] - - util.update_record_from_xml(self.env.cr, xmlid_tree[0], ensure_references=True) - - for xmlid, expected in zip(xmlid_tree, old_names): - cat = self.env.ref(xmlid) - self.assertEqual(cat.name, expected) - - def test_update_record_from_xml_template_tag(self): - # reset all fields on a