bitbake/PRservice: Added no_hist mode and export/import.

[YOCTO #1556]
1. Added the package_arch into the index to the DB table. Because the
change in PACKAGE_ARCH will results in different checksum, and it is
better to have seperate PR value domains for differnt PACKAGE_ARCH of
the same pakcage.

2. Changed the PR service to operate in no history mode. In this mode,
the for a given query tuple (version, pkgarch, checksum), the returned
value will be the largest among all the values of the same (version,
pkgarch). This means the PR value returned can NOT be decremented.

3. Added export function. For each (version, pkgarch) tuple, only the
record with the maximum value will be exported.

4. Added import function. The record will only be imported if the
imported value is larger than the value stored in the DB with the same
(version, pkgarch, checksum) tuple.

(Bitbake rev: 379567ee879dcdc09a51f7f1212bde1076147a6f)

Signed-off-by: Lianhao Lu <lianhao.lu@intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Lianhao Lu 2012-01-10 14:13:49 +08:00 committed by Richard Purdie
parent 4a8a3c503f
commit 30a9bc6c92
4 changed files with 263 additions and 80 deletions

View File

@ -16,31 +16,34 @@ PRPORT_DEFAULT=8585
def main(): def main():
parser = optparse.OptionParser( parser = optparse.OptionParser(
version="Bitbake PR Service Core version %s, %%prog version %s" % (prserv.__version__, __version__), version="Bitbake PR Service Core version %s, %%prog version %s" % (prserv.__version__, __version__),
usage = "%prog [options]") usage = "%prog < --start | --stop > [options]")
parser.add_option("-f", "--file", help="database filename(default prserv.db)", action="store", parser.add_option("-f", "--file", help="database filename(default: prserv.db)", action="store",
dest="dbfile", type="string", default="prserv.db") dest="dbfile", type="string", default="prserv.db")
parser.add_option("-l", "--log", help="log filename(default prserv.log)", action="store", parser.add_option("-l", "--log", help="log filename(default: prserv.log)", action="store",
dest="logfile", type="string", default="prserv.log") dest="logfile", type="string", default="prserv.log")
parser.add_option("--loglevel", help="logging level, i.e. CRITICAL, ERROR, WARNING, INFO, DEBUG", parser.add_option("--loglevel", help="logging level, i.e. CRITICAL, ERROR, WARNING, INFO, DEBUG",
action = "store", type="string", dest="loglevel", default = "WARNING") action = "store", type="string", dest="loglevel", default = "INFO")
parser.add_option("--start", help="start daemon", parser.add_option("--start", help="start daemon",
action="store_true", dest="start", default="True") action="store_true", dest="start")
parser.add_option("--stop", help="stop daemon", parser.add_option("--stop", help="stop daemon",
action="store_false", dest="start") action="store_true", dest="stop")
parser.add_option("--host", help="ip address to bind", action="store", parser.add_option("--host", help="ip address to bind", action="store",
dest="host", type="string", default=PRHOST_DEFAULT) dest="host", type="string", default=PRHOST_DEFAULT)
parser.add_option("--port", help="port number(default 8585)", action="store", parser.add_option("--port", help="port number(default: 8585)", action="store",
dest="port", type="int", default=PRPORT_DEFAULT) dest="port", type="int", default=PRPORT_DEFAULT)
options, args = parser.parse_args(sys.argv) options, args = parser.parse_args(sys.argv)
prserv.init_logger(os.path.abspath(options.logfile),options.loglevel) prserv.init_logger(os.path.abspath(options.logfile),options.loglevel)
if options.start: if options.start:
prserv.serv.start_daemon(options) ret=prserv.serv.start_daemon(dbfile=options.dbfile, interface=(options.host, options.port),
logfile=os.path.abspath(options.logfile))
elif options.stop:
ret=prserv.serv.stop_daemon(options.host, options.port)
else: else:
prserv.serv.stop_daemon() ret=parser.print_help()
return ret
if __name__ == "__main__": if __name__ == "__main__":
try: try:

View File

@ -7,5 +7,8 @@ def init_logger(logfile, loglevel):
numeric_level = getattr(logging, loglevel.upper(), None) numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int): if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % loglevel) raise ValueError('Invalid log level: %s' % loglevel)
logging.basicConfig(level=numeric_level, filename=logfile) FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=numeric_level, filename=logfile, format=FORMAT)
class NotFoundError(StandardError):
pass

View File

@ -1,9 +1,7 @@
import logging import logging
import os.path import os.path
import errno import errno
import sys import prserv
import warnings
import sqlite3
try: try:
import sqlite3 import sqlite3
@ -14,73 +12,220 @@ sqlversion = sqlite3.sqlite_version_info
if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3): if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3):
raise Exception("sqlite3 version 3.3.0 or later is required.") raise Exception("sqlite3 version 3.3.0 or later is required.")
class NotFoundError(StandardError):
pass
class PRTable(): class PRTable():
def __init__(self,cursor,table): def __init__(self, conn, table, nohist):
self.cursor = cursor self.conn = conn
self.table = table self.nohist = nohist
if nohist:
self.table = "%s_nohist" % table
else:
self.table = "%s_hist" % table
#create the table
self._execute("CREATE TABLE IF NOT EXISTS %s \ self._execute("CREATE TABLE IF NOT EXISTS %s \
(version TEXT NOT NULL, \ (version TEXT NOT NULL, \
pkgarch TEXT NOT NULL, \
checksum TEXT NOT NULL, \ checksum TEXT NOT NULL, \
value INTEGER, \ value INTEGER, \
PRIMARY KEY (version,checksum));" PRIMARY KEY (version, pkgarch, checksum));" % self.table)
% table)
def _execute(self, *query): def _execute(self, *query):
"""Execute a query, waiting to acquire a lock if necessary""" """Execute a query, waiting to acquire a lock if necessary"""
count = 0 count = 0
while True: while True:
try: try:
return self.cursor.execute(*query) return self.conn.execute(*query)
except sqlite3.OperationalError as exc: except sqlite3.OperationalError as exc:
if 'database is locked' in str(exc) and count < 500: if 'database is locked' in str(exc) and count < 500:
count = count + 1 count = count + 1
continue continue
raise raise exc
except sqlite3.IntegrityError as exc:
print "Integrity error %s" % str(exc)
break
def getValue(self, version, checksum): def _getValueHist(self, version, pkgarch, checksum):
data=self._execute("SELECT value FROM %s WHERE version=? AND checksum=?;" % self.table, data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
(version,checksum)) (version, pkgarch, checksum))
row=data.fetchone() row=data.fetchone()
if row != None: if row != None:
return row[0] return row[0]
else: else:
#no value found, try to insert #no value found, try to insert
self._execute("INSERT INTO %s VALUES (?, ?, (select ifnull(max(value)+1,0) from %s where version=?));" try:
self._execute("BEGIN")
self._execute("INSERT OR ROLLBACK INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));"
% (self.table,self.table), % (self.table,self.table),
(version,checksum,version)) (version,pkgarch, checksum,version, pkgarch))
data=self._execute("SELECT value FROM %s WHERE version=? AND checksum=?;" % self.table, self.conn.commit()
(version,checksum)) except sqlite3.IntegrityError as exc:
logging.error(str(exc))
data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
(version, pkgarch, checksum))
row=data.fetchone() row=data.fetchone()
if row != None: if row != None:
return row[0] return row[0]
else: else:
raise NotFoundError raise prserv.NotFoundError
def _getValueNohist(self, version, pkgarch, checksum):
data=self._execute("SELECT value FROM %s \
WHERE version=? AND pkgarch=? AND checksum=? AND \
value >= (select max(value) from %s where version=? AND pkgarch=?);"
% (self.table, self.table),
(version, pkgarch, checksum, version, pkgarch))
row=data.fetchone()
if row != None:
return row[0]
else:
#no value found, try to insert
try:
self._execute("BEGIN")
self._execute("INSERT OR REPLACE INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));"
% (self.table,self.table),
(version, pkgarch, checksum, version, pkgarch))
self.conn.commit()
except sqlite3.IntegrityError as exc:
logging.error(str(exc))
self.conn.rollback()
data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
(version, pkgarch, checksum))
row=data.fetchone()
if row != None:
return row[0]
else:
raise prserv.NotFoundError
def getValue(self, version, pkgarch, checksum):
if self.nohist:
return self._getValueNohist(version, pkgarch, checksum)
else:
return self._getValueHist(version, pkgarch, checksum)
def _importHist(self, version, pkgarch, checksum, value):
val = None
data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
(version, pkgarch, checksum))
row = data.fetchone()
if row != None:
val=row[0]
else:
#no value found, try to insert
try:
self._execute("BEGIN")
self._execute("INSERT OR ROLLBACK INTO %s VALUES (?, ?, ?, ?);" % (self.table),
(version, pkgarch, checksum, value))
self.conn.commit()
except sqlite3.IntegrityError as exc:
logging.error(str(exc))
data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
(version, pkgarch, checksum))
row = data.fetchone()
if row != None:
val = row[0]
return val
def _importNohist(self, version, pkgarch, checksum, value):
try:
#try to insert
self._execute("BEGIN")
self._execute("INSERT OR ROLLBACK INTO %s VALUES (?, ?, ?, ?);" % (self.table),
(version, pkgarch, checksum,value))
self.conn.commit()
except sqlite3.IntegrityError as exc:
#already have the record, try to update
try:
self._execute("BEGIN")
self._execute("UPDATE %s SET value=? WHERE version=? AND pkgarch=? AND checksum=? AND value<?"
% (self.table),
(value,version,pkgarch,checksum,value))
self.conn.commit()
except sqlite3.IntegrityError as exc:
logging.error(str(exc))
data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=? AND value>=?;" % self.table,
(version,pkgarch,checksum,value))
row=data.fetchone()
if row != None:
return row[0]
else:
return None
def importone(self, version, pkgarch, checksum, value):
if self.nohist:
return self._importNohist(version, pkgarch, checksum, value)
else:
return self._importHist(version, pkgarch, checksum, value)
def export(self, version, pkgarch, checksum, colinfo):
metainfo = {}
#column info
if colinfo:
metainfo['tbl_name'] = self.table
metainfo['core_ver'] = prserv.__version__
metainfo['col_info'] = []
data = self._execute("PRAGMA table_info(%s);" % self.table)
for row in data:
col = {}
col['name'] = row['name']
col['type'] = row['type']
col['notnull'] = row['notnull']
col['dflt_value'] = row['dflt_value']
col['pk'] = row['pk']
metainfo['col_info'].append(col)
#data info
datainfo = []
if self.nohist:
sqlstmt = "SELECT T1.version, T1.pkgarch, T1.checksum, T1.value FROM %s as T1, \
(SELECT version,pkgarch,max(value) as maxvalue FROM %s GROUP BY version,pkgarch) as T2 \
WHERE T1.version=T2.version AND T1.pkgarch=T2.pkgarch AND T1.value=T2.maxvalue " % (self.table, self.table)
else:
sqlstmt = "SELECT * FROM %s as T1 WHERE 1=1 " % self.table
sqlarg = []
where = ""
if version:
where += "AND T1.version=? "
sqlarg.append(str(version))
if pkgarch:
where += "AND T1.pkgarch=? "
sqlarg.append(str(pkgarch))
if checksum:
where += "AND T1.checksum=? "
sqlarg.append(str(checksum))
sqlstmt += where + ";"
if len(sqlarg):
data = self._execute(sqlstmt, tuple(sqlarg))
else:
data = self._execute(sqlstmt)
for row in data:
if row['version']:
col = {}
col['version'] = row['version']
col['pkgarch'] = row['pkgarch']
col['checksum'] = row['checksum']
col['value'] = row['value']
datainfo.append(col)
return (metainfo, datainfo)
class PRData(object): class PRData(object):
"""Object representing the PR database""" """Object representing the PR database"""
def __init__(self, filename): def __init__(self, filename, nohist=True):
self.filename=os.path.abspath(filename) self.filename=os.path.abspath(filename)
self.nohist=nohist
#build directory hierarchy #build directory hierarchy
try: try:
os.makedirs(os.path.dirname(self.filename)) os.makedirs(os.path.dirname(self.filename))
except OSError as e: except OSError as e:
if e.errno != errno.EEXIST: if e.errno != errno.EEXIST:
raise e raise e
self.connection=sqlite3.connect(self.filename, timeout=5, self.connection=sqlite3.connect(self.filename, isolation_level="DEFERRED")
isolation_level=None) self.connection.row_factory=sqlite3.Row
self.cursor=self.connection.cursor()
self._tables={} self._tables={}
def __del__(self): def __del__(self):
print "PRData: closing DB %s" % self.filename
self.connection.close() self.connection.close()
def __getitem__(self,tblname): def __getitem__(self,tblname):
@ -90,11 +235,11 @@ class PRData(object):
if tblname in self._tables: if tblname in self._tables:
return self._tables[tblname] return self._tables[tblname]
else: else:
tableobj = self._tables[tblname] = PRTable(self.cursor, tblname) tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist)
return tableobj return tableobj
def __delitem__(self, tblname): def __delitem__(self, tblname):
if tblname in self._tables: if tblname in self._tables:
del self._tables[tblname] del self._tables[tblname]
logging.info("drop table %s" % (tblname)) logging.info("drop table %s" % (tblname))
self.cursor.execute("DROP TABLE IF EXISTS %s;" % tblname) self.connection.execute("DROP TABLE IF EXISTS %s;" % tblname)

View File

@ -21,6 +21,8 @@ class Handler(SimpleXMLRPCRequestHandler):
raise raise
return value return value
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
class PRServer(SimpleXMLRPCServer): class PRServer(SimpleXMLRPCServer):
pidfile="/tmp/PRServer.pid" pidfile="/tmp/PRServer.pid"
def __init__(self, dbfile, logfile, interface, daemon=True): def __init__(self, dbfile, logfile, interface, daemon=True):
@ -34,20 +36,33 @@ class PRServer(SimpleXMLRPCServer):
self.host, self.port = self.socket.getsockname() self.host, self.port = self.socket.getsockname()
self.db=prserv.db.PRData(dbfile) self.db=prserv.db.PRData(dbfile)
self.table=self.db["PRMAIN"] self.table=self.db["PRMAIN"]
self.pidfile=PIDPREFIX % interface
self.register_function(self.getPR, "getPR") self.register_function(self.getPR, "getPR")
self.register_function(self.quit, "quit") self.register_function(self.quit, "quit")
self.register_function(self.ping, "ping") self.register_function(self.ping, "ping")
self.register_function(self.export, "export")
self.register_function(self.importone, "importone")
self.register_introspection_functions() self.register_introspection_functions()
def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
try:
return self.table.export(version, pkgarch, checksum, colinfo)
except sqlite3.Error as exc:
logging.error(str(exc))
return None
def importone(self, version, pkgarch, checksum, value):
return self.table.importone(version, pkgarch, checksum, value)
def ping(self): def ping(self):
return not self.quit return not self.quit
def getPR(self, version, checksum): def getPR(self, version, pkgarch, checksum):
try: try:
return self.table.getValue(version,checksum) return self.table.getValue(version, pkgarch, checksum)
except prserv.NotFoundError: except prserv.NotFoundError:
logging.error("can not find value for (%s, %s)",version,checksum) logging.error("can not find value for (%s, %s)",version, checksum)
return None return None
except sqlite3.Error as exc: except sqlite3.Error as exc:
logging.error(str(exc)) logging.error(str(exc))
@ -69,28 +84,34 @@ class PRServer(SimpleXMLRPCServer):
def start(self): def start(self):
if self.daemon is True: if self.daemon is True:
logging.info("PRServer: starting daemon...") logging.info("PRServer: try to start daemon...")
self.daemonize() self.daemonize()
else: else:
logging.info("PRServer: starting...") atexit.register(self.delpid)
pid = str(os.getpid())
pf = file(self.pidfile, 'w+')
pf.write("%s\n" % pid)
pf.write("%s\n" % self.host)
pf.write("%s\n" % self.port)
pf.close()
logging.info("PRServer: start success! DBfile: %s, IP: %s, PORT: %d" %
(self.dbfile, self.host, self.port))
self._serve_forever() self._serve_forever()
def delpid(self): def delpid(self):
os.remove(PRServer.pidfile) os.remove(self.pidfile)
def daemonize(self): def daemonize(self):
""" """
See Advanced Programming in the UNIX, Sec 13.3 See Advanced Programming in the UNIX, Sec 13.3
""" """
os.umask(0)
try: try:
pid = os.fork() pid = os.fork()
if pid > 0: if pid > 0:
sys.exit(0) #parent return instead of exit to give control
return
except OSError as e: except OSError as e:
sys.stderr.write("1st fork failed: %d %s\n" % (e.errno, e.strerror)) raise Exception("%s [%d]" % (e.strerror, e.errno))
sys.exit(1)
os.setsid() os.setsid()
""" """
@ -102,9 +123,9 @@ class PRServer(SimpleXMLRPCServer):
if pid > 0: #parent if pid > 0: #parent
sys.exit(0) sys.exit(0)
except OSError as e: except OSError as e:
sys.stderr.write("2nd fork failed: %d %s\n" % (e.errno, e.strerror)) raise Exception("%s [%d]" % (e.strerror, e.errno))
sys.exit(1)
os.umask(0)
os.chdir("/") os.chdir("/")
sys.stdout.flush() sys.stdout.flush()
@ -119,13 +140,15 @@ class PRServer(SimpleXMLRPCServer):
# write pidfile # write pidfile
atexit.register(self.delpid) atexit.register(self.delpid)
pid = str(os.getpid()) pid = str(os.getpid())
pf = file(PRServer.pidfile, 'w+') pf = file(self.pidfile, 'w')
pf.write("%s\n" % pid) pf.write("%s\n" % pid)
pf.write("%s\n" % self.host)
pf.write("%s\n" % self.port)
pf.close() pf.close()
logging.info("PRServer: starting daemon success! DBfile: %s, IP: %s, PORT: %s, PID: %s" %
(self.dbfile, self.host, self.port, pid))
self._serve_forever() self._serve_forever()
exit(0)
class PRServerConnection(): class PRServerConnection():
def __init__(self, host, port): def __init__(self, host, port):
@ -139,16 +162,22 @@ class PRServerConnection():
socket.setdefaulttimeout(2) socket.setdefaulttimeout(2)
try: try:
self.connection.quit() self.connection.quit()
except: except Exception as exc:
pass sys.stderr.write("%s\n" % str(exc))
def getPR(self, version, checksum): def getPR(self, version, pkgarch, checksum):
return self.connection.getPR(version, checksum) return self.connection.getPR(version, pkgarch, checksum)
def ping(self): def ping(self):
return self.connection.ping() return self.connection.ping()
def start_daemon(options): def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
return self.connection.export(version, pkgarch, checksum, colinfo)
def importone(self, version, pkgarch, checksum, value):
return self.connection.importone(version, pkgarch, checksum, value)
def start_daemon(dbfile, logfile, interface):
try: try:
pf = file(PRServer.pidfile,'r') pf = file(PRServer.pidfile,'r')
pid = int(pf.readline().strip()) pid = int(pf.readline().strip())
@ -159,40 +188,43 @@ def start_daemon(options):
if pid: if pid:
sys.stderr.write("pidfile %s already exist. Daemon already running?\n" sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
% PRServer.pidfile) % PRServer.pidfile)
sys.exit(1) return 1
server = PRServer(options.dbfile, interface=(options.host, options.port), server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), interface)
logfile=os.path.abspath(options.logfile))
server.start() server.start()
return 0
def stop_daemon(): def stop_daemon(host, port):
pidfile = PIDPREFIX % (host, port)
try: try:
pf = file(PRServer.pidfile,'r') pf = file(pidfile,'r')
pid = int(pf.readline().strip()) pid = int(pf.readline().strip())
host = pf.readline().strip()
port = int(pf.readline().strip())
pf.close() pf.close()
except IOError: except IOError:
pid = None pid = None
if not pid: if not pid:
sys.stderr.write("pidfile %s does not exist. Daemon not running?\n" sys.stderr.write("pidfile %s does not exist. Daemon not running?\n"
% PRServer.pidfile) % pidfile)
sys.exit(1) return 1
PRServerConnection(host,port).terminate() PRServerConnection(host, port).terminate()
time.sleep(0.5) time.sleep(0.5)
try: try:
while 1: while 1:
os.kill(pid,signal.SIGTERM) os.kill(pid,signal.SIGTERM)
time.sleep(0.1) time.sleep(0.1)
except OSError as err: except OSError as e:
err = str(err) err = str(e)
if err.find("No such process") > 0: if err.find("No such process") > 0:
if os.path.exists(PRServer.pidfile): if os.path.exists(PRServer.pidfile):
os.remove(PRServer.pidfile) os.remove(PRServer.pidfile)
else: else:
print err raise Exception("%s [%d]" % (e.strerror, e.errno))
sys.exit(1)
return 0
def ping(host, port):
print PRServerConnection(host,port).ping()
return 0