Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,36 @@ Features:
.. contents::
:local:

Use Cases / Context
===================

Odoo processes tasks synchronously: for example, when you import a list
of products, every line is handled within one big task. "Queue job"
gives you the ability to split big tasks into many smaller ones.

Imagine you have a lot of data to change for thousands of orders; doing
it in one step causes a heavy load on the server, and this may affect
the performance of Odoo. With queue_job you can divide the work into
jobs and run thousands of jobs (one job for each order). Another
benefit is that if one job fails, it doesn't block the processing of
the others, as the jobs are independent. Plus, you can schedule the
jobs and set a number of retries.

Here are some community usage examples:

- Mass sending invoices:
`account_invoice_mass_sending <https://github.com/OCA/account-invoicing/tree/17.0/account_invoice_mass_sending>`__
- Import data in the background:
`base_import_async <https://github.com/OCA/queue/tree/17.0/base_import_async>`__
- Export data in the background:
`base_export_async <https://github.com/OCA/queue/tree/17.0/base_export_async>`__
- Generate contract invoices with jobs:
`contract_queue_job <https://github.com/OCA/contract/tree/17.0/contract_queue_job>`__
- Generate partner invoices with jobs:
  `partner_invoicing_mode <https://github.com/OCA/account-invoicing/tree/17.0/partner_invoicing_mode>`__
- Process the Sales Automatic Workflow actions with jobs:
`sale_automatic_workflow_job <https://github.com/OCA/sale-workflow/tree/17.0/sale_automatic_workflow_job>`__

Installation
============

Expand All @@ -99,10 +129,14 @@ Configuration

- ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels
configuration. The default is ``root:1``
- if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069``

- Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater
than 1. [1]_
- ``ODOO_QUEUE_JOB_PORT=8069``, default ``--http-port``
- ``ODOO_QUEUE_JOB_SCHEME=https``, default ``http``
- ``ODOO_QUEUE_JOB_HOST=load-balancer``, default
``--http-interface`` or ``localhost`` if unset
- ``ODOO_QUEUE_JOB_HTTP_AUTH_USER=jobrunner``, default empty
- ``ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t``, default empty
- Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater
than 1. [1]_

- Using the Odoo configuration file:

Expand All @@ -116,6 +150,11 @@ Configuration
(...)
[queue_job]
channels = root:2
scheme = https
host = load-balancer
port = 443
http_auth_user = jobrunner
http_auth_password = s3cr3t

- Confirm the runner is starting correctly by checking the odoo log
file:
Expand Down
38 changes: 28 additions & 10 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from werkzeug.exceptions import BadRequest, Forbidden

from odoo import SUPERUSER_ID, api, http
from odoo.modules.registry import Registry
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
from odoo.tools import config

from ..delay import chain, group
from ..exception import FailedJobError, RetryableJobError
Expand All @@ -38,8 +38,10 @@ def _prevent_commit(cr):
def forbidden_commit(*args, **kwargs):
raise RuntimeError(
"Commit is forbidden in queue jobs. "
"If the current job is a cron running as queue job, "
"modify it to run as a normal cron."
'You may want to enable the "Allow Commit" option on the Job '
"Function. Alternatively, if the current job is a cron running as "
"queue job, you can modify it to run as a normal cron. More details on: "
"https://github.com/OCA/queue/wiki/Upgrade-warning:-commits-inside-jobs"
)

original_commit = cr.commit
Expand Down Expand Up @@ -103,11 +105,16 @@ def _try_perform_job(cls, env, job):
job.set_done()
job.store()
env.flush_all()
env.cr.commit()
if not config["test_enable"]:
env.cr.commit()
_logger.debug("%s done", job)

@classmethod
def _enqueue_dependent_jobs(cls, env, job):
if not job.should_check_dependents():
return

_logger.debug("%s enqueue depends started", job)
tries = 0
while True:
try:
Expand Down Expand Up @@ -136,13 +143,13 @@ def _enqueue_dependent_jobs(cls, env, job):
time.sleep(wait_time)
else:
break
_logger.debug("%s enqueue depends done", job)

@classmethod
def _runjob(cls, env: api.Environment, job: Job) -> None:
def retry_postpone(job, message, seconds=None):
job.env.clear()
with Registry(job.env.cr.dbname).cursor() as new_cr:
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
with job.in_temporary_env():
job.postpone(result=message, seconds=seconds)
job.set_pending(reset_retry=False)
job.store()
Expand All @@ -167,24 +174,22 @@ def retry_postpone(job, message, seconds=None):
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()
return

except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
traceback.print_exc(file=buff)
traceback_txt = buff.getvalue()
_logger.error(traceback_txt)
job.env.clear()
with Registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
with job.in_temporary_env():
vals = cls._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
buff.close()
raise

_logger.debug("%s enqueue depends started", job)
cls._enqueue_dependent_jobs(env, job)
_logger.debug("%s enqueue depends done", job)

@classmethod
def _get_failure_values(cls, job, traceback_txt, orig_exception):
Expand Down Expand Up @@ -229,6 +234,7 @@ def create_test_job(
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(http.request.env._("Access Denied"))
Expand Down Expand Up @@ -266,6 +272,12 @@ def create_test_job(
except ValueError:
max_retries = None

if failure_retry_seconds is not None:
try:
failure_retry_seconds = int(failure_retry_seconds)
except ValueError:
failure_retry_seconds = 0

if size == 1:
return self._create_single_test_job(
priority=priority,
Expand All @@ -275,6 +287,7 @@ def create_test_job(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)

if size > 1:
Expand All @@ -287,6 +300,7 @@ def create_test_job(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)
return ""

Expand All @@ -300,6 +314,7 @@ def _create_single_test_job(
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
delayed = (
http.request.env["queue.job"]
Expand All @@ -313,6 +328,7 @@ def _create_single_test_job(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)
)
return f"job uuid: {delayed.db_record().uuid}"
Expand All @@ -329,6 +345,7 @@ def _create_graph_test_jobs(
failure_rate=0,
job_duration=0,
commit_within_job=False,
failure_retry_seconds=0,
):
model = http.request.env["queue.job"]
current_count = 0
Expand All @@ -355,6 +372,7 @@ def _create_graph_test_jobs(
failure_rate=failure_rate,
job_duration=job_duration,
commit_within_job=commit_within_job,
failure_retry_seconds=failure_retry_seconds,
)
)

Expand Down
42 changes: 32 additions & 10 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import uuid
import weakref
from contextlib import contextmanager, nullcontext
from datetime import datetime, timedelta
from random import randint

Expand Down Expand Up @@ -403,14 +404,9 @@ def __init__(
raise TypeError("Job accepts only methods of Models")

recordset = func.__self__
env = recordset.env
self.method_name = func.__name__
self.recordset = recordset

self.env = env
self.job_model = self.env["queue.job"]
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)
Expand Down Expand Up @@ -460,10 +456,10 @@ def __init__(
self.exc_message = None
self.exc_info = None

if "company_id" in env.context:
company_id = env.context["company_id"]
if "company_id" in self.env.context:
company_id = self.env.context["company_id"]
else:
company_id = env.company.id
company_id = self.env.company.id
self.company_id = company_id
self._eta = None
self.eta = eta
Expand All @@ -488,7 +484,12 @@ def perform(self):
"""
self.retry += 1
try:
self.result = self.func(*tuple(self.args), **self.kwargs)
if self.job_config.allow_commit:
env_context_manager = self.in_temporary_env()
else:
env_context_manager = nullcontext()
with env_context_manager:
self.result = self.func(*tuple(self.args), **self.kwargs)
except RetryableJobError as err:
if err.ignore_retry:
self.retry -= 1
Expand All @@ -508,6 +509,16 @@ def perform(self):

return self.result

@contextmanager
def in_temporary_env(self):
    """Run the enclosed block with the job bound to a temporary cursor.

    A fresh cursor is opened on the same registry and the job's
    environment is rebound to it for the duration of the ``with``
    block; the original environment is always restored afterwards,
    even if the block raises.

    NOTE(review): the registry cursor context manager is assumed to
    commit on normal exit and close the cursor in every case — confirm
    against the Odoo version in use.
    """
    with self.env.registry.cursor() as new_cr:
        # Capture the current environment *before* swapping so it can
        # be restored in the ``finally`` clause.
        env = self.env
        # Rebind the job's recordset to an environment on the new
        # cursor (goes through the ``_env`` property setter defined
        # on this class).
        self._env = env(cr=new_cr)
        try:
            yield
        finally:
            # Unconditionally restore the original environment/cursor.
            self._env = env

def _get_common_dependent_jobs_query(self):
return """
UPDATE queue_job
Expand Down Expand Up @@ -538,6 +549,9 @@ def _get_common_dependent_jobs_query(self):
AND state = %s;
"""

def should_check_dependents(self):
    """Return True when this job has reverse dependencies to enqueue.

    Used as a cheap guard before running the (retried, SQL-based)
    dependent-job enqueueing logic.
    """
    reverse_uuids = self.__reverse_depends_on_uuids
    return any(reverse_uuids)

def enqueue_waiting(self):
sql = self._get_common_dependent_jobs_query()
self.env.cr.execute(sql, (PENDING, self.uuid, DONE, WAIT_DEPENDENCIES))
Expand Down Expand Up @@ -666,6 +680,14 @@ def __hash__(self):
def db_record(self):
return self.db_records_from_uuids(self.env, [self.uuid])

@property
def env(self):
    # The job's environment is always derived from its recordset, so
    # the two can never drift out of sync.
    return self.recordset.env

@env.setter
def _env(self, env):
    # NOTE(review): decorating a function named ``_env`` with
    # ``@env.setter`` leaves ``env`` itself read-only and binds a
    # *separate* ``_env`` property (same getter, plus this setter).
    # Assigning ``self._env = ...`` therefore rebinds the recordset
    # to the given environment; ``in_temporary_env`` relies on this.
    # Confirm the private setter name is intentional and not a typo
    # for ``env``.
    self.recordset = self.recordset.with_env(env)

@property
def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
Expand Down Expand Up @@ -730,7 +752,7 @@ def model_name(self):

@property
def user_id(self):
    """UID of the user the job's environment runs as."""
    job_env = self.env
    return job_env.uid

@property
def eta(self):
Expand Down
Loading
Loading