【odoo】【知识杂谈】单一实例多库模式下定时任务的问题分析

欢迎转载,但需标注出处,谢谢!

背景:

有客户反应有个别模块下的定时任务没有正常执行,是否是新装的模块哪些有题?排查后发现,客户是在一台服务器上跑着一个odoo容器,对应多个数据库。个别库的定时任务是正常的,但是一个对接其他平台的库的定时任务没有正常跑起来。

先说结论,看官没时间支持按说明处理即可,分析过程在下面。

结论

odoo的配置文件db_name字段配置希望后台一直跑着的库名称字符串,以英文“,”分割。

分析

直接源码

看odoo日志,我们知道odoo的任务正常执行时会打印Starting Job 任务名称,直接vscode全局查找,定位到ir_cron.py文件的_process_jobs函数。

@classmethod def _process_jobs(cls, db_name): """ Try to process all cron jobs. This selects in database all the jobs that should be processed. It then tries to lock each of them and, if it succeeds, run the cron job (if it doesn't succeed, it means the job was already locked to be taken care of by another thread) and return. :raise BadVersion: if the version is different from the worker's :raise BadModuleState: if modules are to install/upgrade/remove """ db = odoo.sql_db.db_connect(db_name) threading.current_thread().dbname = db_name try: with db.cursor() as cr: # Make sure the database has the same version as the code of # base and that no module must be installed/upgraded/removed cr.execute("SELECT latest_version FROM ir_module_module WHERE name=%s", ['base']) (version,) = cr.fetchone() cr.execute("SELECT COUNT(*) FROM ir_module_module WHERE state LIKE %s", ['to %']) (changes,) = cr.fetchone() if version is None: raise BadModuleState() elif version != BASE_VERSION: raise BadVersion() # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. cr.execute("""SELECT * FROM ir_cron WHERE numbercall != 0 AND active AND nextcall <= (now() at time zone 'UTC') ORDER BY priority""") jobs = cr.dictfetchall() if changes: if not jobs: raise BadModuleState() # nextcall is never updated if the cron is not executed, # it is used as a sentinel value to check whether cron jobs # have been locked for a long time (stuck) parse = fields.Datetime.from_string oldest = min([parse(job['nextcall']) for job in jobs]) if datetime.now() - oldest > MAX_FAIL_TIME: odoo.modules.reset_modules_state(db_name) else: raise BadModuleState() for job in jobs: lock_cr = db.cursor() try: # Try to grab an exclusive lock on the job row from within the task transaction # Restrict to the same conditions as for the search since the job may have already # been run by an other thread when cron is running in multi thread lock_cr.execute("""SELECT * FROM ir_cron WHERE numbercall != 0 AND active AND nextcall <= (now() at time zone 'UTC') AND id=%s FOR UPDATE NOWAIT""", (job['id'],), log_exceptions=False) locked_job = lock_cr.fetchone() if not locked_job: _logger.debug("Job `%s` already executed by another process/thread. skipping it", job['cron_name']) continue # Got the lock on the job row, run its code _logger.info('Starting job `%s`.', job['cron_name']) job_cr = db.cursor() try: registry = odoo.registry(db_name) registry[cls._name]._process_job(job_cr, job, lock_cr) _logger.info('Job `%s` done.', job['cron_name']) except Exception: _logger.exception('Unexpected exception while processing cron job %r', job) finally: job_cr.close() except psycopg2.OperationalError as e: if e.pgcode == '55P03': # Class 55: Object not in prerequisite state; 55P03: lock_not_available _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['cron_name']) continue else: # Unexpected OperationalError raise finally: # we're exiting due to an exception while acquiring the lock lock_cr.close() finally: if hasattr(threading.current_thread(), 'dbname'): del threading.current_thread().dbname

看到上面这个函数已经是执行的具体内容了。我们继续在该文件查找_process_jobs函数 => _acquire_job函数 => server.py文件中的cron_thread函数 => cron_spawn函数。
其中cron_spawn定了开启几个cron线程,由max_cron_threads决定。
在cron_thread函数中,我们可以看到定时任务的调用过程

def cron_thread(self, number): from odoo.addons.base.models.ir_cron import ir_cron while True: time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style registries = odoo.modules.registry.Registry.registries _logger.debug('cron%d polling for jobs', number) for db_name, registry in registries.d.items(): if registry.ready: thread = threading.currentThread() thread.start_time = time.time() try: ir_cron._acquire_job(db_name) except Exception: _logger.warning('cron%d encountered an Exception:', number, exc_info=True) thread.start_time = None

核心内容是registries.d.items(),cron线程将循环调用registries中的数据库信息,那么这个变量中到底有哪些内容,如何添加的呢?可以全局搜索registries,定位到Registry.py文件(具体的registry类对象)以及server.py中的preload_registries函数以及调用该函数的run函数。看名称可以了解将预加载registry信息。

def run(self, preload=None, stop=False): """ Start the http server and the cron thread then wait for a signal. The first SIGINT or SIGTERM signal will initiate a graceful shutdown while a second one if any will force an immediate exit. """ self.start(stop=stop) rc = preload_registries(preload) if stop: if config['test_enable']: logger = odoo.tests.runner._logger with Registry.registries._lock: for db, registry in Registry.registries.d.items(): report = registry._assertion_report log = logger.error if not report.wasSuccessful() \ else logger.warning if not report.testsRun \ else logger.info log("%s when loading database %r", report, db) self.stop() return rc

核心是run函数汇总的preload变量,记录着将初始化哪些数据库的对象;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzfzg.html