407 lines
14 KiB
Python
407 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2015-2016 ACSONE SA/NV (<http://acsone.eu>)
|
|
# Copyright 2015-2016 Camptocamp SA
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
|
|
"""
|
|
What is the job runner?
|
|
-----------------------
|
|
The job runner is the main process managing the dispatch of delayed jobs to
|
|
available Odoo workers.
|
|
|
|
How does it work?
|
|
-----------------
|
|
|
|
* It starts as a thread in the Odoo main process
|
|
* It receives postgres NOTIFY messages each time jobs are
|
|
added or updated in the queue_job table.
|
|
* It maintains an in-memory priority queue of jobs that
|
|
is populated from the queue_job tables in all databases.
|
|
* It does not run jobs itself, but asks Odoo to run them through an
|
|
anonymous ``/queue_job/runjob`` HTTP request. [1]_
|
|
|
|
How to use it?
|
|
--------------
|
|
|
|
* Optionally adjust your configuration through environment variables:
|
|
|
|
- set ``ODOO_QUEUE_JOB_CHANNELS=root:4`` (or any other channels
|
|
configuration) if you don't want the default ``root:1``.
|
|
|
|
- if ``xmlrpc-port`` is not set, you can set it for the jobrunner only with:
|
|
``ODOO_QUEUE_JOB_PORT=8069``.
|
|
|
|
* Alternatively, configure the channels through the Odoo configuration
|
|
file, like:
|
|
|
|
.. code-block:: ini
|
|
|
|
[queue_job]
|
|
channels = root:4
|
|
|
|
* Or, if using ``anybox.recipe.odoo``, add this to your buildout configuration:
|
|
|
|
.. code-block:: ini
|
|
|
|
[odoo]
|
|
recipe = anybox.recipe.odoo
|
|
(...)
|
|
queue_job.channels = root:4
|
|
|
|
* Start Odoo with ``--load=web,web_kanban,queue_job``
|
|
and ``--workers`` greater than 1 [2]_, or set the ``server_wide_modules``
|
|
option in the Odoo configuration file:
|
|
|
|
.. code-block:: ini
|
|
|
|
[options]
|
|
(...)
|
|
workers = 4
|
|
server_wide_modules = web,web_kanban,queue_job
|
|
(...)
|
|
|
|
* Or, if using ``anybox.recipe.odoo``:
|
|
|
|
.. code-block:: ini
|
|
|
|
[odoo]
|
|
recipe = anybox.recipe.odoo
|
|
(...)
|
|
options.workers = 4
|
|
options.server_wide_modules = web,web_kanban,queue_job
|
|
|
|
* Confirm the runner is starting correctly by checking the odoo log file:
|
|
|
|
.. code-block:: none
|
|
|
|
...INFO...queue_job.jobrunner.runner: starting
|
|
...INFO...queue_job.jobrunner.runner: initializing database connections
|
|
...INFO...queue_job.jobrunner.runner: queue job runner ready for db <dbname>
|
|
...INFO...queue_job.jobrunner.runner: database connections ready
|
|
|
|
* Create jobs (eg using base_import_async) and observe they
|
|
start immediately and in parallel.
|
|
|
|
* Tip: to enable debug logging for the queue job, use
|
|
``--log-handler=odoo.addons.queue_job:DEBUG``
|
|
|
|
Caveat
|
|
------
|
|
|
|
* After creating a new database or installing queue_job on an
|
|
existing database, Odoo must be restarted for the runner to detect it.
|
|
|
|
* When Odoo shuts down normally, it waits for running jobs to finish.
|
|
However, when the Odoo server crashes or is otherwise force-stopped,
|
|
running jobs are interrupted while the runner has no chance to know
|
|
they have been aborted. In such situations, jobs may remain in
|
|
``started`` or ``enqueued`` state after the Odoo server is halted.
|
|
Since the runner has no way to know if they are actually running or
|
|
not, and does not know for sure if it is safe to restart the jobs,
|
|
it does not attempt to restart them automatically. Such stale jobs
|
|
therefore fill the running queue and prevent other jobs from starting.
|
|
You must therefore requeue them manually, either from the Jobs view,
|
|
or by running the following SQL statement *before starting Odoo*:
|
|
|
|
.. code-block:: sql
|
|
|
|
update queue_job set state='pending' where state in ('started', 'enqueued')
|
|
|
|
.. rubric:: Footnotes
|
|
|
|
.. [1] From a security standpoint, it is safe to have an anonymous HTTP
|
|
request because this request only accepts to run jobs that are
|
|
enqueued.
|
|
.. [2] It works with the threaded Odoo server too, although this way
|
|
of running Odoo is obviously not for production purposes.
|
|
"""
|
|
|
|
from contextlib import closing
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import select
|
|
import threading
|
|
import time
|
|
|
|
import psycopg2
|
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
|
import requests
|
|
|
|
import odoo
|
|
from odoo.tools import config
|
|
|
|
from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE
|
|
|
|
SELECT_TIMEOUT = 60
|
|
ERROR_RECOVERY_DELAY = 5
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Unfortunately, it is not possible to extend the Odoo
|
|
# server command line arguments, so we resort to environment variables
|
|
# to configure the runner (channels mostly).
|
|
#
|
|
# On the other hand, the odoo configuration file can be extended at will,
|
|
# so we check it in addition to the environment variables.
|
|
|
|
|
|
def _channels():
|
|
return (
|
|
os.environ.get('ODOO_QUEUE_JOB_CHANNELS') or
|
|
config.misc.get("queue_job", {}).get("channels") or
|
|
"root:1"
|
|
)
|
|
|
|
|
|
def _datetime_to_epoch(dt):
|
|
# important: this must return the same as postgresql
|
|
# EXTRACT(EPOCH FROM TIMESTAMP dt)
|
|
return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
|
|
|
|
|
|
def _odoo_now():
    """Return the current UTC time as seconds since the Unix epoch."""
    return _datetime_to_epoch(datetime.datetime.utcnow())
|
|
|
|
|
|
def _async_http_get(port, db_name, job_uuid):
    """Ask Odoo to run a job, without blocking the caller.

    A daemon thread is started that performs an HTTP GET on
    ``/queue_job/runjob`` on ``localhost``. If the request times out or
    fails for any other reason, the job is put back in the pending state
    (only if it is still enqueued) so that it does not stay enqueued.

    :param port: HTTP port of the local Odoo server
    :param db_name: name of the database holding the job
    :param job_uuid: uuid of the job to run
    """

    # Method to set failed job (due to timeout, etc) as pending,
    # to avoid keeping it as enqueued.
    def set_job_pending():
        connection_info = odoo.sql_db.connection_info_for(db_name)[1]
        # closing() makes sure this short-lived connection is released,
        # even when the UPDATE raises
        with closing(psycopg2.connect(**connection_info)) as conn:
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with closing(conn.cursor()) as cr:
                cr.execute(
                    "UPDATE queue_job SET state=%s, "
                    "date_enqueued=NULL, date_started=NULL "
                    "WHERE uuid=%s and state=%s",
                    (PENDING, job_uuid, ENQUEUED)
                )

    # TODO: better way to HTTP GET asynchronously (grequest, ...)?
    #       if this was python3 I would be doing this with
    #       asyncio, aiohttp and aiopg
    def urlopen():
        url = ('http://localhost:%s/queue_job/runjob?db=%s&job_uuid=%s' %
               (port, db_name, job_uuid))
        try:
            # we are not interested in the result, so we set a short timeout
            # but not too short so we trap and log hard configuration errors
            response = requests.get(url, timeout=1)

            # raise_for_status will result in either nothing, a Client Error
            # for HTTP Response codes between 400 and 500 or a Server Error
            # for codes between 500 and 600
            response.raise_for_status()
        except requests.Timeout:
            set_job_pending()
        except Exception:
            # Exception (not a bare except) so that interpreter-exit
            # signals such as SystemExit are not trapped in this thread
            _logger.exception("exception in GET %s", url)
            set_job_pending()

    thread = threading.Thread(target=urlopen)
    thread.daemon = True
    thread.start()
|
|
|
|
|
|
class Database(object):
    """Autocommit connection to one database, watching its queue_job table.

    Attributes:

    * ``db_name``: name of the database
    * ``conn``: the psycopg2 connection, or ``None`` once closed
    * ``has_queue_job``: ``True`` when the ``queue_job`` module is
      installed in the database
    """

    def __init__(self, db_name):
        """Connect to ``db_name`` and, if queue_job is installed in it,
        install the notification trigger and start listening.

        The connection is closed before re-raising if the setup fails.
        """
        self.db_name = db_name
        connection_info = odoo.sql_db.connection_info_for(db_name)[1]
        self.conn = psycopg2.connect(**connection_info)
        try:
            self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            self.has_queue_job = self._has_queue_job()
            if self.has_queue_job:
                self._initialize()
        except Exception:
            # do not leak the connection when the setup fails
            self.close()
            raise

    def close(self):
        """Close the connection (best effort); safe to call more than once."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        except Exception:
            # closing is best effort: keep a trace but never fail
            _logger.debug('error closing connection to database %s',
                          self.db_name, exc_info=True)
        finally:
            self.conn = None

    def _has_queue_job(self):
        """Return True if the queue_job module is installed in the database.

        Return False when the ``ir_module_module`` table does not exist.
        """
        with closing(self.conn.cursor()) as cr:
            cr.execute("SELECT 1 FROM pg_tables WHERE tablename=%s",
                       ('ir_module_module',))
            if not cr.fetchone():
                return False
            cr.execute(
                "SELECT 1 FROM ir_module_module WHERE name=%s AND state=%s",
                ('queue_job', 'installed')
            )
            return cr.fetchone() is not None

    def _initialize(self):
        """(Re)create the notification trigger and LISTEN on ``queue_job``."""
        with closing(self.conn.cursor()) as cr:
            # this is the trigger that sends notifications when jobs change
            cr.execute("""
                DROP TRIGGER IF EXISTS queue_job_notify ON queue_job;

                CREATE OR REPLACE
                    FUNCTION queue_job_notify() RETURNS trigger AS $$
                BEGIN
                    IF TG_OP = 'DELETE' THEN
                        IF OLD.state != 'done' THEN
                            PERFORM pg_notify('queue_job', OLD.uuid);
                        END IF;
                    ELSE
                        PERFORM pg_notify('queue_job', NEW.uuid);
                    END IF;
                    RETURN NULL;
                END;
                $$ LANGUAGE plpgsql;

                CREATE TRIGGER queue_job_notify
                    AFTER INSERT OR UPDATE OR DELETE
                    ON queue_job
                    FOR EACH ROW EXECUTE PROCEDURE queue_job_notify();
            """)
            cr.execute("LISTEN queue_job")

    def select_jobs(self, where, args):
        """Return the list of job rows matching a condition.

        :param where: SQL condition; it is interpolated as is in the
                      query, so it must never contain untrusted input
        :param args: query parameters referenced by ``where``
        :return: list of ``(channel, uuid, seq, date_created, priority,
                 eta, state)`` rows, ``eta`` being expressed in seconds
                 since the epoch
        """
        query = ("SELECT channel, uuid, id as seq, date_created, "
                 "priority, EXTRACT(EPOCH FROM eta), state "
                 "FROM queue_job WHERE %s" %
                 (where, ))
        with closing(self.conn.cursor()) as cr:
            cr.execute(query, args)
            return list(cr.fetchall())

    def set_job_enqueued(self, uuid):
        """Set the job as enqueued, with the current UTC time (truncated
        to the second) as ``date_enqueued``.
        """
        with closing(self.conn.cursor()) as cr:
            cr.execute("UPDATE queue_job SET state=%s, "
                       "date_enqueued=date_trunc('seconds', "
                       "                         now() at time zone 'utc') "
                       "WHERE uuid=%s",
                       (ENQUEUED, uuid))
|
|
|
|
|
|
class QueueJobRunner(object):
    """Main loop dispatching the jobs of all databases to Odoo.

    The runner keeps one :class:`Database` per database where queue_job
    is installed, feeds the channel manager with the jobs and their
    notifications, and asks Odoo to run the jobs returned by the channel
    manager through HTTP requests.
    """

    def __init__(self, port=8069, channel_config_string=None):
        """
        :param port: HTTP port of the Odoo server that will run the jobs
        :param channel_config_string: channels configuration; when None,
                                      it is obtained from ``_channels()``
        """
        self.port = port
        self.channel_manager = ChannelManager()
        if channel_config_string is None:
            channel_config_string = _channels()
        self.channel_manager.simple_configure(channel_config_string)
        self.db_by_name = {}
        self._stop = False
        # writing to this pipe wakes up the select() of wait_notification
        self._stop_pipe = os.pipe()

    def get_db_names(self):
        """Return the names of the databases to watch.

        These are the databases of the ``db_name`` option (comma
        separated; surrounding whitespace and empty names are ignored)
        or, when the option is not set, the databases listed by Odoo.
        """
        configured = odoo.tools.config['db_name']
        if configured:
            return [name.strip()
                    for name in configured.split(',')
                    if name.strip()]
        return odoo.service.db.exp_list(True)

    def close_databases(self, remove_jobs=True):
        """Close all database connections.

        :param remove_jobs: when true, the jobs of each database are also
                            removed from the channel manager
        """
        for db_name, db in self.db_by_name.items():
            try:
                if remove_jobs:
                    self.channel_manager.remove_db(db_name)
                db.close()
            except Exception:
                _logger.warning('error closing database %s',
                                db_name, exc_info=True)
        self.db_by_name = {}

    def initialize_databases(self):
        """Connect to the databases and load their jobs that are not done
        in the channel manager.
        """
        for db_name in self.get_db_names():
            db = Database(db_name)
            if not db.has_queue_job:
                _logger.debug('queue_job is not installed for db %s', db_name)
                # this connection is not kept: release it right away
                db.close()
                continue
            self.db_by_name[db_name] = db
            for job_data in db.select_jobs('state in %s', (NOT_DONE,)):
                self.channel_manager.notify(db_name, *job_data)
            _logger.info('queue job runner ready for db %s', db_name)

    def run_jobs(self):
        """Mark as enqueued, then ask Odoo to run, each job that the
        channel manager returns for the current time.
        """
        now = _odoo_now()
        for job in self.channel_manager.get_jobs_to_run(now):
            if self._stop:
                break
            _logger.info("asking Odoo to run job %s on db %s",
                         job.uuid, job.db_name)
            self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
            _async_http_get(self.port, job.db_name, job.uuid)

    def process_notifications(self):
        """Forward the pending notifications to the channel manager.

        The payload of a notification is a job uuid: the job is read
        again from the database, and removed from the channel manager
        when it does not exist anymore.
        """
        for db in self.db_by_name.values():
            while db.conn.notifies:
                if self._stop:
                    break
                notification = db.conn.notifies.pop()
                uuid = notification.payload
                job_datas = db.select_jobs('uuid = %s', (uuid,))
                if job_datas:
                    self.channel_manager.notify(db.db_name, *job_datas[0])
                else:
                    self.channel_manager.remove_job(uuid)

    def wait_notification(self):
        """Wait until a notification arrives, the channel manager's wakeup
        time is reached, or a stop is requested.
        """
        for db in self.db_by_name.values():
            if db.conn.notifies:
                # something is going on in the queue, no need to wait
                return
        # wait for something to happen in the queue_job tables
        # we'll select() on database connections and the stop pipe
        conns = [db.conn for db in self.db_by_name.values()]
        conns.append(self._stop_pipe[0])
        # look if the channels specify a wakeup time
        wakeup_time = self.channel_manager.get_wakeup_time()
        if not wakeup_time:
            # this could very well be no timeout at all, because
            # any activity in the job queue will wake us up, but
            # let's have a timeout anyway, just to be safe
            timeout = SELECT_TIMEOUT
        else:
            timeout = wakeup_time - _odoo_now()
        # wait for a notification or a timeout;
        # if timeout is negative (ie wakeup time in the past),
        # do not wait; this should rarely happen
        # because of how get_wakeup_time is designed; actually
        # if timeout remains a large negative number, it is most
        # probably a bug
        _logger.debug("select() timeout: %.2f sec", timeout)
        if timeout > 0:
            conns, _, _ = select.select(conns, [], [], timeout)
            # the stop pipe is only written after _stop is set, so the
            # readable objects polled here are database connections
            if conns and not self._stop:
                for conn in conns:
                    conn.poll()

    def stop(self):
        """Request a graceful stop of the main loop."""
        _logger.info("graceful stop requested")
        self._stop = True
        # wakeup the select() in wait_notification
        os.write(self._stop_pipe[1], b'.')

    def run(self):
        """Run the main loop until a stop is requested.

        On error, the database connections are closed and everything is
        initialized again after ``ERROR_RECOVERY_DELAY`` seconds.
        """
        _logger.info("starting")
        while not self._stop:
            # outer loop does exception recovery
            try:
                _logger.info("initializing database connections")
                # TODO: how to detect new databases or databases
                #       on which queue_job is installed after server start?
                self.initialize_databases()
                _logger.info("database connections ready")
                # inner loop does the normal processing
                while not self._stop:
                    self.process_notifications()
                    self.run_jobs()
                    self.wait_notification()
            except KeyboardInterrupt:
                self.stop()
            except Exception:
                _logger.exception("exception: sleeping %ds and retrying",
                                  ERROR_RECOVERY_DELAY)
                self.close_databases()
                time.sleep(ERROR_RECOVERY_DELAY)
        self.close_databases(remove_jobs=False)
        _logger.info("stopped")
|