76 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			76 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2017 Camptocamp SA
 | 
						|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
 | 
						|
 | 
						|
import mock
 | 
						|
from odoo import api
 | 
						|
from odoo.tests import common
 | 
						|
from odoo.modules.registry import Registry
 | 
						|
from odoo.addons.queue_job.exception import RetryableJobError
 | 
						|
from odoo.addons.connector.database import pg_try_advisory_lock
 | 
						|
from odoo.addons.component.core import WorkContext
 | 
						|
from odoo.addons.component.tests.common import TransactionComponentCase
 | 
						|
 | 
						|
 | 
						|
class TestAdvisoryLock(TransactionComponentCase):
    """Check the advisory lock helpers across two database cursors.

    ``setUp`` opens a second cursor (``self.cr2``) and wraps it in a second
    environment (``self.env2``), so each test can try to take the same lock
    from two different cursors.
    """

    def setUp(self):
        """Open a second cursor/environment and schedule its cleanup."""
        super(TestAdvisoryLock, self).setUp()
        self.registry2 = Registry(common.get_db_name())
        self.cr2 = self.registry2.cursor()
        self.env2 = api.Environment(self.cr2, self.env.uid, {})

        @self.addCleanup
        def reset_cr2():
            # rollback and close the cursor, and reset the environments
            self.env2.reset()
            self.cr2.rollback()
            self.cr2.close()

    @staticmethod
    def _import_lock_name():
        """Return the lock name shared by the tests of this class."""
        return 'import_record({}, {}, {}, {})'.format(
            'backend.name',
            1,
            'res.partner',
            '999999',
        )

    def test_concurrent_lock(self):
        """ 2 concurrent transactions cannot acquire the same lock """
        # the lock is based on a string, a second transaction trying
        # to acquire the same lock won't be able to acquire it
        lock = self._import_lock_name()
        acquired = pg_try_advisory_lock(self.env, lock)
        self.assertTrue(acquired)
        # we test the base function
        inner_acquired = pg_try_advisory_lock(self.env2, lock)
        self.assertFalse(inner_acquired)

    def test_concurrent_import_lock(self):
        """ A 2nd concurrent transaction must retry """
        # the lock is based on a string, a second transaction trying
        # to acquire the same lock won't be able to acquire it
        lock = self._import_lock_name()

        backend = mock.MagicMock()
        backend.env = self.env
        work = WorkContext(model_name='res.partner',
                           collection=backend)
        # we test the function through a Component instance
        component = work.component_by_name('base.connector')
        # acquire the lock
        component.advisory_lock_or_retry(lock)

        # instantiate another component using a different odoo env
        # hence another PG transaction
        backend2 = mock.MagicMock()
        backend2.env = self.env2
        work2 = WorkContext(model_name='res.partner',
                            collection=backend2)
        component2 = work2.component_by_name('base.connector')
        with self.assertRaises(RetryableJobError) as cm:
            component2.advisory_lock_or_retry(lock, retry_seconds=3)
        # Must be checked *after* the ``with`` block: the raising call above
        # leaves the block immediately, so an assertion placed inside it
        # would never run.
        self.assertEqual(cm.exception.seconds, 3)