odoo/addons/bus/bus.py

195 lines
7.2 KiB
Python

# -*- coding: utf-8 -*-
import datetime
import json
import logging
import select
import threading
import time
import random
import simplejson
import openerp
from openerp.osv import osv, fields
from openerp.http import request
from openerp.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Base delay, in seconds, shared by the whole module: default wait of a
# long-polling request, select() timeout and retry sleep of the listener
# loop, time window of a first poll, and (doubled) age after which stored
# notifications are garbage collected.
TIMEOUT = 50
#----------------------------------------------------------
# Bus
#----------------------------------------------------------
def json_dump(v):
    """Serialize *v* to JSON text using the most compact separators."""
    compact_separators = (',', ':')
    return simplejson.dumps(v, separators=compact_separators)
def hashable(key):
    """Return *key* converted to a tuple when it is a list, else unchanged."""
    return tuple(key) if isinstance(key, list) else key
class ImBus(osv.Model):
    """Notification bus stored in the ``bus.bus`` model.

    Every notification is one row holding a JSON-encoded channel and a
    JSON-encoded message. All database access is done as the superuser,
    whatever ``uid`` is passed in.
    """
    _name = 'bus.bus'
    _columns = {
        'id' : fields.integer('Id'),
        # compared against UTC "now" by gc() and poll()
        'create_date' : fields.datetime('Create date'),
        # JSON text produced by json_dump()
        'channel' : fields.char('Channel'),
        # JSON text produced by json_dump()
        'message' : fields.char('Message'),
    }

    def gc(self, cr, uid):
        """Delete the notifications created more than ``2 * TIMEOUT`` seconds ago."""
        timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT*2)
        domain = [('create_date', '<', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))]
        ids = self.search(cr, openerp.SUPERUSER_ID, domain)
        self.unlink(cr, openerp.SUPERUSER_ID, ids)

    def sendmany(self, cr, uid, notifications):
        """Store and announce several notifications.

        :param notifications: iterable of ``(channel, message)`` pairs; both
            members must be JSON-serializable.

        Each pair is written as one row and committed. Once all rows are
        written, a single postgres ``notify imbus`` is issued whose payload
        is the JSON list of the distinct channels.
        """
        channels = set()
        for channel, message in notifications:
            # A channel given as a list is not hashable and would make
            # set.add() raise TypeError; normalise it the same way the
            # dispatcher does before using channels as dictionary keys.
            channels.add(hashable(channel))
            values = {
                "channel" : json_dump(channel),
                "message" : json_dump(message)
            }
            self.pool['bus.bus'].create(cr, openerp.SUPERUSER_ID, values)
            # NOTE(review): presumably committed right away so that the row is
            # visible to the cursors opened by polling requests -- confirm.
            cr.commit()
            # opportunistic cleanup, on average once every 100 notifications
            if random.random() < 0.01:
                self.gc(cr, uid)
        if channels:
            # the notification is sent through the 'postgres' database, which
            # is the one the dispatcher loop listens on
            with openerp.sql_db.db_connect('postgres').cursor() as cr2:
                cr2.execute("notify imbus, %s", (json_dump(list(channels)),))

    def sendone(self, cr, uid, channel, message):
        """Store and announce a single notification (see ``sendmany``)."""
        self.sendmany(cr, uid, [[channel, message]])

    def poll(self, cr, uid, channels, last=0):
        """Return the stored notifications of *channels*.

        :param channels: iterable of channels, each JSON-serializable
        :param last: id of the last notification already received; ``0``
            means "first poll"
        :return: list of dicts with keys ``id``, ``channel`` and ``message``,
            the last two being JSON-decoded
        """
        # first poll return the notification in the 'buffer'
        if last == 0:
            timeout_ago = datetime.datetime.utcnow()-datetime.timedelta(seconds=TIMEOUT)
            domain = [('create_date', '>', timeout_ago.strftime(DEFAULT_SERVER_DATETIME_FORMAT))]
        else:
            # else returns the unread notifications
            domain = [('id','>',last)]
        # channels are stored JSON-encoded, so they are matched in that form
        channels = [json_dump(c) for c in channels]
        domain.append(('channel','in',channels))
        notifications = self.search_read(cr, openerp.SUPERUSER_ID, domain)
        return [{"id":notif["id"], "channel": simplejson.loads(notif["channel"]), "message":simplejson.loads(notif["message"])} for notif in notifications]
class ImDispatch(object):
    """Wake up long-polling requests when an ``imbus`` notification arrives.

    ``self.channels`` maps a channel (made hashable with ``hashable()``) to
    the list of events of the requests currently waiting on that channel.
    ``self.Event`` is only set by ``start()``.
    """

    def __init__(self):
        self.channels = {}

    def poll(self, dbname, channels, last, timeout=TIMEOUT):
        """Return the notifications of *channels*, waiting for them if needed.

        :param dbname: name of the database whose ``bus.bus`` model is read
        :param channels: iterable of channels to listen to
        :param last: id of the last notification already received
        :param timeout: maximum time to wait, in seconds
        :return: the list returned by ``bus.bus.poll``; it is empty when
            nothing arrived before the timeout
        """
        # Dont hang ctrl-c for a poll request, we need to bypass private
        # attribute access because we dont know before starting the thread that
        # it will handle a longpolling request
        if not openerp.evented:
            current = threading.current_thread()
            current._Thread__daemonic = True
            # rename the thread to avoid tests waiting for a longpolling
            current.setName("openerp.longpolling.request.%s" % current.ident)

        registry = openerp.registry(dbname)

        # immediately returns if past notifications exist
        with registry.cursor() as cr:
            notifications = registry['bus.bus'].poll(cr, openerp.SUPERUSER_ID, channels, last)
        # or wait for future ones
        if not notifications:
            event = self.Event()
            for c in channels:
                self.channels.setdefault(hashable(c), []).append(event)
            try:
                event.wait(timeout=timeout)
                with registry.cursor() as cr:
                    notifications = registry['bus.bus'].poll(cr, openerp.SUPERUSER_ID, channels, last)
            except Exception:
                # best effort: a failure here is reported to the caller as
                # "no notification", exactly like a timeout
                _logger.debug("Bus.poll wait interrupted", exc_info=True)
            finally:
                # Forget this request's event. The listener loop only drops
                # the waiters of channels that actually get notified, so
                # without this cleanup the events of requests timing out on a
                # quiet channel would pile up forever.
                for c in channels:
                    waiters = self.channels.get(hashable(c))
                    if waiters and event in waiters:
                        waiters.remove(event)
        return notifications

    def loop(self):
        """ Dispatch postgres notifications to the relevant polling threads/greenlets """
        _logger.info("Bus.loop listen imbus on db postgres")
        with openerp.sql_db.db_connect('postgres').cursor() as cr:
            conn = cr._cnx
            cr.execute("listen imbus")
            cr.commit()
            while True:
                if select.select([conn], [], [], TIMEOUT) == ([], [], []):
                    # nothing to read after TIMEOUT seconds, wait again
                    continue
                conn.poll()
                channels = []
                # each payload is the JSON list of notified channels
                while conn.notifies:
                    channels.extend(json.loads(conn.notifies.pop().payload))
                # dispatch to local threads/greenlets
                events = set()
                for c in channels:
                    events.update(self.channels.pop(hashable(c), []))
                for e in events:
                    e.set()

    def run(self):
        """Run ``loop()`` forever, restarting it ``TIMEOUT`` seconds after any error."""
        while True:
            try:
                self.loop()
            except Exception:
                _logger.exception("Bus.loop error, sleep and retry")
                time.sleep(TIMEOUT)

    def start(self):
        """Start the listener and return ``self``, or ``None`` in multi-process mode."""
        if openerp.evented:
            # gevent mode
            # import the submodule explicitly: a bare `import gevent` does not
            # guarantee that `gevent.event` has been loaded
            import gevent.event
            self.Event = gevent.event.Event
            gevent.spawn(self.run)
        elif openerp.multi_process:
            # disabled in prefork mode
            return
        else:
            # threaded mode
            self.Event = threading.Event
            t = threading.Thread(name="%s.Bus" % __name__, target=self.run)
            t.daemon = True
            t.start()
        return self
# Module-level dispatcher used by the controller below. start() returns
# nothing in multi-process (prefork) mode, so `dispatch` is None there.
dispatch = ImDispatch().start()
#----------------------------------------------------------
# Controller
#----------------------------------------------------------
class Controller(openerp.http.Controller):
    """ JSON routes used to send and to long-poll bus notifications.

    Examples:
    openerp.jsonRpc('/longpolling/poll','call',{"channels":["c1"],last:0}).then(function(r){console.log(r)});
    openerp.jsonRpc('/longpolling/send','call',{"channel":"c1","message":"m1"});
    openerp.jsonRpc('/longpolling/send','call',{"channel":"c2","message":"m2"});
    """

    @openerp.http.route('/longpolling/send', type="json", auth="public")
    def send(self, channel, message):
        """Send *message* on *channel*; only string channels are accepted.

        :raises Exception: when *channel* is not a string
        """
        if not isinstance(channel, basestring):
            raise Exception("bus.Bus only string channels are allowed.")
        registry, cr, uid = request.registry, request.cr, request.session.uid
        return registry['bus.bus'].sendone(cr, uid, channel, message)

    # override to add channels
    def _poll(self, dbname, channels, last, options):
        """Hand the request over to the dispatcher.

        The request cursor is closed first; the dispatcher opens its own
        cursors whenever it reads the bus.
        """
        request.cr.close()
        request._cr = None
        return dispatch.poll(dbname, channels, last)

    @openerp.http.route('/longpolling/poll', type="json", auth="public")
    def poll(self, channels, last, options=None):
        """Return the notifications of *channels* received after *last*.

        :raises Exception: when the dispatcher is not available, or when one
            of the channels is not a string
        """
        if options is None:
            options = {}
        if not dispatch:
            raise Exception("bus.Bus unavailable")
        if any(not isinstance(c, basestring) for c in channels):
            # logged instead of printed: a request handler must not write
            # debugging output to stdout
            _logger.warning("bus.Bus rejected non-string channels: %r", channels)
            raise Exception("bus.Bus only string channels are allowed.")
        return self._poll(request.db, channels, last, options)
# vim:et: