[MERGE] merged long-polling branch:
When using --workers, an evented (for longpolling) worker is started. When passing --gevent, all the server is evented but the cron is disabled. bzr revid: vmt@openerp.com-20130218153225-w1yft9je0b15owdv
This commit is contained in:
commit
3ee1923ca3
|
@ -230,6 +230,14 @@ def main(args):
|
|||
check_root_user()
|
||||
openerp.tools.config.parse_config(args)
|
||||
|
||||
if openerp.tools.config.options["gevent"]:
|
||||
openerp.evented = True
|
||||
_logger.info('Using gevent mode')
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
|
||||
check_postgres_user()
|
||||
openerp.netsvc.init_logger()
|
||||
report_configuration()
|
||||
|
|
|
@ -31,6 +31,7 @@ import time
|
|||
import cron
|
||||
import wsgi_server
|
||||
|
||||
import openerp
|
||||
import openerp.modules
|
||||
import openerp.netsvc
|
||||
import openerp.osv
|
||||
|
@ -85,12 +86,14 @@ def start_services():
|
|||
# Start the WSGI server.
|
||||
wsgi_server.start_service()
|
||||
# Start the main cron thread.
|
||||
cron.start_service()
|
||||
if not openerp.evented:
|
||||
cron.start_service()
|
||||
|
||||
def stop_services():
|
||||
""" Stop all services. """
|
||||
# stop services
|
||||
cron.stop_service()
|
||||
if not openerp.evented:
|
||||
cron.stop_service()
|
||||
wsgi_server.stop_service()
|
||||
|
||||
_logger.info("Initiating shutdown")
|
||||
|
|
|
@ -35,6 +35,7 @@ class Multicorn(object):
|
|||
def __init__(self, app):
|
||||
# config
|
||||
self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
|
||||
self.long_polling_address = (config['xmlrpc_interface'] or '0.0.0.0', config['longpolling_port'])
|
||||
self.population = config['workers']
|
||||
self.timeout = config['limit_time_real']
|
||||
self.limit_request = config['limit_request']
|
||||
|
@ -45,6 +46,7 @@ class Multicorn(object):
|
|||
self.socket = None
|
||||
self.workers_http = {}
|
||||
self.workers_cron = {}
|
||||
self.workers_longpolling = {}
|
||||
self.workers = {}
|
||||
self.generation = 0
|
||||
self.queue = []
|
||||
|
@ -131,7 +133,8 @@ class Multicorn(object):
|
|||
def process_timeout(self):
|
||||
now = time.time()
|
||||
for (pid, worker) in self.workers.items():
|
||||
if now - worker.watchdog_time >= worker.watchdog_timeout:
|
||||
if (worker.watchdog_timeout is not None) and \
|
||||
(now - worker.watchdog_time >= worker.watchdog_timeout):
|
||||
_logger.error("Worker (%s) timeout", pid)
|
||||
self.worker_kill(pid, signal.SIGKILL)
|
||||
|
||||
|
@ -140,6 +143,8 @@ class Multicorn(object):
|
|||
self.worker_spawn(WorkerHTTP, self.workers_http)
|
||||
while len(self.workers_cron) < config['max_cron_threads']:
|
||||
self.worker_spawn(WorkerCron, self.workers_cron)
|
||||
while len(self.workers_longpolling) < 1:
|
||||
self.worker_spawn(WorkerLongPolling, self.workers_longpolling)
|
||||
|
||||
def sleep(self):
|
||||
try:
|
||||
|
@ -178,6 +183,12 @@ class Multicorn(object):
|
|||
self.socket.setblocking(0)
|
||||
self.socket.bind(self.address)
|
||||
self.socket.listen(8)
|
||||
# long polling socket
|
||||
self.long_polling_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.long_polling_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.long_polling_socket.setblocking(0)
|
||||
self.long_polling_socket.bind(self.long_polling_address)
|
||||
self.long_polling_socket.listen(8)
|
||||
|
||||
def stop(self, graceful=True):
|
||||
if graceful:
|
||||
|
@ -221,6 +232,7 @@ class Worker(object):
|
|||
self.multi = multi
|
||||
self.watchdog_time = time.time()
|
||||
self.watchdog_pipe = multi.pipe_new()
|
||||
# Can be set to None if no watchdog is desired.
|
||||
self.watchdog_timeout = multi.timeout
|
||||
self.ppid = os.getpid()
|
||||
self.pid = None
|
||||
|
@ -340,6 +352,26 @@ class WorkerHTTP(Worker):
|
|||
Worker.start(self)
|
||||
self.server = WorkerBaseWSGIServer(self.multi.app)
|
||||
|
||||
class WorkerLongPolling(Worker):
|
||||
""" Long polling workers """
|
||||
def __init__(self, multi):
|
||||
super(WorkerLongPolling, self).__init__(multi)
|
||||
# Disable the watchdog feature for this kind of worker.
|
||||
self.watchdog_timeout = None
|
||||
|
||||
def start(self):
|
||||
openerp.evented = True
|
||||
_logger.info('Using gevent mode')
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
|
||||
Worker.start(self)
|
||||
from gevent.wsgi import WSGIServer
|
||||
self.server = WSGIServer(self.multi.long_polling_socket, self.multi.app)
|
||||
self.server.serve_forever()
|
||||
|
||||
class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
|
||||
""" werkzeug WSGI Server patched to allow using an external listen socket
|
||||
"""
|
||||
|
|
|
@ -426,7 +426,11 @@ def serve(interface, port, threaded):
|
|||
"""
|
||||
|
||||
global httpd
|
||||
httpd = werkzeug.serving.make_server(interface, port, application, threaded=threaded)
|
||||
if not openerp.evented:
|
||||
httpd = werkzeug.serving.make_server(interface, port, application, threaded=threaded)
|
||||
else:
|
||||
from gevent.wsgi import WSGIServer
|
||||
httpd = WSGIServer((interface, port), application)
|
||||
httpd.serve_forever()
|
||||
|
||||
def start_service():
|
||||
|
@ -446,8 +450,13 @@ def stop_service():
|
|||
The server is supposed to have been started by start_server() above.
|
||||
"""
|
||||
if httpd:
|
||||
httpd.shutdown()
|
||||
close_socket(httpd.socket)
|
||||
if not openerp.evented:
|
||||
httpd.shutdown()
|
||||
close_socket(httpd.socket)
|
||||
else:
|
||||
import gevent
|
||||
httpd.stop()
|
||||
gevent.shutdown()
|
||||
|
||||
def close_socket(sock):
|
||||
""" Closes a socket instance cleanly
|
||||
|
|
|
@ -107,6 +107,7 @@ class configmanager(object):
|
|||
help="specify additional addons paths (separated by commas).",
|
||||
action="callback", callback=self._check_addons_path, nargs=1, type="string")
|
||||
group.add_option("--load", dest="server_wide_modules", help="Comma-separated list of server-wide modules default=web")
|
||||
group.add_option("--gevent", dest="gevent", action="store_true", my_default=False, help="Activate the GEvent mode, this also desactivate the cron.")
|
||||
parser.add_option_group(group)
|
||||
|
||||
# XML-RPC / HTTP
|
||||
|
@ -119,6 +120,8 @@ class configmanager(object):
|
|||
help="disable the XML-RPC protocol")
|
||||
group.add_option("--proxy-mode", dest="proxy_mode", action="store_true", my_default=False,
|
||||
help="Enable correct behavior when behind a reverse proxy")
|
||||
group.add_option("--longpolling-port", dest="longpolling_port", my_default=8072,
|
||||
help="specify the TCP port for longpolling requests", type="int")
|
||||
parser.add_option_group(group)
|
||||
|
||||
# XML-RPC / HTTPS
|
||||
|
@ -376,7 +379,8 @@ class configmanager(object):
|
|||
self.options['pidfile'] = False
|
||||
|
||||
# if defined dont take the configfile value even if the defined value is None
|
||||
keys = ['xmlrpc_interface', 'xmlrpc_port', 'db_name', 'db_user', 'db_password', 'db_host',
|
||||
keys = ['xmlrpc_interface', 'xmlrpc_port', 'longpolling_port',
|
||||
'db_name', 'db_user', 'db_password', 'db_host',
|
||||
'db_port', 'db_template', 'logfile', 'pidfile', 'smtp_port',
|
||||
'email_from', 'smtp_server', 'smtp_user', 'smtp_password',
|
||||
'netrpc_interface', 'netrpc_port', 'db_maxconn', 'import_partial', 'addons_path',
|
||||
|
@ -407,7 +411,7 @@ class configmanager(object):
|
|||
'list_db', 'xmlrpcs', 'proxy_mode',
|
||||
'test_file', 'test_enable', 'test_commit', 'test_report_directory',
|
||||
'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent',
|
||||
'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request'
|
||||
'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request', 'gevent'
|
||||
]
|
||||
|
||||
for arg in keys:
|
||||
|
|
Loading…
Reference in New Issue