bitbake: server/process, server/xmlrpc, runqueue: Use select.select() on fds, not time.sleep()

The existing backend server implementations were inefficient since they
were sleeping for the full length of the timeouts rather than being woken when
there was data ready for them. It was assumed they would wake, and perhaps they
did when we forked processes directly, but that is no longer the case.

This updates both the process and xmlrpc backends to wait using select(). This
does mean we need to pass in the file descriptors to wait on from the internals
that know which file descriptors these are, but this is a logical improvement.

Tests of a pathological load on the process server of ~420 rapid tasks
executed on a server with BB_NUMBER_THREADS=48 went from a wall clock
measurement of the overall command execution time of 75s to a much more
reasonable 24s.

(Bitbake rev: 9bee497960889d9baa0a4284d79a384b18a8e826)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2013-08-31 23:40:55 +01:00
parent d63e6a925a
commit b306d7d9a4
3 changed files with 28 additions and 21 deletions

View File

@ -895,6 +895,14 @@ class RunQueue:
if self.fakeworkerpipe: if self.fakeworkerpipe:
self.fakeworkerpipe.read() self.fakeworkerpipe.read()
def active_fds(self):
fds = []
if self.workerpipe:
fds.append(self.workerpipe.input)
if self.fakeworkerpipe:
fds.append(self.fakeworkerpipe.input)
return fds
def check_stamp_task(self, task, taskname = None, recurse = False, cache = None): def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
def get_timestamp(f): def get_timestamp(f):
try: try:
@ -972,7 +980,7 @@ class RunQueue:
(if the abort on failure configuration option isn't set) (if the abort on failure configuration option isn't set)
""" """
retval = 0.5 retval = True
if self.state is runQueuePrepare: if self.state is runQueuePrepare:
self.rqexe = RunQueueExecuteDummy(self) self.rqexe = RunQueueExecuteDummy(self)
@ -1375,7 +1383,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
if self.stats.active > 0: if self.stats.active > 0:
self.rq.read_workers() self.rq.read_workers()
return 0.5 return self.rq.active_fds()
if len(self.failed_fnids) != 0: if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed self.rq.state = runQueueFailed

View File

@ -29,6 +29,7 @@ import os
import signal import signal
import sys import sys
import time import time
import select
from Queue import Empty from Queue import Empty
from multiprocessing import Event, Process, util, Queue, Pipe, queues from multiprocessing import Event, Process, util, Queue, Pipe, queues
@ -105,7 +106,7 @@ class ProcessServer(Process, BaseImplServer):
command = self.command_channel.recv() command = self.command_channel.recv()
self.runCommand(command) self.runCommand(command)
self.idle_commands(.1) self.idle_commands(.1, [self.event_queue._reader, self.command_channel])
except Exception: except Exception:
logger.exception('Running command %s', command) logger.exception('Running command %s', command)
@ -115,7 +116,7 @@ class ProcessServer(Process, BaseImplServer):
self.cooker.stop() self.cooker.stop()
self.idle_commands(.1) self.idle_commands(.1)
def idle_commands(self, delay): def idle_commands(self, delay, fds = []):
nextsleep = delay nextsleep = delay
for function, data in self._idlefuns.items(): for function, data in self._idlefuns.items():
@ -127,15 +128,15 @@ class ProcessServer(Process, BaseImplServer):
nextsleep = None nextsleep = None
elif nextsleep is None: elif nextsleep is None:
continue continue
elif retval < nextsleep: else:
nextsleep = retval fds = fds + retval
except SystemExit: except SystemExit:
raise raise
except Exception: except Exception:
logger.exception('Running idle function') logger.exception('Running idle function')
if nextsleep is not None: if nextsleep is not None:
time.sleep(nextsleep) select.select(fds,[],[],nextsleep)
def runCommand(self, command): def runCommand(self, command):
""" """

View File

@ -264,12 +264,9 @@ class XMLRPCServer(SimpleXMLRPCServer, BaseImplServer):
Serve Requests. Overloaded to honor a quit command Serve Requests. Overloaded to honor a quit command
""" """
self.quit = False self.quit = False
self.timeout = 0 # Run Idle calls for our first callback
while not self.quit: while not self.quit:
#print "Idle queue length %s" % len(self._idlefuns) fds = [self]
self.handle_request() nextsleep = 0.1
#print "Idle timeout, running idle functions"
nextsleep = None
for function, data in self._idlefuns.items(): for function, data in self._idlefuns.items():
try: try:
retval = function(self, data, False) retval = function(self, data, False)
@ -277,21 +274,22 @@ class XMLRPCServer(SimpleXMLRPCServer, BaseImplServer):
del self._idlefuns[function] del self._idlefuns[function]
elif retval is True: elif retval is True:
nextsleep = 0 nextsleep = 0
elif nextsleep is 0: else:
continue fds = fds + retval
elif nextsleep is None:
nextsleep = retval
elif retval < nextsleep:
nextsleep = retval
except SystemExit: except SystemExit:
raise raise
except: except:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
pass pass
if nextsleep is None and len(self._idlefuns) > 0:
nextsleep = 0 socktimeout = self.socket.gettimeout() or nextsleep
self.timeout = nextsleep socktimeout = min(socktimeout, nextsleep)
# Mirror what BaseServer handle_request would do
fd_sets = select.select(fds, [], [], socktimeout)
if fd_sets[0] and self in fd_sets[0]:
self._handle_request_noblock()
# Tell idle functions we're exiting # Tell idle functions we're exiting
for function, data in self._idlefuns.items(): for function, data in self._idlefuns.items():
try: try: