bitbake: process: Improve exit handling and hangs

It turns out we have a number of different ways the process server termination can
hang. If we call cancel_join_thread() on the event queue, it means that it can be left
containing partial data. This means the reading of the event queue in the terminate()
function can hang, the timeout and block parameters to Queue.get() don't make any
difference.

Equally, if we don't call cancel_join_thread(), the join_thread in terminate()
will hang giving a different deadlock.

The best solution I could find is to loop over the process is_alive() after requesting
it stops,  trying to join the thread and if that fails, try and flush the event
queue again.

It wasn't clear what difference a force option should make in this case, we're
gracefully trying to empty queues and shut down regardless of whether its a SIGTERM
so I've simply removed the force option.

(Bitbake rev: c5c8f33ca4b81877a0115887849881001b745bf0)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2013-08-24 11:56:31 +00:00
parent aa0b237144
commit a03a423c60
1 changed files with 17 additions and 19 deletions

View File

@ -105,7 +105,7 @@ class ProcessServer(Process, BaseImplServer):
except Exception:
logger.exception('Running command %s', command)
self.event_queue.cancel_join_thread()
self.event_queue.close()
bb.event.unregister_UIHhandler(self.event_handle)
self.command_channel.close()
self.cooker.stop()
@ -150,27 +150,25 @@ class BitBakeProcessServerConnection(BitBakeBaseServerConnection):
self.connection = ServerCommunicator(self.ui_channel)
self.events = self.event_queue
def terminate(self, force = False):
def terminate(self):
def flushevents():
while True:
try:
event = self.event_queue.get(block=False)
except (Empty, IOError):
break
if isinstance(event, logging.LogRecord):
logger.handle(event)
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.procserver.stop()
if force:
self.procserver.join(0.5)
if self.procserver.is_alive():
self.procserver.terminate()
self.procserver.join()
else:
self.procserver.join()
while True:
try:
event = self.event_queue.get(block=False)
except (Empty, IOError):
break
if isinstance(event, logging.LogRecord):
logger.handle(event)
while self.procserver.is_alive():
flushevents()
self.procserver.join(0.1)
self.ui_channel.close()
self.event_queue.close()
if force:
sys.exit(1)
# Wrap Queue to provide API which isn't server implementation specific
class ProcessEventQueue(multiprocessing.queues.Queue):
@ -203,5 +201,5 @@ class BitBakeServer(BitBakeBaseServer):
def establishConnection(self):
self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue)
signal.signal(signal.SIGTERM, lambda i, s: self.connection.terminate(force=True))
signal.signal(signal.SIGTERM, lambda i, s: self.connection.terminate())
return self.connection