433 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			433 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2017 Camptocamp SA
 | 
						|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
 | 
						|
 | 
						|
import mock
 | 
						|
import unittest
 | 
						|
 | 
						|
from odoo.addons.component.tests.common import (
 | 
						|
    ComponentRegistryCase,
 | 
						|
    TransactionComponentRegistryCase,
 | 
						|
)
 | 
						|
from odoo.addons.component.core import Component
 | 
						|
from odoo.addons.component_event.core import EventWorkContext
 | 
						|
from odoo.addons.component_event.components.event import skip_if
 | 
						|
 | 
						|
 | 
						|
class TestEventWorkContext(unittest.TestCase):
 | 
						|
    """ Test Events Components """
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(TestEventWorkContext, self).setUp()
 | 
						|
        self.env = mock.MagicMock(name='env')
 | 
						|
        self.record = mock.MagicMock(name='record')
 | 
						|
        self.components_registry = mock.MagicMock(name='ComponentRegistry')
 | 
						|
 | 
						|
    def test_env(self):
 | 
						|
        """ WorkContext with env """
 | 
						|
        work = EventWorkContext(model_name='res.users', env=self.env,
 | 
						|
                                components_registry=self.components_registry)
 | 
						|
        self.assertEqual(self.env, work.env)
 | 
						|
        self.assertEqual('res.users', work.model_name)
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            # pylint: disable=W0104
 | 
						|
            work.collection  # noqa
 | 
						|
 | 
						|
    def test_collection(self):
 | 
						|
        """ WorkContext with collection """
 | 
						|
        env = mock.MagicMock(name='env')
 | 
						|
        collection = mock.MagicMock(name='collection')
 | 
						|
        collection.env = env
 | 
						|
        work = EventWorkContext(model_name='res.users', collection=collection,
 | 
						|
                                components_registry=self.components_registry)
 | 
						|
        self.assertEqual(collection, work.collection)
 | 
						|
        self.assertEqual(env, work.env)
 | 
						|
        self.assertEqual('res.users', work.model_name)
 | 
						|
 | 
						|
    def test_env_and_collection(self):
 | 
						|
        """ WorkContext with collection and env is forbidden """
 | 
						|
        env = mock.MagicMock(name='env')
 | 
						|
        collection = mock.MagicMock(name='collection')
 | 
						|
        collection.env = env
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            EventWorkContext(model_name='res.users', collection=collection,
 | 
						|
                             env=env,
 | 
						|
                             components_registry=self.components_registry)
 | 
						|
 | 
						|
    def test_missing(self):
 | 
						|
        """ WorkContext with collection and env is forbidden """
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            EventWorkContext(model_name='res.users',
 | 
						|
                             components_registry=self.components_registry)
 | 
						|
 | 
						|
    def test_env_work_on(self):
 | 
						|
        """ WorkContext propagated through work_on """
 | 
						|
        env = mock.MagicMock(name='env')
 | 
						|
        collection = mock.MagicMock(name='collection')
 | 
						|
        collection.env = env
 | 
						|
        work = EventWorkContext(env=env, model_name='res.users',
 | 
						|
                                components_registry=self.components_registry)
 | 
						|
        work2 = work.work_on(model_name='res.partner', collection=collection)
 | 
						|
        self.assertEqual('WorkContext', work2.__class__.__name__)
 | 
						|
        self.assertEqual(env, work2.env)
 | 
						|
        self.assertEqual('res.partner', work2.model_name)
 | 
						|
        self.assertEqual(self.components_registry, work2.components_registry)
 | 
						|
        with self.assertRaises(ValueError):
 | 
						|
            # pylint: disable=W0104
 | 
						|
            work.collection  # noqa
 | 
						|
 | 
						|
    def test_collection_work_on(self):
 | 
						|
        """ WorkContext propagated through work_on """
 | 
						|
        env = mock.MagicMock(name='env')
 | 
						|
        collection = mock.MagicMock(name='collection')
 | 
						|
        collection.env = env
 | 
						|
        work = EventWorkContext(collection=collection, model_name='res.users',
 | 
						|
                                components_registry=self.components_registry)
 | 
						|
        work2 = work.work_on(model_name='res.partner')
 | 
						|
        self.assertEqual('WorkContext', work2.__class__.__name__)
 | 
						|
        self.assertEqual(collection, work2.collection)
 | 
						|
        self.assertEqual(env, work2.env)
 | 
						|
        self.assertEqual('res.partner', work2.model_name)
 | 
						|
        self.assertEqual(self.components_registry, work2.components_registry)
 | 
						|
 | 
						|
    def test_collection_work_on_collection(self):
 | 
						|
        """ WorkContext collection changed with work_on """
 | 
						|
        env = mock.MagicMock(name='env')
 | 
						|
        collection = mock.MagicMock(name='collection')
 | 
						|
        collection.env = env
 | 
						|
        work = EventWorkContext(model_name='res.users', env=env,
 | 
						|
                                components_registry=self.components_registry)
 | 
						|
        work2 = work.work_on(collection=collection)
 | 
						|
        # when work_on is used inside an event component, we want
 | 
						|
        # to switch back to a normal WorkContext, because we don't
 | 
						|
        # need anymore the EventWorkContext
 | 
						|
        self.assertEqual('WorkContext', work2.__class__.__name__)
 | 
						|
        self.assertEqual(collection, work2.collection)
 | 
						|
        self.assertEqual(env, work2.env)
 | 
						|
        self.assertEqual('res.users', work2.model_name)
 | 
						|
        self.assertEqual(self.components_registry, work2.components_registry)
 | 
						|
 | 
						|
 | 
						|
class TestEvent(ComponentRegistryCase):
 | 
						|
    """ Test Events Components """
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(TestEvent, self).setUp()
 | 
						|
        self._load_module_components('component_event')
 | 
						|
 | 
						|
        # get the collecter to notify the event
 | 
						|
        # we don't mind about the collection and the model here,
 | 
						|
        # the events we test are global
 | 
						|
        env = mock.MagicMock()
 | 
						|
        self.work = EventWorkContext(model_name='res.users', env=env,
 | 
						|
                                     components_registry=self.comp_registry)
 | 
						|
        self.collecter = self.comp_registry['base.event.collecter'](self.work)
 | 
						|
 | 
						|
    def test_event(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self, recipient, something, fields=None):
 | 
						|
                recipient.append(('OK', something, fields))
 | 
						|
 | 
						|
        MyEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        something = object()
 | 
						|
        fields = ['name', 'code']
 | 
						|
 | 
						|
        # as there is no return value by the event, we
 | 
						|
        # modify this recipient to check it has been called
 | 
						|
        recipient = []
 | 
						|
 | 
						|
        # collect the event and notify it
 | 
						|
        self.collecter.collect_events('on_record_create').notify(
 | 
						|
            recipient, something, fields=fields
 | 
						|
        )
 | 
						|
        self.assertEqual([('OK', something, fields)], recipient)
 | 
						|
 | 
						|
    def test_collect_several(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self, recipient, something, fields=None):
 | 
						|
                recipient.append(('OK', something, fields))
 | 
						|
 | 
						|
        class MyOtherEventListener(Component):
 | 
						|
            _name = 'my.other.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self, recipient, something, fields=None):
 | 
						|
                recipient.append(('OK', something, fields))
 | 
						|
 | 
						|
        MyEventListener._build_component(self.comp_registry)
 | 
						|
        MyOtherEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        something = object()
 | 
						|
        fields = ['name', 'code']
 | 
						|
 | 
						|
        # as there is no return value by the event, we
 | 
						|
        # modify this recipient to check it has been called
 | 
						|
        recipient = []
 | 
						|
 | 
						|
        # collect the event and notify them
 | 
						|
        collected = self.collecter.collect_events('on_record_create')
 | 
						|
        self.assertEqual(2, len(collected.events))
 | 
						|
 | 
						|
        collected.notify(recipient, something, fields=fields)
 | 
						|
        self.assertEqual([('OK', something, fields),
 | 
						|
                          ('OK', something, fields)], recipient)
 | 
						|
 | 
						|
    def test_event_cache(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self):
 | 
						|
                pass
 | 
						|
 | 
						|
        MyEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        # collect the event
 | 
						|
        collected = self.collecter.collect_events('on_record_create')
 | 
						|
        # CollectedEvents.events contains the collected events
 | 
						|
        self.assertEqual(1, len(collected.events))
 | 
						|
        event = list(collected.events)[0]
 | 
						|
        self.assertEqual(self.work, event.__self__.work)
 | 
						|
        self.assertEqual(self.work.env, event.__self__.work.env)
 | 
						|
 | 
						|
        # build and register a new listener
 | 
						|
        class MyOtherEventListener(Component):
 | 
						|
            _name = 'my.other.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self):
 | 
						|
                pass
 | 
						|
 | 
						|
        MyOtherEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        # get a new collecter and check that we it finds the same
 | 
						|
        # events even if we built a new one: it means the cache works
 | 
						|
        env = mock.MagicMock()
 | 
						|
        work = EventWorkContext(model_name='res.users', env=env,
 | 
						|
                                components_registry=self.comp_registry)
 | 
						|
        collecter = self.comp_registry['base.event.collecter'](work)
 | 
						|
        collected = collecter.collect_events('on_record_create')
 | 
						|
        # CollectedEvents.events contains the collected events
 | 
						|
        self.assertEqual(1, len(collected.events))
 | 
						|
        event = list(collected.events)[0]
 | 
						|
        self.assertEqual(work, event.__self__.work)
 | 
						|
        self.assertEqual(env, event.__self__.work.env)
 | 
						|
 | 
						|
        # if we empty the cache, as it on the class, both collecters
 | 
						|
        # should now find the 2 events
 | 
						|
        collecter._cache.clear()
 | 
						|
        self.comp_registry._cache.clear()
 | 
						|
        # CollectedEvents.events contains the collected events
 | 
						|
        self.assertEqual(
 | 
						|
            2,
 | 
						|
            len(collecter.collect_events('on_record_create').events)
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            2,
 | 
						|
            len(self.collecter.collect_events('on_record_create').events)
 | 
						|
        )
 | 
						|
 | 
						|
    def test_event_cache_collection(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self):
 | 
						|
                pass
 | 
						|
 | 
						|
        MyEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        # collect the event
 | 
						|
        collected = self.collecter.collect_events('on_record_create')
 | 
						|
        # CollectedEvents.events contains the collected events
 | 
						|
        self.assertEqual(1, len(collected.events))
 | 
						|
 | 
						|
        # build and register a new listener
 | 
						|
        class MyOtherEventListener(Component):
 | 
						|
            _name = 'my.other.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
            _collection = 'base.collection'
 | 
						|
 | 
						|
            def on_record_create(self):
 | 
						|
                pass
 | 
						|
 | 
						|
        MyOtherEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        # get a new collecter and check that we it finds the same
 | 
						|
        # events even if we built a new one: it means the cache works
 | 
						|
        collection = mock.MagicMock(name='base.collection')
 | 
						|
        collection._name = 'base.collection'
 | 
						|
        collection.env = mock.MagicMock()
 | 
						|
        work = EventWorkContext(model_name='res.users', collection=collection,
 | 
						|
                                components_registry=self.comp_registry)
 | 
						|
        collecter = self.comp_registry['base.event.collecter'](work)
 | 
						|
        collected = collecter.collect_events('on_record_create')
 | 
						|
        # for a different collection, we should not have the same
 | 
						|
        # cache entry
 | 
						|
        self.assertEqual(2, len(collected.events))
 | 
						|
 | 
						|
    def test_event_cache_model_name(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self):
 | 
						|
                pass
 | 
						|
 | 
						|
        MyEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        # collect the event
 | 
						|
        collected = self.collecter.collect_events('on_record_create')
 | 
						|
        # CollectedEvents.events contains the collected events
 | 
						|
        self.assertEqual(1, len(collected.events))
 | 
						|
 | 
						|
        # build and register a new listener
 | 
						|
        class MyOtherEventListener(Component):
 | 
						|
            _name = 'my.other.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
            _apply_on = ['res.country']
 | 
						|
 | 
						|
            def on_record_create(self):
 | 
						|
                pass
 | 
						|
 | 
						|
        MyOtherEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        # get a new collecter and check that we it finds the same
 | 
						|
        # events even if we built a new one: it means the cache works
 | 
						|
        env = mock.MagicMock()
 | 
						|
        work = EventWorkContext(model_name='res.country', env=env,
 | 
						|
                                components_registry=self.comp_registry)
 | 
						|
        collecter = self.comp_registry['base.event.collecter'](work)
 | 
						|
        collected = collecter.collect_events('on_record_create')
 | 
						|
        # for a different collection, we should not have the same
 | 
						|
        # cache entry
 | 
						|
        self.assertEqual(2, len(collected.events))
 | 
						|
 | 
						|
    def test_skip_if(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_record_create(self, msg):
 | 
						|
                assert True
 | 
						|
 | 
						|
        class MyOtherEventListener(Component):
 | 
						|
            _name = 'my.other.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            @skip_if(lambda self, msg: msg == 'foo')
 | 
						|
            def on_record_create(self, msg):
 | 
						|
                assert False
 | 
						|
 | 
						|
        self._build_components(MyEventListener, MyOtherEventListener)
 | 
						|
 | 
						|
        # collect the event and notify it
 | 
						|
        collected = self.collecter.collect_events('on_record_create')
 | 
						|
        self.assertEqual(2, len(collected.events))
 | 
						|
        collected.notify('foo')
 | 
						|
 | 
						|
 | 
						|
class TestEventFromModel(TransactionComponentRegistryCase):
 | 
						|
    """ Test Events Components from Models """
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(TestEventFromModel, self).setUp()
 | 
						|
        self._load_module_components('component_event')
 | 
						|
 | 
						|
    def test_event_from_model(self):
 | 
						|
        class MyEventListener(Component):
 | 
						|
            _name = 'my.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                record.name = name
 | 
						|
 | 
						|
        MyEventListener._build_component(self.comp_registry)
 | 
						|
 | 
						|
        partner = self.env['res.partner'].create({'name': 'test'})
 | 
						|
        # Normally you would not pass a components_registry,
 | 
						|
        # this is for the sake of the test, letting it empty
 | 
						|
        # will use the global registry.
 | 
						|
        # In a real code it would look like:
 | 
						|
        # partner._event('on_foo').notify('bar')
 | 
						|
        events = partner._event('on_foo',
 | 
						|
                                components_registry=self.comp_registry)
 | 
						|
        events.notify(partner, 'bar')
 | 
						|
        self.assertEqual('bar', partner.name)
 | 
						|
 | 
						|
    def test_event_filter_on_model(self):
 | 
						|
        class GlobalListener(Component):
 | 
						|
            _name = 'global.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                record.name = name
 | 
						|
 | 
						|
        class PartnerListener(Component):
 | 
						|
            _name = 'partner.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
            _apply_on = ['res.partner']
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                record.ref = name
 | 
						|
 | 
						|
        class UserListener(Component):
 | 
						|
            _name = 'user.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
            _apply_on = ['res.users']
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                assert False
 | 
						|
 | 
						|
        self._build_components(GlobalListener, PartnerListener, UserListener)
 | 
						|
 | 
						|
        partner = self.env['res.partner'].create({'name': 'test'})
 | 
						|
        partner._event(
 | 
						|
            'on_foo',
 | 
						|
            components_registry=self.comp_registry
 | 
						|
        ).notify(partner, 'bar')
 | 
						|
        self.assertEqual('bar', partner.name)
 | 
						|
        self.assertEqual('bar', partner.ref)
 | 
						|
 | 
						|
    def test_event_filter_on_collection(self):
 | 
						|
        class GlobalListener(Component):
 | 
						|
            _name = 'global.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                record.name = name
 | 
						|
 | 
						|
        class PartnerListener(Component):
 | 
						|
            _name = 'partner.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
            _collection = 'collection.base'
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                record.ref = name
 | 
						|
 | 
						|
        class UserListener(Component):
 | 
						|
            _name = 'user.event.listener'
 | 
						|
            _inherit = 'base.event.listener'
 | 
						|
            _collection = 'magento.backend'
 | 
						|
 | 
						|
            def on_foo(self, record, name):
 | 
						|
                assert False
 | 
						|
 | 
						|
        self._build_components(GlobalListener, PartnerListener, UserListener)
 | 
						|
 | 
						|
        partner = self.env['res.partner'].create({'name': 'test'})
 | 
						|
        events = partner._event(
 | 
						|
            'on_foo', collection=self.env['collection.base'],
 | 
						|
            components_registry=self.comp_registry
 | 
						|
        )
 | 
						|
        events.notify(partner, 'bar')
 | 
						|
        self.assertEqual('bar', partner.name)
 | 
						|
        self.assertEqual('bar', partner.ref)
 |