407 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			407 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright (c) 2015-2016 ACSONE SA/NV (<http://acsone.eu>)
 | 
						|
# Copyright 2015-2016 Camptocamp SA
 | 
						|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
 | 
						|
"""
 | 
						|
What is the job runner?
 | 
						|
-----------------------
 | 
						|
The job runner is the main process managing the dispatch of delayed jobs to
 | 
						|
available Odoo workers
 | 
						|
 | 
						|
How does it work?
 | 
						|
-----------------
 | 
						|
 | 
						|
* It starts as a thread in the Odoo main process
 | 
						|
* It receives postgres NOTIFY messages each time jobs are
 | 
						|
  added or updated in the queue_job table.
 | 
						|
* It maintains an in-memory priority queue of jobs that
 | 
						|
  is populated from the queue_job tables in all databases.
 | 
						|
* It does not run jobs itself, but asks Odoo to run them through an
 | 
						|
  anonymous ``/queue_job/runjob`` HTTP request. [1]_
 | 
						|
 | 
						|
How to use it?
 | 
						|
--------------
 | 
						|
 | 
						|
* Optionally adjust your configuration through environment variables:
 | 
						|
 | 
						|
  - set ``ODOO_QUEUE_JOB_CHANNELS=root:4`` (or any other channels
 | 
						|
    configuration) if you don't want the default ``root:1``.
 | 
						|
 | 
						|
  - if ``xmlrpc-port`` is not set, you can set it for the jobrunner only with:
 | 
						|
    ``ODOO_QUEUE_JOB_PORT=8069``.
 | 
						|
 | 
						|
* Alternatively, configure the channels through the Odoo configuration
 | 
						|
  file, like:
 | 
						|
 | 
						|
.. code-block:: ini
 | 
						|
 | 
						|
  [queue_job]
 | 
						|
  channels = root:4
 | 
						|
 | 
						|
* Or, if using ``anybox.recipe.odoo``, add this to your buildout configuration:
 | 
						|
 | 
						|
.. code-block:: ini
 | 
						|
 | 
						|
  [odoo]
 | 
						|
  recipe = anybox.recipe.odoo
 | 
						|
  (...)
 | 
						|
  queue_job.channels = root:4
 | 
						|
 | 
						|
* Start Odoo with ``--load=web,web_kanban,queue_job``
 | 
						|
  and ``--workers`` greater than 1 [2]_, or set the ``server_wide_modules``
 | 
						|
  option in the Odoo configuration file:
 | 
						|
 | 
						|
.. code-block:: ini
 | 
						|
 | 
						|
  [options]
 | 
						|
  (...)
 | 
						|
  workers = 4
 | 
						|
  server_wide_modules = web,web_kanban,queue_job
 | 
						|
  (...)
 | 
						|
 | 
						|
* Or, if using ``anybox.recipe.odoo``:
 | 
						|
 | 
						|
.. code-block:: ini
 | 
						|
 | 
						|
  [odoo]
 | 
						|
  recipe = anybox.recipe.odoo
 | 
						|
  (...)
 | 
						|
  options.workers = 4
 | 
						|
  options.server_wide_modules = web,web_kanban,queue_job
 | 
						|
 | 
						|
* Confirm the runner is starting correctly by checking the odoo log file:
 | 
						|
 | 
						|
.. code-block:: none
 | 
						|
 | 
						|
  ...INFO...queue_job.jobrunner.runner: starting
 | 
						|
  ...INFO...queue_job.jobrunner.runner: initializing database connections
 | 
						|
  ...INFO...queue_job.jobrunner.runner: queue job runner ready for db <dbname>
 | 
						|
  ...INFO...queue_job.jobrunner.runner: database connections ready
 | 
						|
 | 
						|
* Create jobs (eg using base_import_async) and observe they
 | 
						|
  start immediately and in parallel.
 | 
						|
 | 
						|
* Tip: to enable debug logging for the queue job, use
 | 
						|
  ``--log-handler=odoo.addons.queue_job:DEBUG``
 | 
						|
 | 
						|
Caveat
 | 
						|
------
 | 
						|
 | 
						|
* After creating a new database or installing queue_job on an
 | 
						|
  existing database, Odoo must be restarted for the runner to detect it.
 | 
						|
 | 
						|
* When Odoo shuts down normally, it waits for running jobs to finish.
 | 
						|
  However, when the Odoo server crashes or is otherwise force-stopped,
 | 
						|
  running jobs are interrupted while the runner has no chance to know
 | 
						|
  they have been aborted. In such situations, jobs may remain in
 | 
						|
  ``started`` or ``enqueued`` state after the Odoo server is halted.
 | 
						|
  Since the runner has no way to know if they are actually running or
 | 
						|
  not, and does not know for sure if it is safe to restart the jobs,
 | 
						|
  it does not attempt to restart them automatically. Such stale jobs
 | 
						|
  therefore fill the running queue and prevent other jobs from starting.
 | 
						|
  You must therefore requeue them manually, either from the Jobs view,
 | 
						|
  or by running the following SQL statement *before starting Odoo*:
 | 
						|
 | 
						|
.. code-block:: sql
 | 
						|
 | 
						|
  update queue_job set state='pending' where state in ('started', 'enqueued')
 | 
						|
 | 
						|
.. rubric:: Footnotes
 | 
						|
 | 
						|
.. [1] From a security standpoint, it is safe to have an anonymous HTTP
 | 
						|
       request because this request only runs jobs that are
 | 
						|
       enqueued.
 | 
						|
.. [2] It works with the threaded Odoo server too, although this way
 | 
						|
       of running Odoo is obviously not for production purposes.
 | 
						|
"""
 | 
						|
 | 
						|
from contextlib import closing
 | 
						|
import datetime
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import select
 | 
						|
import threading
 | 
						|
import time
 | 
						|
 | 
						|
import psycopg2
 | 
						|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 | 
						|
import requests
 | 
						|
 | 
						|
import odoo
 | 
						|
from odoo.tools import config
 | 
						|
 | 
						|
from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE
 | 
						|
 | 
						|
SELECT_TIMEOUT = 60
 | 
						|
ERROR_RECOVERY_DELAY = 5
 | 
						|
 | 
						|
_logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
# Unfortunately, it is not possible to extend the Odoo
 | 
						|
# server command line arguments, so we resort to environment variables
 | 
						|
# to configure the runner (channels mostly).
 | 
						|
#
 | 
						|
# On the other hand, the odoo configuration file can be extended at will,
 | 
						|
# so we check it in addition to the environment variables.
 | 
						|
 | 
						|
 | 
						|
def _channels():
 | 
						|
    return (
 | 
						|
        os.environ.get('ODOO_QUEUE_JOB_CHANNELS') or
 | 
						|
        config.misc.get("queue_job", {}).get("channels") or
 | 
						|
        "root:1"
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def _datetime_to_epoch(dt):
 | 
						|
    # important: this must return the same as postgresql
 | 
						|
    # EXTRACT(EPOCH FROM TIMESTAMP dt)
 | 
						|
    return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
 | 
						|
 | 
						|
 | 
						|
def _odoo_now():
    """Return the current UTC time as seconds since 1970-01-01 (float)."""
    current_utc = datetime.datetime.utcnow()
    return _datetime_to_epoch(current_utc)
 | 
						|
 | 
						|
 | 
						|
def _async_http_get(port, db_name, job_uuid):
    """Ask Odoo to run a job through a fire-and-forget HTTP GET.

    The request to ``/queue_job/runjob`` on ``localhost`` is issued from a
    daemon thread, so this function returns immediately. When the request
    times out or fails, the job is put back to the pending state (only if
    it is still enqueued), so it does not stay enqueued forever.

    :param port: port of the local Odoo HTTP server
    :param db_name: name of the database holding the job
    :param job_uuid: uuid of the job to run
    """

    # Method to set failed job (due to timeout, etc) as pending,
    # to avoid keeping it as enqueued.
    def set_job_pending():
        connection_info = odoo.sql_db.connection_info_for(db_name)[1]
        # closing() releases this dedicated connection even when the UPDATE
        # fails; a psycopg2 connection used directly as a context manager
        # would only end the transaction, not close the connection.
        with closing(psycopg2.connect(**connection_info)) as conn:
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with closing(conn.cursor()) as cr:
                cr.execute(
                    "UPDATE queue_job SET state=%s, "
                    "date_enqueued=NULL, date_started=NULL "
                    "WHERE uuid=%s and state=%s",
                    (PENDING, job_uuid, ENQUEUED)
                )

    # TODO: better way to HTTP GET asynchronously (grequest, ...)?
    #       if this was python3 I would be doing this with
    #       asyncio, aiohttp and aiopg
    def urlopen():
        url = ('http://localhost:%s/queue_job/runjob?db=%s&job_uuid=%s' %
               (port, db_name, job_uuid))
        try:
            # we are not interested in the result, so we set a short timeout
            # but not too short so we trap and log hard configuration errors
            response = requests.get(url, timeout=1)

            # raise_for_status will result in either nothing, a Client Error
            # for HTTP Response codes between 400 and 500 or a Server Error
            # for codes between 500 and 600
            response.raise_for_status()
        except requests.Timeout:
            set_job_pending()
        except Exception:
            # Exception (not a bare except) so that SystemExit and
            # KeyboardInterrupt are not swallowed by this worker thread.
            _logger.exception("exception in GET %s", url)
            set_job_pending()

    thread = threading.Thread(target=urlopen)
    thread.daemon = True
    thread.start()
 | 
						|
 | 
						|
 | 
						|
class Database(object):
    """Autocommit connection to one database, used to watch its job queue.

    On creation, the connection is opened and, when the ``queue_job``
    module is installed in that database, a trigger sending ``queue_job``
    notifications is (re)created and the connection starts listening to
    them.

    Attributes:

    * ``db_name``: name of the database
    * ``conn``: the psycopg2 connection, or ``None`` once closed
    * ``has_queue_job``: ``True`` when queue_job is installed
    """

    def __init__(self, db_name):
        """Connect to ``db_name`` and set up job notifications if possible.

        If anything fails after the connection has been opened, the
        connection is closed before the exception is propagated.
        """
        self.db_name = db_name
        connection_info = odoo.sql_db.connection_info_for(db_name)[1]
        self.conn = psycopg2.connect(**connection_info)
        try:
            self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            self.has_queue_job = self._has_queue_job()
            if self.has_queue_job:
                self._initialize()
        except Exception:
            # do not leak the connection when the setup fails
            self.close()
            raise

    def close(self):
        """Close the connection, ignoring errors; safe to call repeatedly."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        except Exception:
            # closing is best effort: keep going, but leave a trace
            _logger.debug('error closing connection to database %s',
                          self.db_name, exc_info=True)
        self.conn = None

    def _has_queue_job(self):
        """Return True if the queue_job module is installed in the database.

        Databases without an ``ir_module_module`` table are reported as
        not having queue_job.
        """
        with closing(self.conn.cursor()) as cr:
            cr.execute("SELECT 1 FROM pg_tables WHERE tablename=%s",
                       ('ir_module_module',))
            if not cr.fetchone():
                return False
            cr.execute(
                "SELECT 1 FROM ir_module_module WHERE name=%s AND state=%s",
                ('queue_job', 'installed')
            )
            return bool(cr.fetchone())

    def _initialize(self):
        """(Re)create the notification trigger and start listening."""
        with closing(self.conn.cursor()) as cr:
            # this is the trigger that sends notifications when jobs change
            cr.execute("""
                DROP TRIGGER IF EXISTS queue_job_notify ON queue_job;

                CREATE OR REPLACE
                    FUNCTION queue_job_notify() RETURNS trigger AS $$
                BEGIN
                    IF TG_OP = 'DELETE' THEN
                        IF OLD.state != 'done' THEN
                            PERFORM pg_notify('queue_job', OLD.uuid);
                        END IF;
                    ELSE
                        PERFORM pg_notify('queue_job', NEW.uuid);
                    END IF;
                    RETURN NULL;
                END;
                $$ LANGUAGE plpgsql;

                CREATE TRIGGER queue_job_notify
                    AFTER INSERT OR UPDATE OR DELETE
                    ON queue_job
                    FOR EACH ROW EXECUTE PROCEDURE queue_job_notify();
            """)
            cr.execute("LISTEN queue_job")

    def select_jobs(self, where, args):
        """Return the jobs matching a condition, as a list of row tuples.

        :param where: SQL condition inserted verbatim after ``WHERE``; it
                      must come from trusted code, never from user input
        :param args: query parameters for the placeholders in ``where``
        :return: list of ``(channel, uuid, seq, date_created, priority,
                 eta, state)`` rows, ``eta`` being extracted as an epoch
        """
        query = ("SELECT channel, uuid, id as seq, date_created, "
                 "priority, EXTRACT(EPOCH FROM eta), state "
                 "FROM queue_job WHERE %s" %
                 (where, ))
        with closing(self.conn.cursor()) as cr:
            cr.execute(query, args)
            return list(cr.fetchall())

    def set_job_enqueued(self, uuid):
        """Mark the job ``uuid`` as enqueued, stamped with the current UTC
        time truncated to the second."""
        with closing(self.conn.cursor()) as cr:
            cr.execute("UPDATE queue_job SET state=%s, "
                       "date_enqueued=date_trunc('seconds', "
                       "                         now() at time zone 'utc') "
                       "WHERE uuid=%s",
                       (ENQUEUED, uuid))
 | 
						|
 | 
						|
 | 
						|
class QueueJobRunner(object):
    """Main loop dispatching jobs of all databases to Odoo.

    The runner keeps one ``Database`` per database where queue_job is
    installed, feeds job changes to a ``ChannelManager`` and asks Odoo to
    run the jobs the channel manager selects.
    """

    def __init__(self, port=8069, channel_config_string=None):
        """Create a runner.

        :param port: port of the Odoo HTTP server the jobs are sent to
        :param channel_config_string: channels configuration; when None,
                                      the value returned by ``_channels()``
                                      is used
        """
        self.port = port
        self.channel_manager = ChannelManager()
        if channel_config_string is None:
            channel_config_string = _channels()
        self.channel_manager.simple_configure(channel_config_string)
        self.db_by_name = {}
        self._stop = False
        # pipe used by stop() to wake up the select() in wait_notification
        self._stop_pipe = os.pipe()

    def get_db_names(self):
        """Return the names of the databases to watch.

        The comma-separated ``db_name`` Odoo option is used when set,
        otherwise the list of databases is asked to Odoo.
        """
        if odoo.tools.config['db_name']:
            db_names = odoo.tools.config['db_name'].split(',')
        else:
            db_names = odoo.service.db.exp_list(True)
        return db_names

    def close_databases(self, remove_jobs=True):
        """Close all database connections and forget them.

        :param remove_jobs: when true, the jobs of each database are also
                            removed from the channel manager
        """
        for db_name, db in self.db_by_name.items():
            try:
                if remove_jobs:
                    self.channel_manager.remove_db(db_name)
                db.close()
            except Exception:
                # an error on one database must not prevent closing the rest
                _logger.warning('error closing database %s',
                                db_name, exc_info=True)
        self.db_by_name = {}

    def initialize_databases(self):
        """Connect to the databases and load their jobs that are not done.

        Connections to databases where queue_job is not installed are
        closed right away, since nothing is done with them afterwards.
        """
        for db_name in self.get_db_names():
            db = Database(db_name)
            if not db.has_queue_job:
                _logger.debug('queue_job is not installed for db %s', db_name)
                # this database is not tracked: release its connection
                db.close()
            else:
                self.db_by_name[db_name] = db
                for job_data in db.select_jobs('state in %s', (NOT_DONE,)):
                    self.channel_manager.notify(db_name, *job_data)
                _logger.info('queue job runner ready for db %s', db_name)

    def run_jobs(self):
        """Enqueue the jobs that can run now and ask Odoo to run them."""
        now = _odoo_now()
        for job in self.channel_manager.get_jobs_to_run(now):
            if self._stop:
                break
            _logger.info("asking Odoo to run job %s on db %s",
                         job.uuid, job.db_name)
            self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
            _async_http_get(self.port, job.db_name, job.uuid)

    def process_notifications(self):
        """Forward the pending notifications to the channel manager.

        The payload of a notification is a job uuid: the job is read again
        from the database and, when it does not exist anymore, it is
        removed from the channel manager.
        """
        for db in self.db_by_name.values():
            while db.conn.notifies:
                if self._stop:
                    break
                notification = db.conn.notifies.pop()
                uuid = notification.payload
                job_datas = db.select_jobs('uuid = %s', (uuid,))
                if job_datas:
                    self.channel_manager.notify(db.db_name, *job_datas[0])
                else:
                    self.channel_manager.remove_job(uuid)

    def wait_notification(self):
        """Wait until a notification, a stop request or a timeout occurs.

        Returns immediately when notifications are already pending.
        """
        for db in self.db_by_name.values():
            if db.conn.notifies:
                # something is going on in the queue, no need to wait
                return
        # wait for something to happen in the queue_job tables
        # we'll select() on database connections and the stop pipe
        conns = [db.conn for db in self.db_by_name.values()]
        conns.append(self._stop_pipe[0])
        # look if the channels specify a wakeup time
        wakeup_time = self.channel_manager.get_wakeup_time()
        if not wakeup_time:
            # this could very well be no timeout at all, because
            # any activity in the job queue will wake us up, but
            # let's have a timeout anyway, just to be safe
            timeout = SELECT_TIMEOUT
        else:
            timeout = wakeup_time - _odoo_now()
        # wait for a notification or a timeout;
        # if timeout is negative (ie wakeup time in the past),
        # do not wait; this should rarely happen
        # because of how get_wakeup_time is designed; actually
        # if timeout remains a large negative number, it is most
        # probably a bug
        _logger.debug("select() timeout: %.2f sec", timeout)
        if timeout > 0:
            conns, _, _ = select.select(conns, [], [], timeout)
            # the stop pipe is only written to after _stop is set, so
            # when not stopping, conns holds database connections only
            if conns and not self._stop:
                for conn in conns:
                    conn.poll()

    def stop(self):
        """Request a graceful stop of the ``run()`` loop."""
        _logger.info("graceful stop requested")
        self._stop = True
        # wakeup the select() in wait_notification
        os.write(self._stop_pipe[1], b'.')

    def run(self):
        """Run the dispatch loop until ``stop()`` is called.

        After an unexpected error, the database connections are closed and
        everything is initialized again after ``ERROR_RECOVERY_DELAY``
        seconds.
        """
        _logger.info("starting")
        while not self._stop:
            # outer loop does exception recovery
            try:
                _logger.info("initializing database connections")
                # TODO: how to detect new databases or databases
                #       on which queue_job is installed after server start?
                self.initialize_databases()
                _logger.info("database connections ready")
                # inner loop does the normal processing
                while not self._stop:
                    self.process_notifications()
                    self.run_jobs()
                    self.wait_notification()
            except KeyboardInterrupt:
                self.stop()
            except Exception:
                # Exception (not a bare except) so that SystemExit is not
                # turned into an endless retry loop
                _logger.exception("exception: sleeping %ds and retrying",
                                  ERROR_RECOVERY_DELAY)
                self.close_databases()
                time.sleep(ERROR_RECOVERY_DELAY)
        self.close_databases(remove_jobs=False)
        _logger.info("stopped")
 |