[MERGE] [REF] Server action cleaning and usability improvements.

Main modifications:
- removed dummy, email (moved to email_template), loop, sms types; merged object_create and object_copy; renamed other to multi
- cleaned code, made it easier to override
- improved view to ease the definition of new server actions
- changed/updated fields
- added tests
- added changelog and base for documentation

[ADD] ORM: added _check_m2m_recursion on BaseModel. This method checks for recursions in intramodel many2manyrelationships, aka models having childs in the same model.

bzr revid: tde@openerp.com-20130726101805-a2vby7q1j7df9wte
This commit is contained in:
Thibault Delavallée 2013-07-26 12:18:05 +02:00
commit b80e86def0
9 changed files with 1225 additions and 389 deletions

View File

@ -6,6 +6,12 @@ Changelog
- Cleaned and slightly refactored ``ir.actions.server``. The ``loop``, ``sms``
and ``dummy`` server actions have been removed; ``object_create`` and
``object_copy`` have been merged into ``object_create``; ``other`` is now ``multi``
and raises in case of loops. See :ref:`ir-actions-server` for more details.
- Removed ``sms_send`` method.
- Added checking of recursions in many2many loops using ``_check_m2m_recursion``.
- Added MONTHS attribute on fields.date and fields.datetime, holding the list
(month_number, month_name)
- Almost removed ``LocalService()``. For reports,

View File

@ -19,6 +19,7 @@ OpenERP Server
OpenERP Command

doc/ir_actions.rst Normal file
View File

@ -0,0 +1,48 @@
.. _ir-actions:
Ir Actions
.. _ir-actions-server:
Server actions
.. versionchanged:: 8.0
.. currentmodule:: openerp.addons.base.ir.ir_actions
.. autoclass:: actions_server
Adding a new sever action
The ``state`` field holds the various available types of server action. In order
to add a new server action, the first thing to do is to override the ``_get_states``
method that returns the list of values available for the selection field.
.. automethod:: actions_server._get_states
The method called when executing the server action is the ``run`` method. This
method calls ``run_action_<STATE>``. When adding a new server action type, you
have to define the related method that will be called upon execution.
.. automethod:: actions_server.run
The refactoring of OpenERP 8.0 server actions removed the following types of
server action:
- ``loop``: can be replaced by a ``code`` action
- ``dummy``: can be replaced by a void ``code`` action
- ``object_create`` and ``object_copy`` have been merged into a single and
more understandable ``object_create`` action
- ``other`` is renamed ``multi`` and raises in case of loops

View File

@ -19,20 +19,20 @@
from functools import partial
import logging
import operator
import os
import re
from socket import gethostname
import time
import openerp
from openerp import SUPERUSER_ID
from openerp import tools
from openerp import workflow
from openerp.osv import fields, osv
from openerp.osv.orm import browse_record
import openerp.report.interface
from openerp.report.report_sxw import report_sxw, report_rml
from openerp.tools.config import config
from openerp.tools.safe_eval import safe_eval as eval
from openerp.tools.translate import _
import openerp.workflow
@ -429,39 +429,52 @@ class server_object_lines(osv.osv):
_name = 'ir.server.object.lines'
_sequence = 'ir_actions_id_seq'
_columns = {
'server_id': fields.many2one('ir.actions.server', 'Object Mapping'),
'col1': fields.many2one('ir.model.fields', 'Destination', required=True),
'server_id': fields.many2one('ir.actions.server', 'Related Server Action'),
'col1': fields.many2one('ir.model.fields', 'Field', required=True),
'value': fields.text('Value', required=True, help="Expression containing a value specification. \n"
"When Formula type is selected, this field may be a Python expression "
" that can use the same values as for the condition field on the server action.\n"
"If Value type is selected, the value will be used directly without evaluation."),
'type': fields.selection([
('value', 'Value'),
], 'Type', required=True, size=32, change_default=True),
('equation', 'Python expression')
], 'Evaluation Type', required=True, change_default=True),
_defaults = {
'type': 'equation',
'type': 'value',
# Actions that are run on the server side
class actions_server(osv.osv):
""" Server actions model. Server action work on a base model and offer various
type of actions that can be executed automatically, for example using base
action rules, of manually, by adding the action in the 'More' contextual
def _select_signals(self, cr, uid, context=None):
cr.execute("""SELECT distinct w.osv, t.signal FROM wkf w, wkf_activity a, wkf_transition t
WHERE w.id = a.wkf_id AND
(t.act_from = a.id OR t.act_to = a.id) AND
t.signal IS NOT NULL""")
result = cr.fetchall() or []
res = []
for rs in result:
if rs[0] is not None and rs[1] is not None:
line = rs[1], "%s - (%s)" % (rs[1], rs[0])
return res
Since OpenERP 8.0 a button 'Create Menu Action' button is available on the
action form view. It creates an entry in the More menu of the base model.
This allows to create server actions and run them in mass mode easily through
the interface.
The available actions are :
- 'Execute Python Code': a block of python code that will be executed
- 'Trigger a Workflow Signal': send a signal to a workflow
- 'Run a Client Action': choose a client action to launch
- 'Create or Copy a new Record': create a new record with new values, or
copy an existing record in your database
- 'Write on a Record': update the values of a record
- 'Execute several actions': define an action that triggers several other
server actions
_name = 'ir.actions.server'
_table = 'ir_act_server'
_inherit = 'ir.actions.actions'
_sequence = 'ir_actions_id_seq'
_order = 'sequence,name'
def _select_objects(self, cr, uid, context=None):
model_pool = self.pool.get('ir.model')
@ -469,81 +482,124 @@ class actions_server(osv.osv):
res = model_pool.read(cr, uid, ids, ['model', 'name'])
return [(r['model'], r['name']) for r in res] + [('', '')]
def change_object(self, cr, uid, ids, copy_object, state, context=None):
if state == 'object_copy' and copy_object:
if context is None:
context = {}
model_pool = self.pool.get('ir.model')
model = copy_object.split(',')[0]
mid = model_pool.search(cr, uid, [('model','=',model)])
return {
'value': {'srcmodel_id': mid[0]},
'context': context
return {}
def _get_states(self, cr, uid, context=None):
""" Override me in order to add new states in the server action. Please
note that the added key length should not be higher than already-existing
ones. """
return [('code', 'Execute Python Code'),
('trigger', 'Trigger a Workflow Signal'),
('client_action', 'Run a Client Action'),
('object_create', 'Create or Copy a new Record'),
('object_write', 'Write on a Record'),
('multi', 'Execute several actions')]
def _get_states_wrapper(self, cr, uid, context=None):
return self._get_states(cr, uid, context)
_name = 'ir.actions.server'
_table = 'ir_act_server'
_inherit = 'ir.actions.actions'
_sequence = 'ir_actions_id_seq'
_order = 'sequence,name'
_columns = {
'name': fields.char('Action Name', required=True, size=64, translate=True),
'condition' : fields.char('Condition', size=256, required=True,
help="Condition that is tested before the action is executed, "
"and prevent execution if it is not verified.\n"
"Example: object.list_price > 5000\n"
"It is a Python expression that can use the following values:\n"
" - self: ORM model of the record on which the action is triggered\n"
" - object or obj: browse_record of the record on which the action is triggered\n"
" - pool: ORM model pool (i.e. self.pool)\n"
" - time: Python time module\n"
" - cr: database cursor\n"
" - uid: current user id\n"
" - context: current context"),
'state': fields.selection([
('client_action','Client Action'),
('code','Python Code'),
('object_create','Create Object'),
('object_copy','Copy Object'),
('object_write','Write Object'),
('other','Multi Actions'),
], 'Action Type', required=True, size=32, help="Type of the Action that is to be executed"),
'code':fields.text('Python Code', help="Python code to be executed if condition is met.\n"
"It is a Python block that can use the same values as for the condition field"),
'sequence': fields.integer('Sequence', help="Important when you deal with multiple actions, the execution order will be decided based on this, low number is higher priority."),
'model_id': fields.many2one('ir.model', 'Object', required=True, help="Select the object on which the action will work (read, write, create).", ondelete='cascade'),
'action_id': fields.many2one('ir.actions.actions', 'Client Action', help="Select the Action Window, Report, Wizard to be executed."),
'trigger_name': fields.selection(_select_signals, string='Trigger Signal', size=128, help="The workflow signal to trigger"),
'wkf_model_id': fields.many2one('ir.model', 'Target Object', help="The object that should receive the workflow signal (must have an associated workflow)"),
'trigger_obj_id': fields.many2one('ir.model.fields','Relation Field', help="The field on the current object that links to the target object record (must be a many2one, or an integer field with the record ID)"),
'email': fields.char('Email Address', size=512, help="Expression that returns the email address to send to. Can be based on the same values as for the condition field.\n"
"Example: object.invoice_address_id.email, or 'me@example.com'"),
'subject': fields.char('Subject', size=1024, translate=True, help="Email subject, may contain expressions enclosed in double brackets based on the same values as those "
"available in the condition field, e.g. `Hello [[ object.partner_id.name ]]`"),
'message': fields.text('Message', translate=True, help="Email contents, may contain expressions enclosed in double brackets based on the same values as those "
"available in the condition field, e.g. `Dear [[ object.partner_id.name ]]`"),
'mobile': fields.char('Mobile No', size=512, help="Provides fields that be used to fetch the mobile number, e.g. you select the invoice, then `object.invoice_address_id.mobile` is the field which gives the correct mobile number"),
'sms': fields.char('SMS', size=160, translate=True),
'child_ids': fields.many2many('ir.actions.server', 'rel_server_actions', 'server_id', 'action_id', 'Other Actions'),
'condition': fields.char('Condition',
help="Condition verified before executing the server action. If it "
"is not verified, the action will not be executed. The condition is "
"a Python expression, like 'object.list_price > 5000'. A void "
"condition is considered as always True. Help about python expression "
"is given in the help tab."),
'state': fields.selection(_get_states_wrapper, 'Action To Do', required=True,
help="Type of server action. The following values are available:\n"
"- 'Execute Python Code': a block of python code that will be executed\n"
"- 'Trigger a Workflow Signal': send a signal to a workflow\n"
"- 'Run a Client Action': choose a client action to launch\n"
"- 'Create or Copy a new Record': create a new record with new values, or copy an existing record in your database\n"
"- 'Write on a Record': update the values of a record\n"
"- 'Execute several actions': define an action that triggers several other server actions\n"
"- 'Send Email': automatically send an email (available in email_template)"),
'usage': fields.char('Action Usage', size=32),
'type': fields.char('Action Type', size=32, required=True),
'srcmodel_id': fields.many2one('ir.model', 'Model', help="Object in which you want to create / write the object. If it is empty then refer to the Object field."),
'fields_lines': fields.one2many('ir.server.object.lines', 'server_id', 'Field Mappings.'),
'record_id':fields.many2one('ir.model.fields', 'Create Id', help="Provide the field name where the record id is stored after the create operations. If it is empty, you can not track the new record."),
'write_id':fields.char('Write Id', size=256, help="Provide the field name that the record id refers to for the write operation. If it is empty it will refer to the active id of the object."),
'loop_action':fields.many2one('ir.actions.server', 'Loop Action', help="Select the action that will be executed. Loop action will not be avaliable inside loop."),
'expression':fields.char('Loop Expression', size=512, help="Enter the field/expression that will return the list. E.g. select the sale order in Object, and you can have loop on the sales order line. Expression = `object.order_line`."),
'copy_object': fields.reference('Copy Of', selection=_select_objects, size=256),
# Generic
'sequence': fields.integer('Sequence',
help="When dealing with multiple actions, the execution order is "
"based on the sequence. Low number means high priority."),
'model_id': fields.many2one('ir.model', 'Base Model', required=True, ondelete='cascade',
help="Base model on which the server action runs."),
'menu_ir_values_id': fields.many2one('ir.values', 'More Menu entry', readonly=True,
help='More menu entry.'),
# Client Action
'action_id': fields.many2one('ir.actions.actions', 'Client Action',
help="Select the client action that has to be executed."),
# Python code
'code': fields.text('Python Code',
help="Write Python code that the action will execute. Some variables are "
"available for use; help about pyhon expression is given in the help tab."),
# Workflow signal
'use_relational_model': fields.selection([('base', 'Use the base model of the action'),
('relational', 'Use a relation field on the base model')],
string='Target Model', required=True),
'wkf_transition_id': fields.many2one('workflow.transition', string='Signal to Trigger',
help="Select the workflow signal to trigger."),
'wkf_model_id': fields.many2one('ir.model', 'Target Model',
help="The model that will receive the workflow signal. Note that it should have a workflow associated with it."),
'wkf_model_name': fields.related('wkf_model_id', 'model', type='char', string='Target Model Name', store=True, readonly=True),
'wkf_field_id': fields.many2one('ir.model.fields', string='Relation Field',
help="The field on the current object that links to the target object record (must be a many2one, or an integer field with the record ID)"),
# Multi
'child_ids': fields.many2many('ir.actions.server', 'rel_server_actions',
'server_id', 'action_id',
string='Child Actions',
help='Child server actions that will be executed. Note that the last return returned action value will be used as global return value.'),
# Create/Copy/Write
'use_create': fields.selection([('new', 'Create a new record in the Base Model'),
('new_other', 'Create a new record in another model'),
('copy_current', 'Copy the current record'),
('copy_other', 'Choose and copy a record in the database')],
string="Creation Policy", required=True,
'crud_model_id': fields.many2one('ir.model', 'Target Model',
help="Model for record creation / update. Set this field only to specify a different model than the base model."),
'crud_model_name': fields.related('crud_model_id', 'model', type='char',
string='Create/Write Target Model Name',
store=True, readonly=True),
'ref_object': fields.reference('Reference record', selection=_select_objects, size=128,
'link_new_record': fields.boolean('Attach the new record',
help="Check this if you want to link the newly-created record "
"to the current record on which the server action runs."),
'link_field_id': fields.many2one('ir.model.fields', 'Link using field',
help="Provide the field where the record id is stored after the operations."),
'use_write': fields.selection([('current', 'Update the current record'),
('expression', 'Update a record linked to the current record using python'),
('other', 'Choose and Update a record in the database')],
string='Update Policy', required=True,
'write_expression': fields.char('Expression',
help="Provide an expression that, applied on the current record, gives the field to update."),
'fields_lines': fields.one2many('ir.server.object.lines', 'server_id',
string='Value Mapping',
# Fake fields used to implement the placeholder assistant
'model_object_field': fields.many2one('ir.model.fields', string="Field",
help="Select target field from the related document model.\n"
"If it is a relationship field you will be able to select "
"a target field at the destination of the relationship."),
'sub_object': fields.many2one('ir.model', 'Sub-model', readonly=True,
help="When a relationship field is selected as first field, "
"this field shows the document model the relationship goes to."),
'sub_model_object_field': fields.many2one('ir.model.fields', 'Sub-field',
help="When a relationship field is selected as first field, "
"this field lets you select the target field within the "
"destination document model (sub-model)."),
'copyvalue': fields.char('Placeholder Expression', help="Final placeholder expression, to be copy-pasted in the desired template field."),
# Fake fields used to implement the ID finding assistant
'id_object': fields.reference('Record', selection=_select_objects, size=128),
'id_value': fields.char('Record ID'),
_defaults = {
'state': 'dummy',
'state': 'code',
'condition': 'True',
'type': 'ir.actions.server',
'sequence': 5,
@ -551,246 +607,427 @@ class actions_server(osv.osv):
# - self: ORM model of the record on which the action is triggered
# - object: browse_record of the record on which the action is triggered if there is one, otherwise None
# - pool: ORM model pool (i.e. self.pool)
# - time: Python time module
# - cr: database cursor
# - uid: current user id
# - context: current context
# If you plan to return an action, assign: action = {...}
# - time: Python time module
# If you plan to return an action, assign: action = {...}""",
'use_relational_model': 'base',
'use_create': 'new',
'use_write': 'current',
def get_email(self, cr, uid, action, context):
obj_pool = self.pool[action.model_id.model]
id = context.get('active_id')
obj = obj_pool.browse(cr, uid, id)
def _check_expression(self, cr, uid, expression, model_id, context):
""" Check python expression (condition, write_expression). Each step of
the path must be a valid many2one field, or an integer field for the last
fields = None
:param str expression: a python expression, beginning by 'obj' or 'object'
:param int model_id: the base model of the server action
:returns tuple: (is_valid, target_model_name, error_msg)
if not model_id:
return (False, None, 'Your expression cannot be validated because the Base Model is not set.')
# fetch current model
current_model_name = self.pool.get('ir.model').browse(cr, uid, model_id, context).model
# transform expression into a path that should look like 'object.many2onefield.many2onefield'
path = expression.split('.')
initial = path.pop(0)
if initial not in ['obj', 'object']:
return (False, None, 'Your expression should begin with obj or object.\nAn expression builder is available in the help tab.')
# analyze path
while path:
step = path.pop(0)
column_info = self.pool[current_model_name]._all_columns.get(step)
if not column_info:
return (False, None, 'Part of the expression (%s) is not recognized as a column in the model %s.' % (step, current_model_name))
column_type = column_info.column._type
if column_type not in ['many2one', 'int']:
return (False, None, 'Part of the expression (%s) is not a valid column type (is %s, should be a many2one or an int)' % (step, column_type))
if column_type == 'int' and path:
return (False, None, 'Part of the expression (%s) is an integer field that is only allowed at the end of an expression' % (step))
if column_type == 'many2one':
current_model_name = column_info.column._obj
return (True, current_model_name, None)
if '/' in action.email.complete_name:
fields = action.email.complete_name.split('/')
elif '.' in action.email.complete_name:
fields = action.email.complete_name.split('.')
def _check_write_expression(self, cr, uid, ids, context=None):
for record in self.browse(cr, uid, ids, context=context):
if record.write_expression and record.model_id:
correct, model_name, message = self._check_expression(cr, uid, record.write_expression, record.model_id.id, context=context)
if not correct:
_logger.warning('Invalid expression: %s' % message)
return False
return True
for field in fields:
obj = getattr(obj, field)
except Exception:
_logger.exception('Failed to parse: %s', field)
_constraints = [
'Incorrect Write Record Expression',
(partial(osv.Model._check_m2m_recursion, field_name='child_ids'),
'Recursion found in child server actions',
return obj
def on_change_model_id(self, cr, uid, ids, model_id, wkf_model_id, crud_model_id, context=None):
""" When changing the action base model, reset workflow and crud config
to ease value coherence. """
values = {
'use_create': 'new',
'use_write': 'current',
'use_relational_model': 'base',
'wkf_model_id': model_id,
'wkf_field_id': False,
'crud_model_id': model_id,
return {'value': values}
def get_mobile(self, cr, uid, action, context):
obj_pool = self.pool[action.model_id.model]
id = context.get('active_id')
obj = obj_pool.browse(cr, uid, id)
def on_change_wkf_wonfig(self, cr, uid, ids, use_relational_model, wkf_field_id, wkf_model_id, model_id, context=None):
""" Update workflow type configuration
fields = None
- update the workflow model (for base (model_id) /relational (field.relation))
- update wkf_transition_id to False if workflow model changes, to force
the user to choose a new one
values = {}
if use_relational_model == 'relational' and wkf_field_id:
field = self.pool['ir.model.fields'].browse(cr, uid, wkf_field_id, context=context)
new_wkf_model_id = self.pool.get('ir.model').search(cr, uid, [('model', '=', field.relation)], context=context)[0]
values['wkf_model_id'] = new_wkf_model_id
values['wkf_model_id'] = model_id
return {'value': values}
if '/' in action.mobile.complete_name:
fields = action.mobile.complete_name.split('/')
elif '.' in action.mobile.complete_name:
fields = action.mobile.complete_name.split('.')
def on_change_wkf_model_id(self, cr, uid, ids, wkf_model_id, context=None):
""" When changing the workflow model, update its stored name also """
wkf_model_name = False
if wkf_model_id:
wkf_model_name = self.pool.get('ir.model').browse(cr, uid, wkf_model_id, context).model
values = {'wkf_transition_id': False, 'wkf_model_name': wkf_model_name}
return {'value': values}
for field in fields:
obj = getattr(obj, field)
except Exception:
_logger.exception('Failed to parse: %s', field)
def on_change_crud_config(self, cr, uid, ids, state, use_create, use_write, ref_object, crud_model_id, model_id, context=None):
""" Wrapper on CRUD-type (create or write) on_change """
if state == 'object_create':
return self.on_change_create_config(cr, uid, ids, use_create, ref_object, crud_model_id, model_id, context=context)
elif state == 'object_write':
return self.on_change_write_config(cr, uid, ids, use_write, ref_object, crud_model_id, model_id, context=context)
return {}
return obj
def on_change_create_config(self, cr, uid, ids, use_create, ref_object, crud_model_id, model_id, context=None):
""" When changing the object_create type configuration:
def merge_message(self, cr, uid, keystr, action, context=None):
if context is None:
context = {}
- `new` and `copy_current`: crud_model_id is the same as base model
- `new_other`: user choose crud_model_id
- `copy_other`: disassemble the reference object to have its model
- if the target model has changed, then reset the link field that is
probably not correct anymore
values = {}
if use_create == 'new':
values['crud_model_id'] = model_id
elif use_create == 'new_other':
elif use_create == 'copy_current':
values['crud_model_id'] = model_id
elif use_create == 'copy_other' and ref_object:
ref_model, ref_id = ref_object.split(',')
ref_model_id = self.pool['ir.model'].search(cr, uid, [('model', '=', ref_model)], context=context)[0]
values['crud_model_id'] = ref_model_id
def merge(match):
obj_pool = self.pool[action.model_id.model]
id = context.get('active_id')
obj = obj_pool.browse(cr, uid, id)
exp = str(match.group()[2:-2]).strip()
result = eval(exp,
'object': obj,
'context': dict(context), # copy context to prevent side-effects of eval
'time': time,
if values.get('crud_model_id') != crud_model_id:
values['link_field_id'] = False
return {'value': values}
def on_change_write_config(self, cr, uid, ids, use_write, ref_object, crud_model_id, model_id, context=None):
""" When changing the object_write type configuration:
- `current`: crud_model_id is the same as base model
- `other`: disassemble the reference object to have its model
- `expression`: has its own on_change, nothing special here
values = {}
if use_write == 'current':
values['crud_model_id'] = model_id
elif use_write == 'other' and ref_object:
ref_model, ref_id = ref_object.split(',')
ref_model_id = self.pool['ir.model'].search(cr, uid, [('model', '=', ref_model)], context=context)[0]
values['crud_model_id'] = ref_model_id
elif use_write == 'expression':
if values.get('crud_model_id') != crud_model_id:
values['link_field_id'] = False
return {'value': values}
def on_change_write_expression(self, cr, uid, ids, write_expression, model_id, context=None):
""" Check the write_expression and update crud_model_id accordingly """
values = {}
valid, model_name, message = self._check_expression(cr, uid, write_expression, model_id, context=context)
if valid:
ref_model_id = self.pool['ir.model'].search(cr, uid, [('model', '=', model_name)], context=context)[0]
values['crud_model_id'] = ref_model_id
return {'value': values}
if not message:
message = 'Invalid expression'
return {
'warning': {
'title': 'Incorrect expression',
'message': message,
def on_change_crud_model_id(self, cr, uid, ids, crud_model_id, context=None):
""" When changing the CRUD model, update its stored name also """
crud_model_name = False
if crud_model_id:
crud_model_name = self.pool.get('ir.model').browse(cr, uid, crud_model_id, context).model
values = {'link_field_id': False, 'crud_model_name': crud_model_name}
return {'value': values}
def _build_expression(self, field_name, sub_field_name):
""" Returns a placeholder expression for use in a template field,
based on the values provided in the placeholder assistant.
:param field_name: main field name
:param sub_field_name: sub field name (M2O)
:return: final placeholder expression
expression = ''
if field_name:
expression = "object." + field_name
if sub_field_name:
expression += "." + sub_field_name
return expression
def onchange_sub_model_object_value_field(self, cr, uid, ids, model_object_field, sub_model_object_field=False, context=None):
result = {
'sub_object': False,
'copyvalue': False,
'sub_model_object_field': False,
if model_object_field:
fields_obj = self.pool.get('ir.model.fields')
field_value = fields_obj.browse(cr, uid, model_object_field, context)
if field_value.ttype in ['many2one', 'one2many', 'many2many']:
res_ids = self.pool.get('ir.model').search(cr, uid, [('model', '=', field_value.relation)], context=context)
sub_field_value = False
if sub_model_object_field:
sub_field_value = fields_obj.browse(cr, uid, sub_model_object_field, context)
if res_ids:
'sub_object': res_ids[0],
'copyvalue': self._build_expression(field_value.name, sub_field_value and sub_field_value.name or False),
'sub_model_object_field': sub_model_object_field or False,
if result in (None, False):
return str("--------")
return tools.ustr(result)
'copyvalue': self._build_expression(field_value.name, False),
return {'value': result}
com = re.compile('(\[\[.+?\]\])')
message = com.sub(merge, keystr)
def onchange_id_object(self, cr, uid, ids, id_object, context=None):
if id_object:
ref_model, ref_id = id_object.split(',')
return {'value': {'id_value': ref_id}}
return {'value': {'id_value': False}}
return message
def create_action(self, cr, uid, ids, context=None):
""" Create a contextual action for each of the server actions. """
for action in self.browse(cr, uid, ids, context=context):
ir_values_id = self.pool.get('ir.values').create(cr, SUPERUSER_ID, {
'name': _('Run %s') % action.name,
'model': action.model_id.model,
'key2': 'client_action_multi',
'value': "ir.actions.server,%s" % action.id,
}, context)
'menu_ir_values_id': ir_values_id,
# Context should contains:
# ids : original ids
# id : current id of the object
# OUT:
# False : Finished correctly
# ACTION_ID : Action to launch
return True
def unlink_action(self, cr, uid, ids, context=None):
""" Remove the contextual actions created for the server actions. """
for action in self.browse(cr, uid, ids, context=context):
if action.menu_ir_values_id:
self.pool.get('ir.values').unlink(cr, SUPERUSER_ID, action.menu_ir_values_id.id, context)
except Exception:
raise osv.except_osv(_('Warning'), _('Deletion of the action record failed.'))
return True
def run_action_client_action(self, cr, uid, action, eval_context=None, context=None):
if not action.action_id:
raise osv.except_osv(_('Error'), _("Please specify an action to launch!"))
return self.pool[action.action_id.type].read(cr, uid, action.action_id.id, context=context)
def run_action_code(self, cr, uid, action, eval_context=None, context=None):
eval(action.code.strip(), eval_context, mode="exec", nocopy=True) # nocopy allows to return 'action'
if 'action' in eval_context:
return eval_context['action']
def run_action_trigger(self, cr, uid, action, eval_context=None, context=None):
""" Trigger a workflow signal, depending on the use_relational_model:
- `base`: base_model_pool.signal_<TRIGGER_NAME>(cr, uid, context.get('active_id'))
- `relational`: find the related model and object, using the relational
field, then target_model_pool.signal_<TRIGGER_NAME>(cr, uid, target_id)
obj_pool = self.pool[action.model_id.model]
if action.use_relational_model == 'base':
target_id = context.get('active_id')
target_pool = obj_pool
value = getattr(obj_pool.browse(cr, uid, context.get('active_id'), context=context), action.wkf_field_id.name)
if action.wkf_field_id.ttype == 'many2one':
target_id = value.id
target_id = value
target_pool = self.pool[action.wkf_model_id.model]
trigger_name = action.wkf_transition_id.signal
workflow.trg_validate(uid, target_pool._name, target_id, trigger_name, cr)
def run_action_multi(self, cr, uid, action, eval_context=None, context=None):
res = []
for act in action.child_ids:
result = self.run(cr, uid, [act.id], context)
if result:
return res and res[0] or False
def run_action_object_write(self, cr, uid, action, eval_context=None, context=None):
""" Write server action.
- 1. evaluate the value mapping
- 2. depending on the write configuration:
- `current`: id = active_id
- `other`: id = from reference object
- `expression`: id = from expression evaluation
res = {}
for exp in action.fields_lines:
if exp.type == 'equation':
expr = eval(exp.value, eval_context)
expr = exp.value
res[exp.col1.name] = expr
if action.use_write == 'current':
model = action.model_id.model
ref_id = context.get('active_id')
elif action.use_write == 'other':
model = action.crud_model_id.model
ref_id = action.ref_object.id
elif action.use_write == 'expression':
model = action.crud_model_id.model
ref = eval(action.write_expression, eval_context)
if isinstance(ref, browse_record):
ref_id = getattr(ref, 'id')
ref_id = int(ref)
obj_pool = self.pool[model]
obj_pool.write(cr, uid, [ref_id], res, context=context)
def run_action_object_create(self, cr, uid, action, eval_context=None, context=None):
""" Create and Copy server action.
- 1. evaluate the value mapping
- 2. depending on the write configuration:
- `new`: new record in the base model
- `copy_current`: copy the current record (id = active_id) + gives custom values
- `new_other`: new record in target model
- `copy_other`: copy the current record (id from reference object)
+ gives custom values
res = {}
for exp in action.fields_lines:
if exp.type == 'equation':
expr = eval(exp.value, eval_context)
expr = exp.value
res[exp.col1.name] = expr
if action.use_create in ['new', 'copy_current']:
model = action.model_id.model
elif action.use_create in ['new_other', 'copy_other']:
model = action.crud_model_id.model
obj_pool = self.pool[model]
if action.use_create == 'copy_current':
ref_id = context.get('active_id')
res_id = obj_pool.copy(cr, uid, ref_id, res, context=context)
elif action.use_create == 'copy_other':
ref_id = action.ref_object.id
res_id = obj_pool.copy(cr, uid, ref_id, res, context=context)
res_id = obj_pool.create(cr, uid, res, context=context)
if action.link_new_record and action.link_field_id:
self.pool[action.model_id.model].write(cr, uid, [context.get('active_id')], {action.link_field_id.name: res_id})
# FIXME: refactor all the eval() calls in run()!
def run(self, cr, uid, ids, context=None):
""" Run the server action. For each server action, the condition is
checked. Note that A void (aka False) condition is considered as always
valid. If it is verified, the run_action_<STATE> method is called. This
allows easy inheritance of the server actions.
:param dict context: context should contain following keys
- active_id: id of the current object (single mode)
- active_model: current model that should equal the action's model
The following keys are optional:
- active_ids: ids of the current records (mass mode). If active_ids
and active_id are present, active_ids is given precedence.
:return: an action_id to be executed, or False is finished correctly without
return action
if context is None:
context = {}
res = False
user = self.pool.get('res.users').browse(cr, uid, uid)
active_ids = context.get('active_ids', [context.get('active_id', None)])
for action in self.browse(cr, uid, ids, context):
obj = None
obj_pool = self.pool[action.model_id.model]
if context.get('active_model') == action.model_id.model and context.get('active_id'):
for active_id in active_ids:
if context.get('active_model') == action.model_id.model and active_id:
obj = obj_pool.browse(cr, uid, context['active_id'], context=context)
cxt = {
# run context dedicated to a particular active_id
run_context = dict(context, active_ids=[active_id], active_id=active_id)
# evaluation context for python strings to evaluate
eval_context = {
'self': obj_pool,
'object': obj,
'obj': obj,
'pool': self.pool,
'time': time,
'cr': cr,
'context': dict(context), # copy context to prevent side-effects of eval
'context': dict(run_context), # copy context to prevent side-effects of eval
'uid': uid,
'user': user
expr = eval(str(action.condition), cxt)
# evaluate the condition, with the specific case that a void (aka False) condition is considered as True
condition = action.condition
if action.condition is False:
condition = True
expr = eval(str(condition), eval_context)
if not expr:
if action.state=='client_action':
if not action.action_id:
raise osv.except_osv(_('Error'), _("Please specify an action to launch!"))
return self.pool[action.action_id.type].read(cr, uid, action.action_id.id, context=context)
if action.state=='code':
eval(action.code.strip(), cxt, mode="exec", nocopy=True) # nocopy allows to return 'action'
if 'action' in cxt:
return cxt['action']
if action.state == 'email':
email_from = config['email_from']
if not email_from:
_logger.debug('--email-from command line option is not specified, using a fallback value instead.')
if user.email:
email_from = user.email
email_from = "%s@%s" % (user.login, gethostname())
address = eval(str(action.email), cxt)
except Exception:
address = str(action.email)
if not address:
_logger.info('No partner email address specified, not sending any email.')
# handle single and multiple recipient addresses
addresses = address if isinstance(address, (tuple, list)) else [address]
subject = self.merge_message(cr, uid, action.subject, action, context)
body = self.merge_message(cr, uid, action.message, action, context)
ir_mail_server = self.pool.get('ir.mail_server')
msg = ir_mail_server.build_email(email_from, addresses, subject, body)
res_email = ir_mail_server.send_email(cr, uid, msg)
if res_email:
_logger.info('Email successfully sent to: %s', addresses)
_logger.warning('Failed to send email to: %s', addresses)
if action.state == 'trigger':
model = action.wkf_model_id.model
m2o_field_name = action.trigger_obj_id.name
target_id = obj_pool.read(cr, uid, context.get('active_id'), [m2o_field_name])[m2o_field_name]
target_id = target_id[0] if isinstance(target_id,tuple) else target_id
openerp.workflow.trg_validate(uid, model, int(target_id), action.trigger_name, cr)
if action.state == 'sms':
#TODO: set the user and password from the system
# for the sms gateway user / password
# USE smsclient module from extra-addons
_logger.warning('SMS Facility has not been implemented yet. Use smsclient module!')
if action.state == 'other':
res = []
for act in action.child_ids:
context['active_id'] = context['active_ids'][0]
result = self.run(cr, uid, [act.id], context)
if result:
# call the method related to the action: run_action_<STATE>
if hasattr(self, 'run_action_%s' % action.state):
res = getattr(self, 'run_action_%s' % action.state)(cr, uid, action, eval_context=eval_context, context=run_context)
return res
if action.state == 'loop':
expr = eval(str(action.expression), cxt)
context['object'] = obj
for i in expr:
context['active_id'] = i.id
self.run(cr, uid, [action.loop_action.id], context)
if action.state == 'object_write':
res = {}
for exp in action.fields_lines:
euq = exp.value
if exp.type == 'equation':
expr = eval(euq, cxt)
expr = exp.value
res[exp.col1.name] = expr
if not action.write_id:
if not action.srcmodel_id:
obj_pool = self.pool[action.model_id.model]
obj_pool.write(cr, uid, [context.get('active_id')], res)
write_id = context.get('active_id')
obj_pool = self.pool[action.srcmodel_id.model]
obj_pool.write(cr, uid, [write_id], res)
elif action.write_id:
obj_pool = self.pool[action.srcmodel_id.model]
rec = self.pool[action.model_id.model].browse(cr, uid, context.get('active_id'))
id = eval(action.write_id, {'object': rec})
id = int(id)
raise osv.except_osv(_('Error'), _("Problem in configuration `Record Id` in Server Action!"))
if type(id) != type(1):
raise osv.except_osv(_('Error'), _("Problem in configuration `Record Id` in Server Action!"))
write_id = id
obj_pool.write(cr, uid, [write_id], res)
if action.state == 'object_create':
res = {}
for exp in action.fields_lines:
euq = exp.value
if exp.type == 'equation':
expr = eval(euq, cxt)
expr = exp.value
res[exp.col1.name] = expr
obj_pool = self.pool[action.srcmodel_id.model]
res_id = obj_pool.create(cr, uid, res)
if action.record_id:
self.pool[action.model_id.model].write(cr, uid, [context.get('active_id')], {action.record_id.name:res_id})
if action.state == 'object_copy':
res = {}
for exp in action.fields_lines:
euq = exp.value
if exp.type == 'equation':
expr = eval(euq, cxt)
expr = exp.value
res[exp.col1.name] = expr
model = action.copy_object.split(',')[0]
cid = action.copy_object.split(',')[1]
obj_pool = self.pool[model]
obj_pool.copy(cr, uid, int(cid), res)
return False
class act_window_close(osv.osv):
_name = 'ir.actions.act_window_close'

View File

@ -309,10 +309,25 @@
<field name="model">ir.actions.server</field>
<field name="arch" type="xml">
<form string="Server Action" version="7.0">
<div class="oe_title">
<label for="name" class="oe_edit_only"/>
<h1><field name="name"/></h1>
<div class="oe_right oe_button_box" name="buttons">
<field name="menu_ir_values_id" invisible="1"/>
<button name="create_action" string="Add in the 'More' menu" type="object"
help="Display an option on related documents to run this sever action"/>
<button name="unlink_action" string="Remove from the 'More' menu" type="object"
help="Remove the contextual action related to this server action"/>
<field name="name"/>
<field name="model_id"/>
<field name="type" invisible="1"/>
<field name="model_id"
on_change="on_change_model_id(model_id, wkf_model_id, crud_model_id)"/>
<field name="state"/>
@ -321,74 +336,181 @@
<notebook colspan="4">
<page string="Python Code" attrs="{'invisible':[('state','!=','code')]}">
<field name="code"/>
<page string="Python Code" name='code' autofocus="autofocus"
attrs="{'invisible': [('state', '!=', 'code')]}">
<field name="code" placeholder="Enter Python code here. Help about Python expression is available in the help tab of this document."/>
<page string="Trigger" attrs="{'invisible':[('state','!=','trigger')]}">
<group string="Trigger Configuration" col="4">
<field name="wkf_model_id" attrs="{'required':[('state','=','trigger')]}"/>
<field name="trigger_obj_id" context="{'key':''}"
<field name="trigger_name" attrs="{'required':[('state','=','trigger')]}"/>
<page string="Worflow Signal" autofocus="autofocus"
attrs="{'invisible': [('state', '!=', 'trigger')]}">
<p attrs="{'invisible': [('model_id', '!=', False)]}">
Please set the Base Model before setting the action details.
<group attrs="{'invisible': [('model_id', '=', False)]}">
<field name="use_relational_model" widget="radio"
on_change="on_change_wkf_wonfig(use_relational_model, wkf_field_id, wkf_model_id, model_id)"
attrs="{'readonly': [('model_id', '=', False)]}"/>
<field name="wkf_field_id" context="{'key': ''}"
on_change="on_change_wkf_wonfig(use_relational_model, wkf_field_id, wkf_model_id, model_id)"
attrs="{'required': [('state', '=', 'trigger'), ('use_relational_model', '=', 'relational')],
'invisible': [('use_relational_model', '=', 'base')]}"
domain="[('model_id', '=', model_id), ('ttype', 'in', ['many2one'])]"/>
<field name="wkf_model_id" invisible="1"
<field name="wkf_model_name" invisible="1"/>
<field name="wkf_transition_id" attrs="{'required': [('state', '=', 'trigger')]}"
domain="[('wkf_id.osv', '=', wkf_model_name)]"/>
<page string="Action to Launch" attrs="{'invisible':[('state','!=','client_action')]}">
<page string="Client" autofocus="autofocus"
attrs="{'invisible': [('state', '!=', 'client_action')]}">
<field name="action_id" attrs="{'required':[('state', '=', 'client_action')]}"/>
<page string="Email Configuration" attrs="{'invisible':[('state','!=','email')]}">
<page string="Create / Write / Copy" autofocus="autofocus"
attrs="{'invisible':[('state', 'not in', ['object_create', 'object_write'])]}">
<p attrs="{'invisible': [('model_id', '!=', False)]}">
Please set the Base Model before setting the action details.
<group attrs="{'invisible': [('model_id', '=', False)]}">
<field name="use_create" widget="radio"
on_change="on_change_crud_config(state, use_create, use_write, ref_object, crud_model_id, model_id)"
attrs="{'invisible': [('state', '!=', 'object_create')]}"/>
<field name="use_write" widget="radio"
on_change="on_change_crud_config(state, use_create, use_write, ref_object, crud_model_id, model_id)"
attrs="{'invisible': [('state', '!=', 'object_write')]}"/>
<label for="ref_object" string=" "
attrs="{'invisible': ['&amp;',
'|', ('state', '!=', 'object_write'), ('use_write', '!=', 'other'),
'|', ('state', '!=', 'object_create'), ('use_create', '!=', 'copy_other')]}"/>
<div style="margin-left: 24px;"
attrs="{'invisible': ['&amp;',
'|', ('state', '!=', 'object_write'), ('use_write', '!=', 'other'),
'|', ('state', '!=', 'object_create'), ('use_create', '!=', 'copy_other')]}">
<field name="ref_object" nolabel="1"
on_change="on_change_crud_config(state, use_create, use_write, ref_object, crud_model_id, model_id)"/>
<field name="crud_model_id"
attrs="{'invisible': ['|', ('state', '!=', 'object_create'), ('use_create', '!=', 'new_other')]}"/>
<field name="crud_model_name" invisible="1"/>
<label for="link_new_record" attrs="{'invisible': [('state', '!=', 'object_create')]}"/>
<div attrs="{'invisible': [('state', '!=', 'object_create')]}">
<field name="link_new_record" nolabel="1" style="display: inline-block;"/>
<p class="oe_grey oe_edit_only" style="display: inline-block; margin: 0px 0px 0px 8px;">
Check to attach the newly created record to the record on which the server action runs.
<field name="email" domain="[('model_id','=',model_id)]" attrs="{'required':[('state','=','email')]}"/>
<field name="subject" attrs="{'required':[('state','=','email')]}"/>
<field name="message" attrs="{'required':[('state','=','email')]}"/>
<label colspan="2" string="Access all the fields related to the current object using expressions, i.e. object.partner_id.name " align="0.0"/>
<field name="link_field_id"
domain="[('model_id', '=', model_id), ('relation', '=', crud_model_name), ('ttype', 'in', ['many2one'])]"
attrs="{'required': [('state', '=', 'object_create'), ('link_new_record', '=', True)],
'invisible': ['|', ('state', '!=', 'object_create'), ('link_new_record', '=', False)]}"/>
<page string="SMS Configuration" attrs="{'invisible':[('state','!=','sms')]}">
<field name="mobile" domain="[('model_id','=',model_id)]" attrs="{'required':[('state','=','sms')]}"/>
<field name="sms" attrs="{'required':[('state','=','sms')]}"/>
<label string="Access all the fields related to the current object using expressions, i.e. object.partner_id.name " align="0.0"/>
<page string="Create / Write / Copy" attrs="{'invisible':[('state','!=','object_create'), ('state','!=','object_write'), ('state','!=','object_copy')]}">
<group col="4" string="Fields Mapping">
<field name="srcmodel_id" attrs="{'required':[('state','!=','dummy'), ('state','!=','sms'), ('state','!=','code'), ('state','!=','loop'), ('state','!=','trigger'), ('state','!=','object_copy'), ('state','!=','client_action'), ('state','!=','email'), ('state','!=','sms'), ('state','!=','other')]}"/>
<field name="copy_object" on_change="change_object(copy_object, state)" attrs="{'required':[('state','!=','dummy'), ('state','!=','sms'), ('state','!=','code'), ('state','!=','loop'), ('state','!=','trigger'), ('state','!=','object_write'), ('state','!=','object_create'), ('state','!=','client_action'), ('state','!=','email'), ('state','!=','sms'), ('state','!=','other')]}"/>
<field name="fields_lines" nolabel="1" colspan="2">
<label for="link_new_record" attrs="{'invisible': ['|', ('state', '!=', 'object_write'), ('use_write', '!=', 'expression')]}"/>
<div attrs="{'invisible': ['|', ('state', '!=', 'object_write'), ('use_write', '!=', 'expression')]}">
<p class="oe_grey oe_edit_only" style="margin: 0px;">
Write a python expression, beginning with object, that gives the record to update. An expression builder is available in the help tab. Examples:
<ul class="oe_grey oe_edit_only">
<field name="write_expression"
on_change="on_change_write_expression(write_expression, model_id)"
attrs="{'required': [('state', '=', 'object_write'), ('use_write', '=', 'expression')]}"/>
<field name="fields_lines">
<tree string="Field Mappings" editable="top">
<field name="col1" domain="[('model_id','=',parent.srcmodel_id or parent.model_id)]"/>
<field name="col1" domain="[('model_id', '=', parent.crud_model_id)]"/>
<field name="type"/>
<field name="value" colspan="4"/>
<field name="value"/>
<form string="Field Mapping" version="7.0">
<group col="4">
<field name="col1" domain="[('model_id','=',parent.srcmodel_id or parent.model_id)]"/>
<group >
<field name="col1" domain="[('model_id', '=', parent.crud_model_id)]"/>
<field name="type"/>
<field name="value" colspan="4"/>
<field name="value"/>
<field name="record_id" attrs="{'readonly':[('state','!=','object_create')]}" domain="[('model_id','in',[model_id])]"/>
<field name="write_id" attrs="{'readonly':[('state','!=','object_write')]}"/>
<label string="If you use a formula type, use a python expression using the variable 'object'." align="0.0"/>
<page string="Iteration Actions" attrs="{'invisible':[('state','!=','loop')]}">
<group col="4">
<field name="expression" attrs="{'required':[('state','=','loop')]}"/>
<field name="loop_action" domain="[('state','!=','loop')]" attrs="{'required':[('state','=','loop')]}"/>
<page string="Multi Actions" attrs="{'invisible':[('state','!=','other')]}">
<field name="child_ids"/>
<label string="Only one client action will be executed, last client action will be considered in case of multiple client actions." align="0.0"/>
<page string="Execute several actions" autofocus="autofocus"
attrs="{'invisible': [('state', '!=', 'multi')]}">
<p class="oe_grey">
If several child actions return an action, only the last one will be executed.
This may happen when having server actions executing code that returns an action, or server actions returning a client action.
<field name="child_ids"
domain="[('model_id', '=', model_id)]"/>
<page string="Help">
<div style="margin-top: 4px;">
<h3>Help with Python expressions.</h3>
<p>Various fields may use Python code or Python expressions. The following variables can be used:</p>
<li>self: ORM model of the record on which the action is triggered</li>
<li>object or obj: browse_record of the record on which the action is triggered</li>
<li>pool: ORM model pool (i.e. self.pool)</li>
<li>time: Python time module</li>
<li>cr: database cursor</li>
<li>uid: current user id</li>
<li>context: current context</li>
<p>Example of condition expression using Python</p>
<li>condition: True</li>
<li>condition: object.list_price > 5000</li>
<div attrs="{'invisible': [('state', '!=', 'code')]}">
<p>Example of python code</p>
partner_name = obj.name + '_code'
self.pool["res.partner"].create(cr, uid, {"name": partner_name}, context=context)
<h3 colspan="2">Dynamic expression builder</h3>
<p colspan="2" attrs="{'invisible': [('model_id', '!=', False)]}">
Please set the Base Model of the action to enable the dynamic expression buidler.
<field name="model_object_field"
attrs="{'invisible': [('model_id', '=', False)]}"
domain="[('model_id', '=', model_id), ('ttype', '!=', 'one2many'), ('ttype', '!=', 'many2many')]"
<field name="sub_object" readonly="1" attrs="{'invisible': [('model_id', '=', False)]}"/>
<field name="sub_model_object_field"
domain="[('model_id', '=', sub_object), ('ttype', '!=', 'one2many'), ('ttype', '!=', 'many2many')]"
'invisible': [('model_id', '=', False)]}"
<field name="copyvalue" attrs="{'invisible': [('model_id', '=', False)]}"/>
<h3 colspan="2">Find the ID of a record in the database</h3>
<field name="id_object" on_change="onchange_id_object(id_object)"/>
<field name="id_value" />
<field name="type" readonly="1"/>

View File

@ -1,5 +1,6 @@
import test_base
import test_expression
import test_ir_actions
import test_ir_attachment
import test_ir_values
import test_menu
@ -10,6 +11,7 @@ import test_search
checks = [

View File

@ -0,0 +1,398 @@
import unittest2
from openerp.osv.orm import except_orm
import openerp.tests.common as common
from openerp.tools import mute_logger
class TestServerActionsBase(common.TransactionCase):
def setUp(self):
super(TestServerActionsBase, self).setUp()
cr, uid = self.cr, self.uid
# Models
self.ir_actions_server = self.registry('ir.actions.server')
self.ir_actions_client = self.registry('ir.actions.client')
self.ir_values = self.registry('ir.values')
self.ir_model = self.registry('ir.model')
self.ir_model_fields = self.registry('ir.model.fields')
self.res_partner = self.registry('res.partner')
self.res_country = self.registry('res.country')
# Data on which we will run the server action
self.test_country_id = self.res_country.create(cr, uid, {
'name': 'TestingCountry',
'code': 'TY',
'address_format': 'SuperFormat',
self.test_country = self.res_country.browse(cr, uid, self.test_country_id)
self.test_partner_id = self.res_partner.create(cr, uid, {
'name': 'TestingPartner',
'city': 'OrigCity',
'country_id': self.test_country_id,
self.test_partner = self.res_partner.browse(cr, uid, self.test_partner_id)
self.context = {
'active_id': self.test_partner_id,
'active_model': 'res.partner',
# Model data
self.res_partner_model_id = self.ir_model.search(cr, uid, [('model', '=', 'res.partner')])[0]
self.res_partner_name_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'name')])[0]
self.res_partner_city_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'city')])[0]
self.res_partner_country_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'country_id')])[0]
self.res_partner_parent_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'parent_id')])[0]
self.res_country_model_id = self.ir_model.search(cr, uid, [('model', '=', 'res.country')])[0]
self.res_country_name_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.country'), ('name', '=', 'name')])[0]
self.res_country_code_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.country'), ('name', '=', 'code')])[0]
# create server action to
self.act_id = self.ir_actions_server.create(cr, uid, {
'name': 'TestAction',
'condition': 'True',
'model_id': self.res_partner_model_id,
'state': 'code',
'code': 'obj.write({"comment": "MyComment"})',
class TestServerActions(TestServerActionsBase):
def test_00_action(self):
cr, uid = self.cr, self.uid
# Do: eval 'True' condition
self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
self.assertEqual(self.test_partner.comment, 'MyComment', 'ir_actions_server: invalid condition check')
self.test_partner.write({'comment': False})
# Do: eval False condition, that should be considered as True (void = True)
self.ir_actions_server.write(cr, uid, [self.act_id], {'condition': False})
self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
self.assertEqual(self.test_partner.comment, 'MyComment', 'ir_actions_server: invalid condition check')
# Do: create contextual action
self.ir_actions_server.create_action(cr, uid, [self.act_id])
# Test: ir_values created
ir_values_ids = self.ir_values.search(cr, uid, [('name', '=', 'Run TestAction')])
self.assertEqual(len(ir_values_ids), 1, 'ir_actions_server: create_action should have created an entry in ir_values')
ir_value = self.ir_values.browse(cr, uid, ir_values_ids[0])
self.assertEqual(ir_value.value, 'ir.actions.server,%s' % self.act_id, 'ir_actions_server: created ir_values should reference the server action')
self.assertEqual(ir_value.model, 'res.partner', 'ir_actions_server: created ir_values should be linked to the action base model')
# Do: remove contextual action
self.ir_actions_server.unlink_action(cr, uid, [self.act_id])
# Test: ir_values removed
ir_values_ids = self.ir_values.search(cr, uid, [('name', '=', 'Run TestAction')])
self.assertEqual(len(ir_values_ids), 0, 'ir_actions_server: unlink_action should remove the ir_values record')
def test_10_code(self):
cr, uid = self.cr, self.uid
self.ir_actions_server.write(cr, uid, self.act_id, {
'state': 'code',
'code': """partner_name = obj.name + '_code'
self.pool["res.partner"].create(cr, uid, {"name": partner_name}, context=context)"""
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: code server action correctly finished should return False')
pids = self.res_partner.search(cr, uid, [('name', 'ilike', 'TestingPartner_code')])
self.assertEqual(len(pids), 1, 'ir_actions_server: 1 new partner should have been created')
def test_20_trigger(self):
cr, uid = self.cr, self.uid
# Data: code server action (at this point code-based actions should work)
act_id2 = self.ir_actions_server.create(cr, uid, {
'name': 'TestAction2',
'type': 'ir.actions.server',
'condition': 'True',
'model_id': self.res_partner_model_id,
'state': 'code',
'code': 'obj.write({"comment": "MyComment"})',
act_id3 = self.ir_actions_server.create(cr, uid, {
'name': 'TestAction3',
'type': 'ir.actions.server',
'condition': 'True',
'model_id': self.res_country_model_id,
'state': 'code',
'code': 'obj.write({"code": "ZZ"})',
# Data: create workflows
partner_wf_id = self.registry('workflow').create(cr, uid, {
'name': 'TestWorkflow',
'osv': 'res.partner',
'on_create': True,
partner_act1_id = self.registry('workflow.activity').create(cr, uid, {
'name': 'PartnerStart',
'wkf_id': partner_wf_id,
'flow_start': True
partner_act2_id = self.registry('workflow.activity').create(cr, uid, {
'name': 'PartnerTwo',
'wkf_id': partner_wf_id,
'kind': 'function',
'action': 'True',
'action_id': act_id2,
partner_trs1_id = self.registry('workflow.transition').create(cr, uid, {
'signal': 'partner_trans',
'act_from': partner_act1_id,
'act_to': partner_act2_id
country_wf_id = self.registry('workflow').create(cr, uid, {
'name': 'TestWorkflow',
'osv': 'res.country',
'on_create': True,
country_act1_id = self.registry('workflow.activity').create(cr, uid, {
'name': 'CountryStart',
'wkf_id': country_wf_id,
'flow_start': True
country_act2_id = self.registry('workflow.activity').create(cr, uid, {
'name': 'CountryTwo',
'wkf_id': country_wf_id,
'kind': 'function',
'action': 'True',
'action_id': act_id3,
country_trs1_id = self.registry('workflow.transition').create(cr, uid, {
'signal': 'country_trans',
'act_from': country_act1_id,
'act_to': country_act2_id
# Data: re-create country and partner to benefit from the workflows
self.test_country_id = self.res_country.create(cr, uid, {
'name': 'TestingCountry2',
'code': 'T2',
self.test_country = self.res_country.browse(cr, uid, self.test_country_id)
self.test_partner_id = self.res_partner.create(cr, uid, {
'name': 'TestingPartner2',
'country_id': self.test_country_id,
self.test_partner = self.res_partner.browse(cr, uid, self.test_partner_id)
self.context = {
'active_id': self.test_partner_id,
'active_model': 'res.partner',
# Run the action on partner object itself ('base')
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'trigger',
'use_relational_model': 'base',
'wkf_model_id': self.res_partner_model_id,
'wkf_model_name': 'res.partner',
'wkf_transition_id': partner_trs1_id,
self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
self.assertEqual(self.test_partner.comment, 'MyComment', 'ir_actions_server: incorrect signal trigger')
# Run the action on related country object ('relational')
self.ir_actions_server.write(cr, uid, [self.act_id], {
'use_relational_model': 'relational',
'wkf_model_id': self.res_country_model_id,
'wkf_model_name': 'res.country',
'wkf_field_id': self.res_partner_country_field_id,
'wkf_transition_id': country_trs1_id,
self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
self.assertEqual(self.test_country.code, 'ZZ', 'ir_actions_server: incorrect signal trigger')
# Clear workflow cache, otherwise openerp will try to create workflows even if it has been deleted
from openerp.workflow import clear_cache
clear_cache(cr, uid)
def test_30_client(self):
cr, uid = self.cr, self.uid
client_action_id = self.registry('ir.actions.client').create(cr, uid, {
'name': 'TestAction2',
'tag': 'Test',
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'client_action',
'action_id': client_action_id,
res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertEqual(res['name'], 'TestAction2', 'ir_actions_server: incorrect return result for a client action')
def test_40_crud_create(self):
cr, uid = self.cr, self.uid
_city = 'TestCity'
_name = 'TestNew'
# Do: create a new record in the same model and link it
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'object_create',
'use_create': 'new',
'link_new_record': True,
'link_field_id': self.res_partner_parent_field_id,
'fields_lines': [(0, 0, {'col1': self.res_partner_name_field_id, 'value': _name}),
(0, 0, {'col1': self.res_partner_city_field_id, 'value': _city})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new partner created
pids = self.res_partner.search(cr, uid, [('name', 'ilike', _name)])
self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
partner = self.res_partner.browse(cr, uid, pids[0])
self.assertEqual(partner.city, _city, 'ir_actions_server: TODO')
# Test: new partner linked
self.assertEqual(self.test_partner.parent_id.id, pids[0], 'ir_actions_server: TODO')
# Do: copy current record
self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'object_create',
'use_create': 'copy_current',
'link_new_record': False,
'fields_lines': [(0, 0, {'col1': self.res_partner_name_field_id, 'value': 'TestCopyCurrent'}),
(0, 0, {'col1': self.res_partner_city_field_id, 'value': 'TestCity'})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new partner created
pids = self.res_partner.search(cr, uid, [('name', 'ilike', 'TestingPartner (copy)')]) # currently res_partner overrides default['name'] whatever its value
self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
partner = self.res_partner.browse(cr, uid, pids[0])
self.assertEqual(partner.city, 'TestCity', 'ir_actions_server: TODO')
self.assertEqual(partner.country_id.id, self.test_partner.country_id.id, 'ir_actions_server: TODO')
# Do: create a new record in another model
self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'object_create',
'use_create': 'new_other',
'crud_model_id': self.res_country_model_id,
'link_new_record': False,
'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'obj.name', 'type': 'equation'}),
(0, 0, {'col1': self.res_country_code_field_id, 'value': 'obj.name[0:2]', 'type': 'equation'})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new country created
cids = self.res_country.search(cr, uid, [('name', 'ilike', 'TestingPartner')])
self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
country = self.res_country.browse(cr, uid, cids[0])
self.assertEqual(country.code, 'TE', 'ir_actions_server: TODO')
# Do: copy a record in another model
self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'object_create',
'use_create': 'copy_other',
'crud_model_id': self.res_country_model_id,
'link_new_record': False,
'ref_object': 'res.country,%s' % self.test_country_id,
'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'NewCountry', 'type': 'value'}),
(0, 0, {'col1': self.res_country_code_field_id, 'value': 'NY', 'type': 'value'})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new country created
cids = self.res_country.search(cr, uid, [('name', 'ilike', 'NewCountry')])
self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
country = self.res_country.browse(cr, uid, cids[0])
self.assertEqual(country.code, 'NY', 'ir_actions_server: TODO')
self.assertEqual(country.address_format, 'SuperFormat', 'ir_actions_server: TODO')
def test_50_crud_write(self):
cr, uid = self.cr, self.uid
_name = 'TestNew'
# Do: create a new record in the same model and link it
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'object_write',
'use_write': 'current',
'fields_lines': [(0, 0, {'col1': self.res_partner_name_field_id, 'value': _name})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new partner created
pids = self.res_partner.search(cr, uid, [('name', 'ilike', _name)])
self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
partner = self.res_partner.browse(cr, uid, pids[0])
self.assertEqual(partner.city, 'OrigCity', 'ir_actions_server: TODO')
# Do: copy current record
self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
self.ir_actions_server.write(cr, uid, [self.act_id], {
'use_write': 'other',
'crud_model_id': self.res_country_model_id,
'ref_object': 'res.country,%s' % self.test_country_id,
'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'obj.name', 'type': 'equation'})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new country created
cids = self.res_country.search(cr, uid, [('name', 'ilike', 'TestNew')])
self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
# Do: copy a record in another model
self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
self.ir_actions_server.write(cr, uid, [self.act_id], {
'use_write': 'expression',
'crud_model_id': self.res_country_model_id,
'write_expression': 'object.country_id',
'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'NewCountry', 'type': 'value'})],
run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
# Test: new country created
cids = self.res_country.search(cr, uid, [('name', 'ilike', 'NewCountry')])
self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
@mute_logger('openerp.addons.base.ir.ir_model', 'openerp.osv.orm')
def test_60_multi(self):
cr, uid = self.cr, self.uid
# Data: 2 server actions that will be nested
act1_id = self.ir_actions_server.create(cr, uid, {
'name': 'Subaction1',
'model_id': self.res_partner_model_id,
'state': 'code',
'code': 'action = {"type": "ir.actions.act_window"}',
# Do: create a new record in the same model and link it
act2_id = self.ir_actions_server.create(cr, uid, {
'name': 'Subaction2',
'model_id': self.res_partner_model_id,
'state': 'object_create',
'use_create': 'copy_current',
self.ir_actions_server.write(cr, uid, [self.act_id], {
'state': 'multi',
'child_ids': [(6, 0, [act1_id, act2_id])],
# Do: run the action
res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
# Test: new partner created
pids = self.res_partner.search(cr, uid, [('name', 'ilike', 'TestingPartner (copy)')]) # currently res_partner overrides default['name'] whatever its value
self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
# Test: action returned
self.assertEqual(res.get('type'), 'ir.actions.act_window', '')
# Test loops
self.assertRaises(except_orm, self.ir_actions_server.write, cr, uid, [self.act_id], {
'child_ids': [(6, 0, [self.act_id])]
if __name__ == '__main__':

View File

@ -5114,6 +5114,40 @@ class BaseModel(object):
return False
return True
def _check_m2m_recursion(self, cr, uid, ids, field_name):
Verifies that there is no loop in a hierarchical structure of records,
by following the parent relationship using the **parent** field until a loop
is detected or until a top-level record is found.
:param cr: database cursor
:param uid: current user id
:param ids: list of ids of records to check
:param field_name: field to check
:return: **True** if the operation can proceed safely, or **False** if an infinite loop is detected.
field = self._all_columns.get(field_name)
field = field.column if field else None
if not field or field._type != 'many2many' or field._obj != self._name:
# field must be a many2many on itself
raise ValueError('invalid field_name: %r' % (field_name,))
query = 'SELECT distinct "%s" FROM "%s" WHERE "%s" IN %%s' % (field._id2, field._rel, field._id1)
ids_parent = ids[:]
while ids_parent:
ids_parent2 = []
for i in range(0, len(ids_parent), cr.IN_MAX):
j = i + cr.IN_MAX
sub_ids_parent = ids_parent[i:j]
cr.execute(query, (tuple(sub_ids_parent),))
ids_parent2.extend(filter(None, map(lambda x: x[0], cr.fetchall())))
ids_parent = ids_parent2
for i in ids_parent:
if i in ids:
return False
return True
def _get_external_ids(self, cr, uid, ids, *args, **kwargs):
"""Retrieve the External ID(s) of any database record.

View File

@ -272,18 +272,6 @@ def reverse_enumerate(l):
return izip(xrange(len(l)-1, -1, -1), reversed(l))
# text must be latin-1 encoded
def sms_send(user, password, api_id, text, to):
import urllib
url = "http://api.urlsms.com/SendSMS.aspx"
#url = ""
params = urllib.urlencode({'UserID': user, 'Password': password, 'SenderID': api_id, 'MsgText': text, 'RecipientMobileNo':to})
# FIXME: Use the logger if there is an error
return True
class UpdateableStr(local):
""" Class that stores an updateable string (used in wizards)