From 46f82438777b6a426a764c129c324289f6fd79c4 Mon Sep 17 00:00:00 2001 From: Vo Minh Thu Date: Thu, 7 Jul 2011 15:58:43 +0200 Subject: [PATCH] [IMP] ir_cron: reschedule the main cron thread if a worker takes too long. bzr revid: vmt@openerp.com-20110707135843-z38f4r8s373ctnd2 --- openerp-server | 4 +- openerp/addons/base/ir/ir_cron.py | 75 ++++++++++++++++++++++++++----- openerp/netsvc.py | 12 +++++ 3 files changed, 78 insertions(+), 13 deletions(-) diff --git a/openerp-server b/openerp-server index 347cd9f6723..a2ec11ba996 100755 --- a/openerp-server +++ b/openerp-server @@ -108,8 +108,8 @@ if config['db_name']: openerp.tools.convert_yaml_import(cr, 'base', file(config["test_file"]), {}, 'test', True) cr.rollback() - pool.get('ir.cron')._poolJobs(db.dbname) - # pool.get('ir.cron').restart(db.dbname) # jobs will start to be processed later, when start_agent below is called. + # jobs will start to be processed later, when start_agent below is called. + pool.get('ir.cron').restart(db.dbname) cr.close() diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index ba25f59525d..9360442e922 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -76,6 +76,25 @@ class ir_cron(osv.osv, netsvc.Agent): 'doall' : lambda *a: 1 } + thread_count_lock = threading.Lock() + thread_count = 1 # maximum allowed number of threads.
+ + @classmethod + def get_thread_count(cls): + return cls.thread_count + + @classmethod + def inc_thread_count(cls): + cls.thread_count_lock.acquire() + cls.thread_count += 1 + cls.thread_count_lock.release() + + @classmethod + def dec_thread_count(cls): + cls.thread_count_lock.acquire() + cls.thread_count -= 1 + cls.thread_count_lock.release() + def f(a, b, c): print ">>> in f" @@ -125,7 +144,11 @@ class ir_cron(osv.osv, netsvc.Agent): self._handle_callback_exception(cr, uid, model, func, args, job_id, e) def _compute_nextcall(self, job, now): - """ Compute the nextcall for a job exactly as _run_job does. """ + """ Compute the nextcall for a job exactly as _run_job does. + + Return either the nextcall or None if it shouldn't be called. + + """ nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') numbercall = job['numbercall'] @@ -135,10 +158,12 @@ class ir_cron(osv.osv, netsvc.Agent): if numbercall: nextcall += _intervalTypes[job['interval_type']](job['interval_number']) + if not numbercall: + return None return nextcall.strftime('%Y-%m-%d %H:%M:%S') def _run_job(self, cr, job, now): - """ Run a given job. """ + """ Run a given job taking care of the repetition. """ try: nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') numbercall = job['numbercall'] @@ -156,18 +181,35 @@ class ir_cron(osv.osv, netsvc.Agent): if not numbercall: addsql = ', active=False' cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id'])) - # TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall. - # TODO NOTIFY the 'ir_cron' channel. + + if numbercall: + # Reschedule our own main cron thread if necessary. + # This is really needed if this job runs longer than its rescheduling period.
+ print ">>> advance at", nextcall + nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')) + self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname) finally: cr.commit() cr.close() def _poolJobs(self, db_name): + return self._run_jobs(db_name) + + def _run_jobs(self, db_name): # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py + """ Process the cron jobs by spawning worker threads. + + This selects in the database all the jobs that should be processed. It then + tries to lock each of them and, if it succeeds, spawns a thread to run the + cron job (if it doesn't succeed, it means the job was already + locked to be taken care of by another thread). + + """ try: db, pool = pooler.get_db_and_pool(db_name) except: return False + print ">>> _run_jobs" cr = db.cursor() try: jobs = {} # mapping job ids to jobs for all jobs being processed. @@ -177,13 +219,16 @@ class ir_cron(osv.osv, netsvc.Agent): for job in cr.dictfetchall(): task_cr = db.cursor() task_job = None + jobs[job['id']] = job try: + # Try to lock the job... task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False) task_job = task_cr.dictfetchall()[0] - jobs[job['id']] = job except psycopg2.OperationalError, e: if e.pgcode == '55P03': # Class 55: Object not in prerequisite state, 55P03: lock_not_available + # ... and fail. + print ">>>", job['name'], " is already being processed" continue else: raise @@ -191,6 +236,8 @@ class ir_cron(osv.osv, netsvc.Agent): if not task_job: task_cr.close() + # ... and succeed.
+ print ">>> taking care of", job['name'] task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now)) # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) task_thread.setDaemon(False) @@ -202,17 +249,23 @@ class ir_cron(osv.osv, netsvc.Agent): else: cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active') next_call = cr.dictfetchone()['min_next_call'] + print ">>> possibility at ", next_call + + # Wake up time, taking the smallest processed job nextcall value. + for job in jobs.values(): + nextcall = self._compute_nextcall(job, now) + print ">>> or at ", nextcall + if not nextcall: + continue + if not next_call or nextcall < next_call: + next_call = nextcall + print ">>> rescheduling at", next_call + if next_call: next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S')) else: next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day - # Take the smallest nextcall value. - for job in jobs.values(): - nextcall = self._compute_nextcall(job, now) - if nextcall < next_call: - next_call = nextcall - self.setAlarm(self._poolJobs, next_call, db_name, db_name) except Exception, ex: diff --git a/openerp/netsvc.py b/openerp/netsvc.py index 3a2add26421..26f7cbe7875 100644 --- a/openerp/netsvc.py +++ b/openerp/netsvc.py @@ -283,6 +283,18 @@ class Agent(object): for task in cls.__tasks_by_db[db_name]: task[0] = 0 + @classmethod + def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs): + # Cancel the previous task if any. 
+ old_timestamp = None + if db_name in cls.__tasks_by_db: + for task in cls.__tasks_by_db[db_name]: + if task[2] == function and timestamp < task[0]: + old_timestamp = task[0] + task[0] = 0 + if not old_timestamp or timestamp < old_timestamp: + cls.setAlarm(function, timestamp, db_name, *args, **kwargs) + @classmethod def quit(cls): cls.cancel(None)