# -*- coding: utf-8 -*-
# Copyright 2013-2016 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)

import logging
from datetime import datetime, timedelta

from odoo import models, fields, api, exceptions, _

from ..job import STATES, DONE, PENDING, Job
from ..fields import JobSerialized

# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)


def channel_func_name(method):
    """Return the name under which a job method is registered.

    The name has the form ``<model._name>.method_name``.

    :param method: a bound method; ``method.__self__`` must expose a
                   ``_name`` attribute
    :return: the formatted name as a string
    """
    return '<%s>.%s' % (method.__self__._name, method.__name__)


class QueueJob(models.Model):
    """Job status and result.

    Each record mirrors one ``Job`` (looked up through ``Job.load`` with
    the record's ``uuid``): what to call, its state, timing and outcome.
    """
    _name = 'queue.job'
    _description = 'Queue Job'
    _inherit = ['mail.thread', 'mail.activity.mixin']
    _log_access = False

    _order = 'date_created DESC, date_done DESC'

    # Jobs whose ``date_done`` is older than this are deleted by
    # ``autovacuum``.
    _removal_interval = 30  # days

    uuid = fields.Char(string='UUID',
                       readonly=True,
                       index=True,
                       required=True)
    user_id = fields.Many2one(comodel_name='res.users',
                              string='User ID',
                              required=True)
    company_id = fields.Many2one(comodel_name='res.company',
                                 string='Company', index=True)
    name = fields.Char(string='Description', readonly=True)

    # Target of the job: model, method, records and arguments.
    model_name = fields.Char(string='Model', readonly=True)
    method_name = fields.Char(readonly=True)
    record_ids = fields.Serialized(readonly=True)
    args = JobSerialized(readonly=True)
    kwargs = JobSerialized(readonly=True)
    func_string = fields.Char(string='Task', compute='_compute_func_string',
                              readonly=True, store=True)

    state = fields.Selection(STATES,
                             string='State',
                             readonly=True,
                             required=True,
                             index=True)
    priority = fields.Integer()
    exc_info = fields.Text(string='Exception Info', readonly=True)
    result = fields.Text(string='Result', readonly=True)

    date_created = fields.Datetime(string='Created Date', readonly=True)
    date_started = fields.Datetime(string='Start Date', readonly=True)
    date_enqueued = fields.Datetime(string='Enqueue Time', readonly=True)
    date_done = fields.Datetime(string='Date Done', readonly=True)

    eta = fields.Datetime(string='Execute only after')
    retry = fields.Integer(string='Current try')
    max_retries = fields.Integer(
        string='Max. retries',
        help="The job will fail if the number of tries reach the "
             "max. retries.\n"
             "Retries are infinite when empty.",
    )
    channel_method_name = fields.Char(readonly=True,
                                      compute='_compute_job_function',
                                      store=True)
    job_function_id = fields.Many2one(comodel_name='queue.job.function',
                                      compute='_compute_job_function',
                                      string='Job Function',
                                      readonly=True,
                                      store=True)

    channel = fields.Char(compute='_compute_channel',
                          inverse='_inverse_channel',
                          store=True,
                          index=True)

    @api.multi
    def _inverse_channel(self):
        """Recompute the channel of the records whose channel is empty."""
        self.filtered(lambda a: not a.channel)._compute_channel()

    @api.multi
    @api.depends('job_function_id.channel_id')
    def _compute_channel(self):
        """Copy the channel of the job function onto each job."""
        for record in self:
            record.channel = record.job_function_id.channel

    @api.multi
    @api.depends('model_name', 'method_name', 'job_function_id.channel_id')
    def _compute_job_function(self):
        """Set ``channel_method_name`` and ``job_function_id``.

        The method is resolved from ``model_name`` / ``method_name`` and
        the ``queue.job.function`` record with the matching name is looked
        up.
        """
        func_model = self.env['queue.job.function']
        for record in self:
            model = self.env[record.model_name]
            method = getattr(model, record.method_name)
            channel_method_name = channel_func_name(method)
            # ``name`` is not declared unique on queue.job.function, so
            # limit the search: a Many2one cannot hold several records.
            function = func_model.search(
                [('name', '=', channel_method_name)], limit=1,
            )
            record.channel_method_name = channel_method_name
            record.job_function_id = function

    @api.multi
    @api.depends('model_name', 'method_name', 'record_ids', 'args', 'kwargs')
    def _compute_func_string(self):
        """Build a readable ``recordset.method(args, key=value)`` string."""
        for record in self:
            record_ids = record.record_ids
            model = repr(self.env[record.model_name].browse(record_ids))
            args = [repr(arg) for arg in record.args]
            kwargs = ['%s=%r' % (key, val) for key, val
                      in record.kwargs.items()]
            all_args = ', '.join(args + kwargs)
            record.func_string = (
                "%s.%s(%s)" % (model, record.method_name, all_args)
            )

    @api.multi
    def open_related_action(self):
        """Open the related action associated to the job.

        :return: the action returned by the job's ``related_action()``
        :raises exceptions.Warning: when the job has no related action
        """
        self.ensure_one()
        job = Job.load(self.env, self.uuid)
        action = job.related_action()
        if action is None:
            raise exceptions.Warning(_('No action available for this job'))
        return action

    @api.multi
    def _change_job_state(self, state, result=None):
        """Change the state of the `Job` object itself so it
        will change the other fields (date, result, ...)

        :param state: ``DONE`` or ``PENDING``
        :param result: optional result text passed to the job
        :raises ValueError: for any other state
        """
        for record in self:
            job_ = Job.load(record.env, record.uuid)
            if state == DONE:
                job_.set_done(result=result)
            elif state == PENDING:
                job_.set_pending(result=result)
            else:
                raise ValueError('State not supported: %s' % state)
            job_.store()

    @api.multi
    def button_done(self):
        """Set the jobs to done, recording the current user in the result."""
        result = _('Manually set to done by %s') % self.env.user.name
        self._change_job_state(DONE, result=result)
        return True

    @api.multi
    def requeue(self):
        """Set the jobs back to pending."""
        self._change_job_state(PENDING)
        return True

    @api.multi
    def write(self, vals):
        """Write ``vals``; when the state becomes 'failed', subscribe the
        users matching ``_subscribe_users_domain`` and post the failure
        message on each job.
        """
        res = super(QueueJob, self).write(vals)
        if vals.get('state') == 'failed':
            # subscribe the users now to avoid to subscribe them
            # at every job creation
            domain = self._subscribe_users_domain()
            # The helper may return None (no group): there is nobody to
            # subscribe then, and None is not a valid search domain.
            if domain is not None:
                users = self.env['res.users'].search(domain)
                self.message_subscribe_users(user_ids=users.ids)
            for record in self:
                msg = record._message_failed_job()
                if msg:
                    record.message_post(body=msg,
                                        subtype='queue_job.mt_job_failed')
        return res

    @api.multi
    def _subscribe_users_domain(self):
        """Return the domain of the users to subscribe to failed jobs.

        It selects the users having the 'Queue Job Manager' group,
        restricted to the companies of the jobs when they have one.

        :return: a search domain on ``res.users``, or None when the group
                 reference is empty
        """
        group = self.env.ref('queue_job.group_queue_job_manager')
        if not group:
            return None
        companies = self.mapped('company_id')
        domain = [('groups_id', '=', group.id)]
        if companies:
            domain.append(('company_id', 'child_of', companies.ids))
        return domain

    @api.multi
    def _message_failed_job(self):
        """ Return a message which will be posted on the job when it is failed.

        It can be inherited to allow more precise messages based on the
        exception informations.

        If nothing is returned, no message will be posted.
        """
        self.ensure_one()
        return _("Something bad happened during the execution of the job. "
                 "More details in the 'Exception Information' section.")

    @api.model
    def _needaction_domain_get(self):
        """ Returns the domain to filter records that require an action
            :return: domain or False is no action
        """
        return [('state', '=', 'failed')]

    @api.model
    def autovacuum(self):
        """ Delete all jobs done since more than ``_removal_interval`` days.

        Called from a cron.
        """
        deadline = datetime.now() - timedelta(days=self._removal_interval)
        jobs = self.search(
            [('date_done', '<=', fields.Datetime.to_string(deadline))],
        )
        jobs.unlink()
        return True


class RequeueJob(models.TransientModel):
    """Wizard which requeues the jobs selected by the user."""
    _name = 'queue.requeue.job'
    _description = 'Wizard to requeue a selection of jobs'

    @api.model
    def _default_job_ids(self):
        """Return the ``active_ids`` of the context when the active model
        is ``queue.job``, False otherwise.
        """
        res = False
        context = self.env.context
        if (context.get('active_model') == 'queue.job' and
                context.get('active_ids')):
            res = context['active_ids']
        return res

    job_ids = fields.Many2many(comodel_name='queue.job',
                               string='Jobs',
                               default=_default_job_ids)

    @api.multi
    def requeue(self):
        """Requeue the selected jobs and close the wizard window."""
        jobs = self.job_ids
        jobs.requeue()
        return {'type': 'ir.actions.act_window_close'}


class JobChannel(models.Model):
    """Channel of jobs, organized as a tree below the 'root' channel.

    ``complete_name`` is the dotted path from the topmost ancestor down to
    the channel (for instance ``root.sub``) and must be unique.
    """
    _name = 'queue.job.channel'
    _description = 'Job Channels'

    name = fields.Char()
    complete_name = fields.Char(compute='_compute_complete_name',
                                string='Complete Name',
                                store=True,
                                readonly=True)
    parent_id = fields.Many2one(comodel_name='queue.job.channel',
                                string='Parent Channel',
                                ondelete='restrict')
    job_function_ids = fields.One2many(comodel_name='queue.job.function',
                                       inverse_name='channel_id',
                                       string='Job Functions')

    _sql_constraints = [
        ('name_uniq',
         'unique(complete_name)',
         'Channel complete name must be unique'),
    ]

    @api.multi
    @api.depends('name', 'parent_id.complete_name')
    def _compute_complete_name(self):
        """Join the names of the channel and its ancestors with dots."""
        for record in self:
            if not record.name:
                # New record whose name is not set yet: an unset name
                # cannot be joined, so leave the complete name empty.
                record.complete_name = False
                continue
            channel = record
            parts = [channel.name]
            # Walk up to the topmost ancestor; names are collected from
            # the leaf upwards, hence the reversal below.
            while channel.parent_id:
                channel = channel.parent_id
                parts.append(channel.name)
            record.complete_name = '.'.join(reversed(parts))

    @api.multi
    @api.constrains('parent_id', 'name')
    def parent_required(self):
        """Require a parent on every channel not named 'root'.

        :raises exceptions.ValidationError: when the parent is missing
        """
        for record in self:
            if record.name != 'root' and not record.parent_id:
                raise exceptions.ValidationError(_('Parent channel required.'))

    @api.multi
    def write(self, values):
        """Forbid changing the name or parent of a channel named 'root',
        unless ``install_mode`` is set in the context.

        :raises exceptions.Warning: on such a change
        """
        for channel in self:
            if (not self.env.context.get('install_mode') and
                    channel.name == 'root' and
                    ('name' in values or 'parent_id' in values)):
                raise exceptions.Warning(_('Cannot change the root channel'))
        return super(JobChannel, self).write(values)

    @api.multi
    def unlink(self):
        """Forbid deleting a channel named 'root'.

        :raises exceptions.Warning: when such a channel is in ``self``
        """
        for channel in self:
            if channel.name == 'root':
                raise exceptions.Warning(_('Cannot remove the root channel'))
        return super(JobChannel, self).unlink()

    @api.multi
    def name_get(self):
        """Display the channels by their complete name."""
        return [(record.id, record.complete_name) for record in self]


class JobFunction(models.Model):
    """Job method known to the queue, with the channel assigned to it.

    ``name`` holds the value built by ``channel_func_name``.
    """
    _name = 'queue.job.function'
    _description = 'Job Functions'
    _log_access = False

    @api.model
    def _default_channel(self):
        """Return the record referenced by ``queue_job.channel_root``."""
        return self.env.ref('queue_job.channel_root')

    name = fields.Char(index=True)
    channel_id = fields.Many2one(comodel_name='queue.job.channel',
                                 string='Channel',
                                 required=True,
                                 default=_default_channel)
    channel = fields.Char(related='channel_id.complete_name',
                          store=True,
                          readonly=True)

    @api.model
    def _find_or_create_channel(self, channel_path):
        """Return the channel at ``channel_path``, creating missing levels.

        :param channel_path: dotted path starting with 'root',
                             e.g. ``root.sub.subsub``
        :return: the ``queue.job.channel`` record of the last path element
        """
        channel_model = self.env['queue.job.channel']
        parts = channel_path.split('.')
        # Reversed so that ``pop()`` yields the names from the root down.
        parts.reverse()
        channel_name = parts.pop()
        assert channel_name == 'root', "A channel path starts with 'root'"
        # get the root channel: the parentless one, since a nested channel
        # may also be named 'root' and a single record is needed below
        channel = channel_model.search(
            [('name', '=', channel_name), ('parent_id', '=', False)],
            limit=1,
        )
        while parts:
            channel_name = parts.pop()
            parent_channel = channel
            channel = channel_model.search([
                ('name', '=', channel_name),
                ('parent_id', '=', parent_channel.id)],
                limit=1,
            )
            if not channel:
                channel = channel_model.create({
                    'name': channel_name,
                    'parent_id': parent_channel.id,
                })
        return channel

    @api.model
    def _register_job(self, job_method):
        """Create the job function of ``job_method`` if none has its name.

        The channel comes from ``job_method.default_channel`` and is
        created when missing.
        """
        func_name = channel_func_name(job_method)
        if not self.search_count([('name', '=', func_name)]):
            channel = self._find_or_create_channel(job_method.default_channel)
            self.create({'name': func_name, 'channel_id': channel.id})