125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2015-2016 ACSONE SA/NV (<http://acsone.eu>)
|
|
# Copyright 2013-2016 Camptocamp SA
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
|
|
|
|
import logging
|
|
import traceback
|
|
from io import StringIO
|
|
|
|
from psycopg2 import OperationalError
|
|
|
|
import odoo
|
|
from odoo import _, http, tools
|
|
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
|
|
|
|
from ..job import Job, ENQUEUED
|
|
from ..exception import (NoSuchJobError,
|
|
NotReadableJobError,
|
|
RetryableJobError,
|
|
FailedJobError,
|
|
NothingToDoJob)
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
PG_RETRY = 5 # seconds
|
|
|
|
|
|
class RunJobController(http.Controller):
|
|
|
|
def _load_job(self, env, job_uuid):
|
|
""" Reload a job from the backend """
|
|
try:
|
|
job = Job.load(env, job_uuid)
|
|
except NoSuchJobError:
|
|
# just skip it
|
|
job = None
|
|
except NotReadableJobError:
|
|
_logger.exception('Could not read job: %s', job_uuid)
|
|
raise
|
|
return job
|
|
|
|
def _try_perform_job(self, env, job):
|
|
"""Try to perform the job."""
|
|
|
|
# if the job has been manually set to DONE or PENDING,
|
|
# or if something tries to run a job that is not enqueued
|
|
# before its execution, stop
|
|
if job.state != ENQUEUED:
|
|
_logger.warning('job %s is in state %s '
|
|
'instead of enqueued in /runjob',
|
|
job.uuid, job.state)
|
|
return
|
|
|
|
# TODO: set_started should be done atomically with
|
|
# update queue_job set=state=started
|
|
# where state=enqueid and id=
|
|
job.set_started()
|
|
job.store()
|
|
http.request.env.cr.commit()
|
|
|
|
_logger.debug('%s started', job)
|
|
job.perform()
|
|
job.set_done()
|
|
job.store()
|
|
http.request.env.cr.commit()
|
|
_logger.debug('%s done', job)
|
|
|
|
@http.route('/queue_job/runjob', type='http', auth='none')
|
|
def runjob(self, db, job_uuid, **kw):
|
|
http.request.session.db = db
|
|
env = http.request.env(user=odoo.SUPERUSER_ID)
|
|
|
|
def retry_postpone(job, message, seconds=None):
|
|
job.postpone(result=message, seconds=seconds)
|
|
job.set_pending(reset_retry=False)
|
|
job.store()
|
|
env.cr.commit()
|
|
|
|
job = self._load_job(env, job_uuid)
|
|
if job is None:
|
|
return ""
|
|
env.cr.commit()
|
|
|
|
try:
|
|
try:
|
|
self._try_perform_job(env, job)
|
|
except OperationalError as err:
|
|
# Automatically retry the typical transaction serialization
|
|
# errors
|
|
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
|
|
raise
|
|
|
|
retry_postpone(job, tools.ustr(err.pgerror, errors='replace'),
|
|
seconds=PG_RETRY)
|
|
_logger.debug('%s OperationalError, postponed', job)
|
|
|
|
except NothingToDoJob as err:
|
|
if str(err):
|
|
msg = str(err)
|
|
else:
|
|
msg = _('Job interrupted and set to Done: nothing to do.')
|
|
job.set_done(msg)
|
|
job.store()
|
|
env.cr.commit()
|
|
|
|
except RetryableJobError as err:
|
|
# delay the job later, requeue
|
|
retry_postpone(job, str(err), seconds=err.seconds)
|
|
_logger.debug('%s postponed', job)
|
|
|
|
except (FailedJobError, Exception):
|
|
buff = StringIO()
|
|
traceback.print_exc(file=buff)
|
|
_logger.error(buff.getvalue())
|
|
job.env.clear()
|
|
with odoo.api.Environment.manage():
|
|
with odoo.registry(job.env.cr.dbname).cursor() as new_cr:
|
|
job.env = job.env(cr=new_cr)
|
|
job.set_failed(exc_info=buff.getvalue())
|
|
job.store()
|
|
new_cr.commit()
|
|
raise
|
|
|
|
return ""
|