76 lines
2.8 KiB
Python
76 lines
2.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2017 Camptocamp SA
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
|
|
|
|
import mock
|
|
from odoo import api
|
|
from odoo.tests import common
|
|
from odoo.modules.registry import Registry
|
|
from odoo.addons.queue_job.exception import RetryableJobError
|
|
from odoo.addons.connector.database import pg_try_advisory_lock
|
|
from odoo.addons.component.core import WorkContext
|
|
from odoo.addons.component.tests.common import TransactionComponentCase
|
|
|
|
|
|
class TestAdvisoryLock(TransactionComponentCase):
|
|
|
|
def setUp(self):
|
|
super(TestAdvisoryLock, self).setUp()
|
|
self.registry2 = Registry(common.get_db_name())
|
|
self.cr2 = self.registry2.cursor()
|
|
self.env2 = api.Environment(self.cr2, self.env.uid, {})
|
|
|
|
@self.addCleanup
|
|
def reset_cr2():
|
|
# rollback and close the cursor, and reset the environments
|
|
self.env2.reset()
|
|
self.cr2.rollback()
|
|
self.cr2.close()
|
|
|
|
def test_concurrent_lock(self):
|
|
""" 2 concurrent transactions cannot acquire the same lock """
|
|
# the lock is based on a string, a second transaction trying
|
|
# to acquire the same lock won't be able to acquire it
|
|
lock = 'import_record({}, {}, {}, {})'.format(
|
|
'backend.name',
|
|
1,
|
|
'res.partner',
|
|
'999999',
|
|
)
|
|
acquired = pg_try_advisory_lock(self.env, lock)
|
|
self.assertTrue(acquired)
|
|
# we test the base function
|
|
inner_acquired = pg_try_advisory_lock(self.env2, lock)
|
|
self.assertFalse(inner_acquired)
|
|
|
|
def test_concurrent_import_lock(self):
|
|
""" A 2nd concurrent transaction must retry """
|
|
# the lock is based on a string, a second transaction trying
|
|
# to acquire the same lock won't be able to acquire it
|
|
lock = 'import_record({}, {}, {}, {})'.format(
|
|
'backend.name',
|
|
1,
|
|
'res.partner',
|
|
'999999',
|
|
)
|
|
|
|
backend = mock.MagicMock()
|
|
backend.env = self.env
|
|
work = WorkContext(model_name='res.partner',
|
|
collection=backend)
|
|
# we test the function through a Component instance
|
|
component = work.component_by_name('base.connector')
|
|
# acquire the lock
|
|
component.advisory_lock_or_retry(lock)
|
|
|
|
# instanciate another component using a different odoo env
|
|
# hence another PG transaction
|
|
backend2 = mock.MagicMock()
|
|
backend2.env = self.env2
|
|
work2 = WorkContext(model_name='res.partner',
|
|
collection=backend2)
|
|
component2 = work2.component_by_name('base.connector')
|
|
with self.assertRaises(RetryableJobError) as cm:
|
|
component2.advisory_lock_or_retry(lock, retry_seconds=3)
|
|
self.assertEqual(cm.exception.seconds, 3)
|