[MERGE] run unit tests in addition to YAML tests.

bzr revid: vmt@openerp.com-20120305141355-djmbojnntj583ujd
This commit is contained in:
Vo Minh Thu 2012-03-05 15:13:55 +01:00
commit 7e3d080aa2
20 changed files with 531 additions and 295 deletions

View File

@ -1 +1,8 @@
OpenERP Server
''''''''''''''
.. toctree::
:maxdepth: 1
test-framework

100
doc/test-framework.rst Normal file
View File

@ -0,0 +1,100 @@
.. _test-framework:
Test framework
==============
In addition to the YAML-based tests, OpenERP uses the unittest2_ testing
framework to test both the core ``openerp`` package and its addons. For the
core and each addons, tests are divided between three (overlapping) sets:
1. A test suite that comprises all the tests that can be run right after the
addons is installed (or, for the core, right after a database is created).
That suite is called ``fast_suite`` and must contain only tests that can be run
frequently. Actually most of the tests should be considered fast enough to be
included in that ``fast_suite`` list and only tests that take a long time to run
(e.g. more than a minute) should not be listed. Those long tests should come up
pretty rarely.
2. A test suite called ``checks`` provides sanity checks. These tests are
invariants that must be full-filled at any time. They are expected to always
pass: obviously they must pass right after the module is installed (i.e. just
like the ``fast_suite`` tests), but they must also pass after any other module is
installed, after a migration, or even after the database was put in production
for a few months.
3. The third suite is made of all the tests: those provided by the two above
suites, but also tests that are not explicitely listed in ``fast_suite`` or
``checks``. They are not explicitely listed anywhere and are discovered
automatically.
As the sanity checks provide stronger guarantees about the code and database
structure, new tests must be added to the ``checks`` suite whenever it is
possible. Said with other words: one should try to avoid writing tests that
assume a freshly installed/unaltered module or database.
It is possible to have tests that are not listed in ``fast_suite`` or
``checks``. This is useful if a test takes a lot of time. By default, when
using the testing infrastructure, tests should run fast enough so that people
can use them frequently. One can also use that possiblity for tests that
require some complex setup before they can be successfuly run.
As a rule of thumb when writing a new test, try to add it to the ``checks``
suite. If it really needs that the module it belongs to is freshly installed,
add it to ``fast_suite``. Finally, if it can not be run in an acceptable time
frame, don't add it to any explicit list.
Writing tests
-------------
The tests must be developed under ``<addons-name>.tests`` (or ``openerp.tests``
for the core). For instance, with respect to the tests, a module ``foo``
should be organized as follow::
foo/
__init__.py # does not import .tests
tests/
__init__.py # import some of the tests sub-modules, and
# list them in fast_suite or checks
test_bar.py # contains unittest2 classes
test_baz.py # idem
... and so on ...
The two explicit lists of tests are thus the variables ``foo.tests.fast_suite``
and ``foo.tests.checks``. As an example, you can take a look at the
``openerp.tests`` module (which follows exactly the same conventions even if it
is not an addons).
Note that the ``fast_suite`` and ``checks`` variables are really lists of
module objects. They could be directly unittest2 suite objects if necessary in
the future.
Running the tests
-----------------
To run the tests (see :ref:`above <test-framework>` to learn how tests are
organized), the simplest way is to use the ``oe`` command (provided by the
``openerp-command`` project).
::
> oe run-tests # will run all the fast_suite tests
> oe run-tests -m openerp # will run all the fast_suite tests defined in `openerp.tests`
> oe run-tests -m sale # will run all the fast_suite tests defined in `openerp.addons.sale.tests`
> oe run-tests -m foo.test_bar # will run the tests defined in `openerp.addons.foo.tests.test_bar`
In addition to the above possibilities, when invoked with a non-existing module
(or module.sub-module) name, oe will reply with a list of available test
sub-modules.
Depending on the unittest2_ class that is used to write the tests (see
``openerp.tests.common`` for some helper classes that you can re-use), a database
may be created before the test is run, and the module providing the test will
be installed on that database.
Because creating a database, installing modules, and then dropping it is
expensive, it is possible to interleave the run of the ``fast_suite`` tests
with the initialization of a new database: the dabase is created, and after
each requested module is installed, its fast_suite tests are run. The database
is thus created and dropped (and the modules installed) only once.
.. _unittest2: http://pypi.python.org/pypi/unittest2

View File

@ -91,7 +91,6 @@
'test/bug_lp541545.xml',
'test/test_osv_expression.yml',
'test/test_ir_rule.yml', # <-- These tests modify/add/delete ir_rules.
'test/test_ir_values.yml',
# Commented because this takes some time.
# This must be (un)commented with the corresponding import statement
# in test/__init__.py.

View File

@ -1,87 +0,0 @@
-
Create some default value for some (non-existing) model, for all users.
-
!python {model: ir.values }: |
# use the old API
self.set(cr, uid, 'default', False, 'my_test_field',['unexisting_model'], 'global value')
# use the new API
self.set_default(cr, uid, 'other_unexisting_model', 'my_other_test_field', 'conditional value', condition='foo=bar')
-
Retrieve them.
-
!python {model: ir.values }: |
# d is a list of triplets (id, name, value)
# Old API
d = self.get(cr, uid, 'default', False, ['unexisting_model'])
assert len(d) == 1, "Only one single value should be retrieved for this model"
assert d[0][1] == 'my_test_field', "Can't retrieve the created default value. (1)"
assert d[0][2] == 'global value', "Can't retrieve the created default value. (2)"
# New API, Conditional version
d = self.get_defaults(cr, uid, 'other_unexisting_model')
assert len(d) == 0, "No value should be retrieved, the condition is not met"
d = self.get_defaults(cr, uid, 'other_unexisting_model', condition="foo=eggs")
assert len(d) == 0, 'Condition is not met either, no defaults should be returned'
d = self.get_defaults(cr, uid, 'other_unexisting_model', condition="foo=bar")
assert len(d) == 1, "Only one single value should be retrieved"
assert d[0][1] == 'my_other_test_field', "Can't retrieve the created default value. (5)"
assert d[0][2] == 'conditional value', "Can't retrieve the created default value. (6)"
-
Do it again but for a specific user.
-
!python {model: ir.values }: |
self.set(cr, uid, 'default', False, 'my_test_field',['unexisting_model'], 'specific value', preserve_user=True)
-
Retrieve it and check it is the one for the current user.
-
!python {model: ir.values }: |
d = self.get(cr, uid, 'default', False, ['unexisting_model'])
assert len(d) == 1, "Only one default must be returned per field"
assert d[0][1] == 'my_test_field', "Can't retrieve the created default value."
assert d[0][2] == 'specific value', "Can't retrieve the created default value."
-
Create some action bindings for a non-existing model
-
!python {model: ir.values }: |
self.set(cr, uid, 'action', 'tree_but_open', 'OnDblClick Action', ['unexisting_model'], 'ir.actions.act_window,10', isobject=True)
self.set(cr, uid, 'action', 'tree_but_open', 'OnDblClick Action 2', ['unexisting_model'], 'ir.actions.act_window,11', isobject=True)
self.set(cr, uid, 'action', 'client_action_multi', 'Side Wizard', ['unexisting_model'], 'ir.actions.act_window,12', isobject=True)
self.set(cr, uid, 'action', 'client_print_multi', 'Nice Report', ['unexisting_model'], 'ir.actions.report.xml,2', isobject=True)
self.set(cr, uid, 'action', 'client_action_relate', 'Related Stuff', ['unexisting_model'], 'ir.actions.act_window,14', isobject=True)
-
Replace one action binding to set a new name
-
!python {model: ir.values }: |
self.set(cr, uid, 'action', 'tree_but_open', 'OnDblClick Action New', ['unexisting_model'], 'ir.actions.act_window,10', isobject=True)
-
Retrieve the action bindings and check they're correct
-
!python {model: ir.values }: |
actions = self.get(cr, uid, 'action', 'tree_but_open', ['unexisting_model'])
assert len(actions) == 2, "Mismatching number of bound actions"
#first action
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'OnDblClick Action 2', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 11, 'Bound action does not match definition'
#second action - this ones comes last because it was re-created with a different name
assert len(actions[1]) == 3, "Malformed action definition"
assert actions[1][1] == 'OnDblClick Action New', 'Re-Registering an action should replace it'
assert isinstance(actions[1][2], dict) and actions[1][2]['id'] == 10, 'Bound action does not match definition'
actions = self.get(cr, uid, 'action', 'client_action_multi', ['unexisting_model'])
assert len(actions) == 1, "Mismatching number of bound actions"
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'Side Wizard', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 12, 'Bound action does not match definition'
actions = self.get(cr, uid, 'action', 'client_print_multi', ['unexisting_model'])
assert len(actions) == 1, "Mismatching number of bound actions"
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'Nice Report', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 2, 'Bound action does not match definition'
actions = self.get(cr, uid, 'action', 'client_action_relate', ['unexisting_model'])
assert len(actions) == 1, "Mismatching number of bound actions"
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'Related Stuff', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 14, 'Bound action does not match definition'

View File

@ -440,16 +440,6 @@
assert res_2 == expected
assert res_3 == expected
assert res_4 == expected
-
Verify that normalize_domain() works.
-
!python {model: res.partner}: |
from osv import expression
norm_domain = domain = ['&',(1,'=',1),('a','=','b')]
assert norm_domain == expression.normalize(domain), "Normalized domains should be left untouched"
domain = [('x','in',['y','z']),('a.v','=','e'),'|','|',('a','=','b'),'!',('c','>','d'),('e','!=','f'),('g','=','h')]
norm_domain = ['&','&','&'] + domain
assert norm_domain == expression.normalize(domain), "Non-normalized domains should be properly normalized"
-
Unaccent. Create a company with an accent in its name.
-

View File

@ -0,0 +1,5 @@
import test_ir_values
checks = [
test_ir_values,
]

View File

@ -0,0 +1,95 @@
import unittest2
import openerp.tests.common as common
class test_ir_values(common.TransactionCase):
def test_00(self):
# Create some default value for some (non-existing) model, for all users.
ir_values = self.registry('ir.values')
# use the old API
ir_values.set(self.cr, self.uid, 'default', False, 'my_test_field',
['unexisting_model'], 'global value')
# use the new API
ir_values.set_default(self.cr, self.uid, 'other_unexisting_model',
'my_other_test_field', 'conditional value', condition='foo=bar')
# Retrieve them.
ir_values = self.registry('ir.values')
# d is a list of triplets (id, name, value)
# Old API
d = ir_values.get(self.cr, self.uid, 'default', False, ['unexisting_model'])
assert len(d) == 1, "Only one single value should be retrieved for this model"
assert d[0][1] == 'my_test_field', "Can't retrieve the created default value. (1)"
assert d[0][2] == 'global value', "Can't retrieve the created default value. (2)"
# New API, Conditional version
d = ir_values.get_defaults(self.cr, self.uid, 'other_unexisting_model')
assert len(d) == 0, "No value should be retrieved, the condition is not met"
d = ir_values.get_defaults(self.cr, self.uid, 'other_unexisting_model', condition="foo=eggs")
assert len(d) == 0, 'Condition is not met either, no defaults should be returned'
d = ir_values.get_defaults(self.cr, self.uid, 'other_unexisting_model', condition="foo=bar")
assert len(d) == 1, "Only one single value should be retrieved"
assert d[0][1] == 'my_other_test_field', "Can't retrieve the created default value. (5)"
assert d[0][2] == 'conditional value', "Can't retrieve the created default value. (6)"
# Do it again but for a specific user.
ir_values = self.registry('ir.values')
ir_values.set(self.cr, self.uid, 'default', False, 'my_test_field',['unexisting_model'], 'specific value', preserve_user=True)
# Retrieve it and check it is the one for the current user.
ir_values = self.registry('ir.values')
d = ir_values.get(self.cr, self.uid, 'default', False, ['unexisting_model'])
assert len(d) == 1, "Only one default must be returned per field"
assert d[0][1] == 'my_test_field', "Can't retrieve the created default value."
assert d[0][2] == 'specific value', "Can't retrieve the created default value."
# Create some action bindings for a non-existing model.
ir_values = self.registry('ir.values')
ir_values.set(self.cr, self.uid, 'action', 'tree_but_open', 'OnDblClick Action', ['unexisting_model'], 'ir.actions.act_window,10', isobject=True)
ir_values.set(self.cr, self.uid, 'action', 'tree_but_open', 'OnDblClick Action 2', ['unexisting_model'], 'ir.actions.act_window,11', isobject=True)
ir_values.set(self.cr, self.uid, 'action', 'client_action_multi', 'Side Wizard', ['unexisting_model'], 'ir.actions.act_window,12', isobject=True)
ir_values.set(self.cr, self.uid, 'action', 'client_print_multi', 'Nice Report', ['unexisting_model'], 'ir.actions.report.xml,2', isobject=True)
ir_values.set(self.cr, self.uid, 'action', 'client_action_relate', 'Related Stuff', ['unexisting_model'], 'ir.actions.act_window,14', isobject=True)
# Replace one action binding to set a new name.
ir_values = self.registry('ir.values')
ir_values.set(self.cr, self.uid, 'action', 'tree_but_open', 'OnDblClick Action New', ['unexisting_model'], 'ir.actions.act_window,10', isobject=True)
# Retrieve the action bindings and check they're correct
ir_values = self.registry('ir.values')
actions = ir_values.get(self.cr, self.uid, 'action', 'tree_but_open', ['unexisting_model'])
assert len(actions) == 2, "Mismatching number of bound actions"
#first action
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'OnDblClick Action 2', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 11, 'Bound action does not match definition'
#second action - this ones comes last because it was re-created with a different name
assert len(actions[1]) == 3, "Malformed action definition"
assert actions[1][1] == 'OnDblClick Action New', 'Re-Registering an action should replace it'
assert isinstance(actions[1][2], dict) and actions[1][2]['id'] == 10, 'Bound action does not match definition'
actions = ir_values.get(self.cr, self.uid, 'action', 'client_action_multi', ['unexisting_model'])
assert len(actions) == 1, "Mismatching number of bound actions"
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'Side Wizard', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 12, 'Bound action does not match definition'
actions = ir_values.get(self.cr, self.uid, 'action', 'client_print_multi', ['unexisting_model'])
assert len(actions) == 1, "Mismatching number of bound actions"
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'Nice Report', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 2, 'Bound action does not match definition'
actions = ir_values.get(self.cr, self.uid, 'action', 'client_action_relate', ['unexisting_model'])
assert len(actions) == 1, "Mismatching number of bound actions"
assert len(actions[0]) == 3, "Malformed action definition"
assert actions[0][1] == 'Related Stuff', 'Bound action does not match definition'
assert isinstance(actions[0][2], dict) and actions[0][2]['id'] == 14, 'Bound action does not match definition'

View File

@ -50,6 +50,7 @@ import openerp.pooler as pooler
import openerp.release as release
import openerp.tools as tools
import openerp.tools.osutil as osutil
import openerp.tools.assertion_report as assertion_report
from openerp.tools.safe_eval import safe_eval as eval
from openerp.tools.translate import _
@ -94,19 +95,20 @@ def load_module_graph(cr, graph, status=None, perform_checks=True, skip_modules=
def load_test(module_name, idref, mode):
cr.commit()
if not tools.config.options['test_disable']:
try:
threading.currentThread().testing = True
_load_data(cr, module_name, idref, mode, 'test')
except Exception, e:
_logger.exception(
'Tests failed to execute in module %s', module_name)
finally:
threading.currentThread().testing = False
if tools.config.options['test_commit']:
cr.commit()
else:
cr.rollback()
try:
threading.currentThread().testing = True
_load_data(cr, module_name, idref, mode, 'test')
return True
except Exception, e:
_logger.error(
'module %s: an exception occurred in a test', module_name)
return False
finally:
threading.currentThread().testing = False
if tools.config.options['test_commit']:
cr.commit()
else:
cr.rollback()
def _load_data(cr, module_name, idref, mode, kind):
"""
@ -133,7 +135,7 @@ def load_module_graph(cr, graph, status=None, perform_checks=True, skip_modules=
elif ext == '.sql':
process_sql_file(cr, fp)
elif ext == '.yml':
tools.convert_yaml_import(cr, module_name, fp, idref, mode, noupdate)
tools.convert_yaml_import(cr, module_name, fp, idref, mode, noupdate, report)
else:
tools.convert_xml_import(cr, module_name, fp, idref, mode, noupdate, report)
finally:
@ -201,7 +203,14 @@ def load_module_graph(cr, graph, status=None, perform_checks=True, skip_modules=
# on demo data. Other tests can be added into the regular
# 'data' section, but should probably not alter the data,
# as there is no rollback.
load_test(module_name, idref, mode)
if tools.config.options['test_enable']:
report.record_result(load_test(module_name, idref, mode))
# Run the `fast_suite` and `checks` tests given by the module.
if module_name == 'base':
# Also run the core tests after the database is created.
report.record_result(openerp.modules.module.run_unit_tests('openerp'))
report.record_result(openerp.modules.module.run_unit_tests(module_name))
processed_modules.append(package.name)
@ -282,7 +291,6 @@ def load_modules(db, force_demo=False, status=None, update_module=False):
# This is a brand new pool, just created in pooler.get_db_and_pool()
pool = pooler.get_pool(cr.dbname)
report = tools.assertion_report()
if 'base' in tools.config['update'] or 'all' in tools.config['update']:
cr.execute("update ir_module_module set state=%s where name=%s and state=%s", ('to upgrade', 'base', 'installed'))
@ -295,6 +303,7 @@ def load_modules(db, force_demo=False, status=None, update_module=False):
# processed_modules: for cleanup step after install
# loaded_modules: to avoid double loading
report = assertion_report.assertion_report()
loaded_modules, processed_modules = load_module_graph(cr, graph, status, perform_checks=(not update_module), report=report)
if tools.config['load_language']:
@ -414,7 +423,10 @@ def load_modules(db, force_demo=False, status=None, update_module=False):
cr.execute("update ir_module_module set state=%s where state=%s", ('uninstalled', 'to remove',))
cr.commit()
_logger.info('Modules loaded.')
if report.failures:
_logger.error('At least one test failed when loading the modules.')
else:
_logger.info('Modules loaded.')
finally:
cr.close()

View File

@ -20,9 +20,12 @@
#
##############################################################################
import os, sys, imp
from os.path import join as opj
import imp
import itertools
import os
from os.path import join as opj
import sys
import types
import zipimport
import openerp
@ -460,5 +463,124 @@ def get_modules_with_version():
continue
return res
def get_test_modules(module, submodule, explode):
"""
Return a list of submodules containing tests.
`submodule` can be:
- None
- the name of a submodule
- '__fast_suite__'
- '__sanity_checks__'
"""
# Turn command-line module, submodule into importable names.
if module is None:
pass
elif module == 'openerp':
module = 'openerp.tests'
else:
module = 'openerp.addons.' + module + '.tests'
# Try to import the module
try:
__import__(module)
except Exception, e:
if explode:
print 'Can not `import %s`.' % module
import logging
logging.exception('')
sys.exit(1)
else:
if str(e) == 'No module named tests':
# It seems the module has no `tests` sub-module, no problem.
pass
else:
print 'Can not `import %s`.' % module
return []
# Discover available test sub-modules.
m = sys.modules[module]
submodule_names = sorted([x for x in dir(m) \
if x.startswith('test_') and \
isinstance(getattr(m, x), types.ModuleType)])
submodules = [getattr(m, x) for x in submodule_names]
def show_submodules_and_exit():
if submodule_names:
print 'Available submodules are:'
for x in submodule_names:
print ' ', x
sys.exit(1)
if submodule is None:
# Use auto-discovered sub-modules.
ms = submodules
elif submodule == '__fast_suite__':
# Obtain the explicit test sub-modules list.
ms = getattr(sys.modules[module], 'fast_suite', None)
# `suite` was used before the 6.1 release instead of `fast_suite`.
ms = ms if ms else getattr(sys.modules[module], 'suite', None)
if ms is None:
if explode:
print 'The module `%s` has no defined test suite.' % (module,)
show_submodules_and_exit()
else:
ms = []
elif submodule == '__sanity_checks__':
ms = getattr(sys.modules[module], 'checks', None)
if ms is None:
if explode:
print 'The module `%s` has no defined sanity checks.' % (module,)
show_submodules_and_exit()
else:
ms = []
else:
# Pick the command-line-specified test sub-module.
m = getattr(sys.modules[module], submodule, None)
ms = [m]
if m is None:
if explode:
print 'The module `%s` has no submodule named `%s`.' % \
(module, submodule)
show_submodules_and_exit()
else:
ms = []
return ms
def run_unit_tests(module_name):
"""
Return True or False if some tests were found and succeeded or failed.
Return None if no test was found.
"""
import unittest2
ms = get_test_modules(module_name, '__fast_suite__', explode=False)
ms.extend(get_test_modules(module_name, '__sanity_checks__', explode=False))
suite = unittest2.TestSuite()
for m in ms:
suite.addTests(unittest2.TestLoader().loadTestsFromModule(m))
if ms:
_logger.info('module %s: executing %s `fast_suite` and/or `checks` sub-modules', module_name, len(ms))
# Use a custom stream object to log the test executions.
class MyStream(object):
def __init__(self):
self.r = re.compile(r'^-*$|^ *... *$|^ok$')
def flush(self):
pass
def write(self, s):
if self.r.match(s):
return
first = True
for c in s.split('\n'):
if not first:
c = '` ' + c
first = False
_logger.log(logging.TEST, c)
result = unittest2.TextTestRunner(verbosity=2, stream=MyStream()).run(suite)
if result.wasSuccessful():
return True
else:
_logger.error('module %s: at least one error occurred in a test', module_name)
return False
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -48,6 +48,8 @@ import openerp.wsgi
_logger = logging.getLogger(__name__)
# TODO block until the server is really up, accepting connections
# TODO be idemptotent (as long as stop_service was not called).
def start_services():
""" Start all services.

View File

@ -1,15 +1,24 @@
# -*- coding: utf-8 -*-
import unittest2
"""
Tests for the OpenERP library.
import test_orm
This module groups a few sub-modules containing unittest2 test cases.
Tests can be explicitely added to the `fast_suite` or `checks` lists or not.
See the :ref:`test-framework` section in the :ref:`features` list.
"""
import test_expression
import test_ir_sequence
import test_xmlrpc
import test_orm
# Explicit declaration list of test sub-modules.
suite = [
test_xmlrpc, # Creates a database
test_ir_sequence, # Assume an existing database
test_orm, # Assume an existing database
fast_suite = [
test_ir_sequence,
]
checks = [
test_expression,
test_orm,
]
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import os
import time
import unittest2
import xmlrpclib
import openerp
@ -16,15 +17,6 @@ ADMIN_USER = 'admin'
ADMIN_USER_ID = 1
ADMIN_PASSWORD = 'admin'
common_proxy_60 = None
db_proxy_60 = None
object_proxy_60 = None
common_proxy_61 = None
db_proxy_61 = None
model_proxy_61 = None
model_uri_61 = None
def start_openerp():
"""
Start the OpenERP server similary to the openerp-server script.
@ -34,33 +26,59 @@ def start_openerp():
# Ugly way to ensure the server is listening.
time.sleep(2)
def create_xmlrpc_proxies():
def stop_openerp():
"""
setup some xmlrpclib proxies.
Shutdown the OpenERP server similarly to a single ctrl-c.
"""
global common_proxy_60
global db_proxy_60
global object_proxy_60
# Use the old (pre 6.1) API.
url = 'http://%s:%d/xmlrpc/' % (HOST, PORT)
common_proxy_60 = xmlrpclib.ServerProxy(url + 'common')
db_proxy_60 = xmlrpclib.ServerProxy(url + 'db')
object_proxy_60 = xmlrpclib.ServerProxy(url + 'object')
global common_proxy_61
global db_proxy_61
global model_proxy_61
global model_uri_61
# Use the new (6.1) API.
model_uri_61 = 'http://%s:%d/openerp/xmlrpc/1/' % (HOST, PORT)
common_proxy_61 = xmlrpclib.ServerProxy(model_uri_61 + 'common')
db_proxy_61 = xmlrpclib.ServerProxy(model_uri_61 + 'db')
model_proxy_61 = xmlrpclib.ServerProxy(model_uri_61 + 'model/' + DB)
def tearDownModule():
""" Shutdown the OpenERP server similarly to a single ctrl-c. """
openerp.service.stop_services()
class TransactionCase(unittest2.TestCase):
"""
Subclass of TestCase with a single transaction, rolled-back at the end of
the tests.
"""
def setUp(self):
self.cr = openerp.modules.registry.RegistryManager.get(DB).db.cursor()
self.uid = openerp.SUPERUSER_ID
def tearDown(self):
self.cr.rollback()
self.cr.close()
def registry(self, model):
return openerp.modules.registry.RegistryManager.get(DB)[model]
class RpcCase(unittest2.TestCase):
"""
Subclass of TestCase with a few XML-RPC proxies.
"""
def __init__(self, name):
super(RpcCase, self).__init__(name)
class A(object):
pass
self.proxy = A()
# Use the old (pre 6.1) API.
self.proxy.url_60 = url_60 = 'http://%s:%d/xmlrpc/' % (HOST, PORT)
self.proxy.common_60 = xmlrpclib.ServerProxy(url_60 + 'common')
self.proxy.db_60 = xmlrpclib.ServerProxy(url_60 + 'db')
self.proxy.object_60 = xmlrpclib.ServerProxy(url_60 + 'object')
# Use the new (6.1) API.
self.proxy.url_61 = url_61 = 'http://%s:%d/openerp/xmlrpc/1/' % (HOST, PORT)
self.proxy.common_61 = xmlrpclib.ServerProxy(url_61 + 'common')
self.proxy.db_61 = xmlrpclib.ServerProxy(url_61 + 'db')
self.proxy.model_61 = xmlrpclib.ServerProxy(url_61 + 'model/' + DB)
@classmethod
def generate_database_name(cls):
if hasattr(cls, '_database_id'):
cls._database_id += 1
else:
cls._database_id = 0
return '_fresh_name_' + str(cls._database_id) + '_'
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,12 @@
import unittest2
import openerp
class test_domain_normalization(unittest2.TestCase):
def test_normalize_domain(self):
expression = openerp.osv.expression
norm_domain = domain = ['&',(1,'=',1),('a','=','b')]
assert norm_domain == expression.normalize(domain), "Normalized domains should be left untouched"
domain = [('x','in',['y','z']),('a.v','=','e'),'|','|',('a','=','b'),'!',('c','>','d'),('e','!=','f'),('g','=','h')]
norm_domain = ['&','&','&'] + domain
assert norm_domain == expression.normalize(domain), "Non-normalized domains should be properly normalized"

View File

@ -16,11 +16,6 @@ import common
DB = common.DB
ADMIN_USER_ID = common.ADMIN_USER_ID
def setUpModule():
common.create_xmlrpc_proxies()
tearDownModule = common.tearDownModule
def registry(model):
return openerp.modules.registry.RegistryManager.get(DB)[model]
@ -174,8 +169,7 @@ class test_ir_sequence_generate(unittest2.TestCase):
def test_ir_sequence_create_no_gap(self):
""" Try to create a sequence object. """
cr = cursor()
d = dict(code='test_sequence_type_6', name='Test sequence type',
implementation='no_gap')
d = dict(code='test_sequence_type_6', name='Test sequence type')
c = registry('ir.sequence.type').create(cr, ADMIN_USER_ID, d, {})
assert c
d = dict(code='test_sequence_type_6', name='Test sequence')

View File

@ -2,9 +2,10 @@ import os
import unittest2
import openerp
import common
UID = 1
DB = openerp.tools.config['db_name']
UID = common.ADMIN_USER_ID
DB = common.DB
CREATE = lambda values: (0, False, values)
UPDATE = lambda id, values: (1, id, values)
@ -14,16 +15,12 @@ LINK_TO = lambda id: (4, id, False)
DELETE_ALL = lambda: (5, False, False)
REPLACE_WITH = lambda ids: (6, False, ids)
class TestO2MSerialization(unittest2.TestCase):
class TestO2MSerialization(common.TransactionCase):
def setUp(self):
self.cr = openerp.modules.registry.RegistryManager.get(DB).db.cursor()
self.partner = openerp.modules.registry.RegistryManager.get(DB)['res.partner']
self.address = openerp.modules.registry.RegistryManager.get(DB)['res.partner.address']
def tearDown(self):
self.cr.rollback()
self.cr.close()
super(TestO2MSerialization, self).setUp()
self.partner = self.registry('res.partner')
self.address = self.registry('res.partner.address')
def test_no_command(self):
" empty list of commands yields an empty list of records "

View File

@ -13,56 +13,64 @@ import xmlrpclib
import openerp
import common
DB = common.DB
DB = None
ADMIN_USER = common.ADMIN_USER
ADMIN_USER_ID = common.ADMIN_USER_ID
ADMIN_PASSWORD = common.ADMIN_PASSWORD
def setUpModule():
common.start_openerp()
common.create_xmlrpc_proxies()
common.start_openerp()
global DB
DB = common.RpcCase.generate_database_name()
tearDownModule = common.tearDownModule
tearDownModule = common.stop_openerp
class test_xmlrpc(unittest2.TestCase):
class test_xmlrpc(common.RpcCase):
def test_00_xmlrpc_create_database_polling(self):
"""
Simulate a OpenERP client requesting the creation of a database and
polling the server until the creation is complete.
"""
progress_id = common.db_proxy_60.create(ADMIN_PASSWORD, DB, True,
False, ADMIN_PASSWORD)
progress_id = self.proxy.db_60.create(ADMIN_PASSWORD,DB, True, False,
ADMIN_PASSWORD)
while True:
time.sleep(1)
progress, users = common.db_proxy_60.get_progress(ADMIN_PASSWORD,
progress, users = self.proxy.db_60.get_progress(ADMIN_PASSWORD,
progress_id)
if progress == 1.0:
break
def test_xmlrpc_login(self):
""" Try to login on the common service. """
uid = common.common_proxy_60.login(DB, ADMIN_USER, ADMIN_PASSWORD)
uid = self.proxy.common_60.login(DB, ADMIN_USER, ADMIN_PASSWORD)
assert uid == ADMIN_USER_ID
def test_xmlrpc_ir_model_search(self):
""" Try a search on the object service. """
ids = common.object_proxy_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
ids = self.proxy.object_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
'ir.model', 'search', [])
assert ids
ids = common.object_proxy_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
ids = self.proxy.object_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
'ir.model', 'search', [], {})
assert ids
def test_xmlrpc_61_ir_model_search(self):
""" Try a search on the object service. """
proxy = xmlrpclib.ServerProxy(common.model_uri_61 + 'model/' + DB + '/ir.model')
proxy = xmlrpclib.ServerProxy(self.proxy.url_61 + 'model/' + DB +
'/ir.model')
ids = proxy.execute(ADMIN_USER_ID, ADMIN_PASSWORD, 'search', [])
assert ids
ids = proxy.execute(ADMIN_USER_ID, ADMIN_PASSWORD, 'search', [], {})
assert ids
def test_zz_xmlrpc_drop_database(self):
"""
Simulate a OpenERP client requesting the deletion of a database.
"""
assert self.proxy.db_60.drop(ADMIN_PASSWORD, DB) is True
if __name__ == '__main__':
unittest2.main()

View File

@ -0,0 +1,29 @@
class assertion_report(object):
"""
Simple pair of success and failures counts (used to record YAML and XML
`assert` tags as well as unittest2 tests outcome (in this case, not
individual `assert`)).
"""
def __init__(self):
self.successes = 0
self.failures = 0
def record_success(self):
self.successes += 1
def record_failure(self):
self.failures += 1
def record_result(self, result):
if result is None:
pass
elif result is True:
self.record_success()
elif result is False:
self.record_failure()
def __str__(self):
res = 'Assertions report: %s successes, %s failures' % (self.successes, self.failures)
return res

View File

@ -168,13 +168,10 @@ class configmanager(object):
help="Launch a YML test file.")
group.add_option("--test-report-directory", dest="test_report_directory", my_default=False,
help="If set, will save sample of all reports in this directory.")
group.add_option("--test-disable", action="store_true", dest="test_disable",
my_default=False, help="Disable loading test files.")
group.add_option("--test-enable", action="store_true", dest="test_enable",
my_default=False, help="Enable YAML and unit tests.")
group.add_option("--test-commit", action="store_true", dest="test_commit",
my_default=False, help="Commit database changes performed by tests.")
group.add_option("--assert-exit-level", dest='assert_exit_level', type="choice", choices=self._LOGLEVELS.keys(),
my_default='error',
help="specify the level at which a failed assertion will stop the server. Accepted values: %s" % (self._LOGLEVELS.keys(),))
my_default=False, help="Commit database changes performed by YAML or XML tests.")
parser.add_option_group(group)
# Logging Group
@ -395,7 +392,7 @@ class configmanager(object):
'debug_mode', 'smtp_ssl', 'load_language',
'stop_after_init', 'logrotate', 'without_demo', 'netrpc', 'xmlrpc', 'syslog',
'list_db', 'xmlrpcs', 'proxy_mode',
'test_file', 'test_disable', 'test_commit', 'test_report_directory',
'test_file', 'test_enable', 'test_commit', 'test_report_directory',
'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads',
'virtual_memory_limit', 'virtual_memory_reset', 'cpu_time_limit', 'unaccent',
]
@ -408,11 +405,6 @@ class configmanager(object):
elif isinstance(self.options[arg], basestring) and self.casts[arg].type in optparse.Option.TYPE_CHECKER:
self.options[arg] = optparse.Option.TYPE_CHECKER[self.casts[arg].type](self.casts[arg], arg, self.options[arg])
if opt.assert_exit_level:
self.options['assert_exit_level'] = self._LOGLEVELS[opt.assert_exit_level]
else:
self.options['assert_exit_level'] = self._LOGLEVELS.get(self.options['assert_exit_level']) or int(self.options['assert_exit_level'])
self.options['root_path'] = os.path.abspath(os.path.expanduser(os.path.expandvars(os.path.dirname(openerp.__file__))))
if not self.options['addons_path'] or self.options['addons_path']=='None':
self.options['addons_path'] = os.path.join(self.options['root_path'], 'addons')
@ -582,7 +574,7 @@ class configmanager(object):
continue
if opt in self.blacklist_for_save:
continue
if opt in ('log_level', 'assert_exit_level'):
if opt in ('log_level',):
p.set('options', opt, loglevelnames.get(self.options[opt], self.options[opt]))
else:
p.set('options', opt, self.options[opt])

View File

@ -30,6 +30,8 @@ import re
import time
import openerp.release as release
import assertion_report
_logger = logging.getLogger(__name__)
try:
@ -200,35 +202,6 @@ escape_re = re.compile(r'(?<!\\)/')
def escape(x):
return x.replace('\\/', '/')
class assertion_report(object):
def __init__(self):
self._report = {}
def record_assertion(self, success, severity):
"""
Records the result of an assertion for the failed/success count
returns success
"""
if severity in self._report:
self._report[severity][success] += 1
else:
self._report[severity] = {success:1, not success: 0}
return success
def get_report(self):
return self._report
def __str__(self):
res = '\nAssertions report:\nLevel\tsuccess\tfailed\n'
success = failed = 0
for sev in self._report:
res += sev + '\t' + str(self._report[sev][True]) + '\t' + str(self._report[sev][False]) + '\n'
success += self._report[sev][True]
failed += self._report[sev][False]
res += 'total\t' + str(success) + '\t' + str(failed) + '\n'
res += 'end of report (' + str(success + failed) + ' assertion(s) checked)'
return res
class xml_import(object):
@staticmethod
def nodeattr2bool(node, attr, default=False):
@ -712,7 +685,6 @@ form: module.record_id""" % (xml_id,)
rec_src = rec.get("search",'').encode('utf8')
rec_src_count = rec.get("count")
severity = rec.get("severity",'').encode('ascii') or loglevels.LOG_ERROR
rec_string = rec.get("string",'').encode('utf8') or 'unknown'
ids = None
@ -727,17 +699,13 @@ form: module.record_id""" % (xml_id,)
if rec_src_count:
count = int(rec_src_count)
if len(ids) != count:
self.assert_report.record_assertion(False, severity)
self.assertion_report.record_failure()
msg = 'assertion "%s" failed!\n' \
' Incorrect search count:\n' \
' expected count: %d\n' \
' obtained count: %d\n' \
% (rec_string, count, len(ids))
sevval = getattr(logging, severity.upper())
_logger.log(sevval, msg)
if sevval >= config['assert_exit_level']:
# TODO: define a dedicated exception
raise Exception('Severe assertion failure')
_logger.error(msg)
return
assert ids is not None,\
@ -759,20 +727,16 @@ form: module.record_id""" % (xml_id,)
expected_value = _eval_xml(self, test, self.pool, cr, uid, self.idref, context=context) or True
expression_value = unsafe_eval(f_expr, globals_dict)
if expression_value != expected_value: # assertion failed
self.assert_report.record_assertion(False, severity)
self.assertion_report.record_failure()
msg = 'assertion "%s" failed!\n' \
' xmltag: %s\n' \
' expected value: %r\n' \
' obtained value: %r\n' \
% (rec_string, etree.tostring(test), expected_value, expression_value)
sevval = getattr(logging, severity.upper())
_logger.log(sevval, msg)
if sevval >= config['assert_exit_level']:
# TODO: define a dedicated exception
raise Exception('Severe assertion failure')
_logger.error(msg)
return
else: # all tests were successful for this assertion tag (no break)
self.assert_report.record_assertion(True, severity)
self.assertion_report.record_success()
def _tag_record(self, cr, rec, data_node=None):
rec_model = rec.get("model").encode('ascii')
@ -906,8 +870,8 @@ form: module.record_id""" % (xml_id,)
self.pool = pooler.get_pool(cr.dbname)
self.uid = 1
if report is None:
report = assertion_report()
self.assert_report = report
report = assertion_report.assertion_report()
self.assertion_report = report
self.noupdate = noupdate
self._tags = {
'menuitem': self._tag_menuitem,

View File

@ -19,6 +19,8 @@ from lxml import etree
unsafe_eval = eval
from safe_eval import safe_eval as eval
import assertion_report
_logger = logging.getLogger(__name__)
class YamlImportException(Exception):
@ -85,33 +87,6 @@ def is_ir_set(node):
def is_string(node):
return isinstance(node, basestring)
class TestReport(object):
def __init__(self):
self._report = {}
def record(self, success, severity):
"""
Records the result of an assertion for the failed/success count.
Returns success.
"""
if severity in self._report:
self._report[severity][success] += 1
else:
self._report[severity] = {success: 1, not success: 0}
return success
def __str__(self):
res = []
res.append('\nAssertions report:\nLevel\tsuccess\tfailure')
success = failure = 0
for severity in self._report:
res.append("%s\t%s\t%s" % (severity, self._report[severity][True], self._report[severity][False]))
success += self._report[severity][True]
failure += self._report[severity][False]
res.append("total\t%s\t%s" % (success, failure))
res.append("end of report (%s assertion(s) checked)" % (success + failure))
return "\n".join(res)
class RecordDictWrapper(dict):
"""
Used to pass a record as locals in eval:
@ -125,13 +100,15 @@ class RecordDictWrapper(dict):
return dict.__getitem__(self, key)
class YamlInterpreter(object):
def __init__(self, cr, module, id_map, mode, filename, noupdate=False):
def __init__(self, cr, module, id_map, mode, filename, report=None, noupdate=False):
self.cr = cr
self.module = module
self.id_map = id_map
self.mode = mode
self.filename = filename
self.assert_report = TestReport()
if report is None:
report = assertion_report.assertion_report()
self.assertion_report = report
self.noupdate = noupdate
self.pool = pooler.get_pool(cr.dbname)
self.uid = 1
@ -210,18 +187,9 @@ class YamlInterpreter(object):
def process_comment(self, node):
return node
def _log_assert_failure(self, severity, msg, *args):
if isinstance(severity, types.StringTypes):
levelname = severity.strip().upper()
level = logging.getLevelName(levelname)
else:
level = severity
levelname = logging.getLevelName(level)
self.assert_report.record(False, levelname)
_logger.log(level, msg, *args)
if level >= config['assert_exit_level']:
raise YamlImportAbortion('Severe assertion failure (%s), aborting.' % levelname)
return
def _log_assert_failure(self, msg, *args):
self.assertion_report.record_failure()
_logger.error(msg, *args)
def _get_assertion_id(self, assertion):
if assertion.id:
@ -250,7 +218,7 @@ class YamlInterpreter(object):
' expected count: %d\n' \
' obtained count: %d\n'
args = (assertion.string, assertion.count, len(ids))
self._log_assert_failure(assertion.severity, msg, *args)
self._log_assert_failure(msg, *args)
else:
context = self.get_context(assertion, self.eval_context)
for id in ids:
@ -283,10 +251,10 @@ class YamlInterpreter(object):
args += ( lmsg, aop, rmsg )
break
self._log_assert_failure(assertion.severity, msg, *args)
self._log_assert_failure(msg, *args)
return
else: # all tests were successful for this assertion tag (no break)
self.assert_report.record(True, assertion.severity)
self.assertion_report.record_success()
def _coerce_bool(self, value, default=False):
if isinstance(value, types.BooleanType):
@ -528,13 +496,13 @@ class YamlInterpreter(object):
code_obj = compile(statements, self.filename, 'exec')
unsafe_eval(code_obj, {'ref': self.get_id}, code_context)
except AssertionError, e:
self._log_assert_failure(python.severity, 'AssertionError in Python code %s: %s', python.name, e)
self._log_assert_failure('AssertionError in Python code %s: %s', python.name, e)
return
except Exception, e:
_logger.debug('Exception during evaluation of !python block in yaml_file %s.', self.filename, exc_info=True)
raise
else:
self.assert_report.record(True, python.severity)
self.assertion_report.record_success()
def process_workflow(self, node):
workflow, values = node.items()[0]
@ -827,7 +795,7 @@ class YamlInterpreter(object):
"""
Empty node or commented node should not pass silently.
"""
self._log_assert_failure(logging.WARNING, "You have an empty block in your tests.")
self._log_assert_failure("You have an empty block in your tests.")
def process(self, yaml_string):
@ -900,11 +868,11 @@ class YamlInterpreter(object):
is_preceded_by_comment = False
return is_preceded_by_comment
def yaml_import(cr, module, yamlfile, idref=None, mode='init', noupdate=False):
def yaml_import(cr, module, yamlfile, idref=None, mode='init', noupdate=False, report=None):
if idref is None:
idref = {}
yaml_string = yamlfile.read()
yaml_interpreter = YamlInterpreter(cr, module, idref, mode, filename=yamlfile.name, noupdate=noupdate)
yaml_interpreter = YamlInterpreter(cr, module, idref, mode, filename=yamlfile.name, report=report, noupdate=noupdate)
yaml_interpreter.process(yaml_string)
# keeps convention of convert.py