Close #1590: APIs to facilitate pending processing of pjsip_rx_data

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@4275 74dad513-b988-da41-8d7b-12977e46ad98
This commit is contained in:
Benny Prijono 2012-10-04 06:11:58 +00:00
parent 02eced275e
commit 6c80455c1e
4 changed files with 297 additions and 48 deletions

View File

@ -213,6 +213,77 @@ PJ_DECL(pj_status_t) pjsip_endpt_register_module( pjsip_endpoint *endpt,
PJ_DECL(pj_status_t) pjsip_endpt_unregister_module( pjsip_endpoint *endpt,
pjsip_module *module );
/**
* This describes additional parameters to pjsip_endpt_process_rx_data()
* function. Application MUST call pjsip_process_rdata_param_default() to
* initialize this structure.
*/
typedef struct pjsip_process_rdata_param
{
/**
* Specify the minimum priority number of the modules that are allowed
* to process the message. Default is zero to allow all modules to
* process the message.
*/
unsigned start_prio;
/**
* Specify the pointer of the module where processing will start.
* The default is NULL, meaning processing will start from the start
* of the module list.
*/
void *start_mod;
/**
* Set to N, then processing will start at Nth module after start
* module (where start module can be an explicit module as specified
* by \a start_mod or the start of module list when \a start_mod is
* NULL). For example, if set to 1, then processing will start from
* the next module after start module. Default is zero.
*/
unsigned idx_after_start;
/**
* Print nothing to log. Default is PJ_FALSE.
*/
pj_bool_t silent;
} pjsip_process_rdata_param;
/**
* Initialize with default.
*
* @param p The param.
*/
PJ_DECL(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p);
/**
* Manually distribute the specified pjsip_rx_data to registered modules.
* Normally application does not need to call this function because received
* messages will be given to endpoint automatically by transports.
*
* Application can use this function when it has postponed the processing of
* an incoming message, for example to perform long operations such as
* database operation or to consult other servers to decide what to do with
* the message. In this case, application clones the original rdata, return
* from the callback, and perform the long operation. Upon completing the
* long operation, it resumes pjsip's module processing by calling this
* function, and then free the cloned rdata.
*
* @param endpt The endpoint instance.
* @param rdata The rdata to be distributed.
* @param p Optional pointer to param to specify from which module
* the processing should start.
* @param p_handled Optional pointer to receive last return value of
* module's \a on_rx_request() or \a on_rx_response()
* callback.
*
* @return PJ_SUCCESS on success.
*/
PJ_DECL(pj_status_t) pjsip_endpt_process_rx_data(pjsip_endpoint *endpt,
pjsip_rx_data *rdata,
pjsip_process_rdata_param *p,
pj_bool_t *p_handled);
/**
* Create pool from the endpoint. All SIP components should allocate their

View File

@ -427,6 +427,42 @@ struct pjsip_rx_data
*/
PJ_DECL(char*) pjsip_rx_data_get_info(pjsip_rx_data *rdata);
/**
* Clone pjsip_rx_data. This will duplicate the contents of
* pjsip_rx_data and add reference count to the transport.
* Once application has finished using the cloned pjsip_rx_data,
* it must release it by calling #pjsip_rx_data_free_cloned().
*
* By default (if flags is set to zero), this function copies the
* transport pointer in \a tp_info, duplicates the \a pkt_info,
* perform deep clone of the \a msg_info parts of the rdata, and
* fills the \a endpt_info (i.e. the \a mod_data) with zeros.
*
* @param src The source to be cloned.
* @param flags Optional flags. Must be zero for now.
* @param p_rdata Pointer to receive the cloned rdata.
*
* @return PJ_SUCCESS on success or the appropriate error.
*/
PJ_DECL(pj_status_t) pjsip_rx_data_clone(const pjsip_rx_data *src,
unsigned flags,
pjsip_rx_data **p_rdata);
/**
* Free cloned pjsip_rx_data. This function must be and must only
* be called for a cloned pjsip_rx_data. Specifically, it must NOT
* be called for the original pjsip_rx_data that is returned by
* transports.
*
* This function will free the memory used by the pjsip_rx_data and
* decrement the transport reference counter.
*
* @param rdata The receive data buffer.
*
* @return PJ_SUCCESS on success or the appropriate error.
*/
PJ_DECL(pj_status_t) pjsip_rx_data_free_cloned(pjsip_rx_data *rdata);
/*****************************************************************************
*

View File

@ -812,6 +812,103 @@ PJ_DEF(pj_timer_heap_t*) pjsip_endpt_get_timer_heap(pjsip_endpoint *endpt)
return endpt->timer_heap;
}
/* Init with default */
PJ_DEF(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p)
{
pj_bzero(p, sizeof(*p));
}
/* Distribute rdata */
PJ_DEF(pj_status_t) pjsip_endpt_process_rx_data( pjsip_endpoint *endpt,
pjsip_rx_data *rdata,
pjsip_process_rdata_param *p,
pj_bool_t *p_handled)
{
pjsip_msg *msg;
pjsip_process_rdata_param def_prm;
pjsip_module *mod;
pj_bool_t handled = PJ_FALSE;
unsigned i;
pj_status_t status;
PJ_ASSERT_RETURN(endpt && rdata, PJ_EINVAL);
if (p==NULL) {
p = &def_prm;
pjsip_process_rdata_param_default(p);
}
msg = rdata->msg_info.msg;
if (p_handled)
*p_handled = PJ_FALSE;
if (!p->silent) {
PJ_LOG(5, (THIS_FILE, "Distributing rdata to modules: %s",
pjsip_rx_data_get_info(rdata)));
pj_log_push_indent();
}
LOCK_MODULE_ACCESS(endpt);
/* Find start module */
if (p->start_mod) {
mod = pj_list_find_node(&endpt->module_list, p->start_mod);
if (!mod) {
status = PJ_ENOTFOUND;
goto on_return;
}
} else {
mod = endpt->module_list.next;
}
/* Start after the specified index */
for (i=0; i < p->idx_after_start && mod != &endpt->module_list; ++i) {
mod = mod->next;
}
/* Start with the specified priority */
while (mod != &endpt->module_list && mod->priority < p->start_prio) {
mod = mod->next;
}
if (mod == &endpt->module_list) {
status = PJ_ENOTFOUND;
goto on_return;
}
/* Distribute */
if (msg->type == PJSIP_REQUEST_MSG) {
do {
if (mod->on_rx_request)
handled = (*mod->on_rx_request)(rdata);
if (handled)
break;
mod = mod->next;
} while (mod != &endpt->module_list);
} else {
do {
if (mod->on_rx_response)
handled = (*mod->on_rx_response)(rdata);
if (handled)
break;
mod = mod->next;
} while (mod != &endpt->module_list);
}
status = PJ_SUCCESS;
on_return:
if (p_handled)
*p_handled = handled;
UNLOCK_MODULE_ACCESS(endpt);
if (!p->silent) {
pj_log_pop_indent();
}
return status;
}
/*
* This is the callback that is called by the transport manager when it
* receives a message from the network.
@ -820,7 +917,8 @@ static void endpt_on_rx_msg( pjsip_endpoint *endpt,
pj_status_t status,
pjsip_rx_data *rdata )
{
pjsip_msg *msg = rdata->msg_info.msg;
pjsip_process_rdata_param proc_prm;
pj_bool_t handled = PJ_FALSE;
if (status != PJ_SUCCESS) {
char info[30];
@ -927,57 +1025,20 @@ static void endpt_on_rx_msg( pjsip_endpoint *endpt,
}
#endif
pjsip_process_rdata_param_default(&proc_prm);
proc_prm.silent = PJ_TRUE;
/* Distribute to modules, starting from modules with highest priority */
LOCK_MODULE_ACCESS(endpt);
pjsip_endpt_process_rx_data(endpt, rdata, &proc_prm, &handled);
if (msg->type == PJSIP_REQUEST_MSG) {
pjsip_module *mod;
pj_bool_t handled = PJ_FALSE;
mod = endpt->module_list.next;
while (mod != &endpt->module_list) {
if (mod->on_rx_request)
handled = (*mod->on_rx_request)(rdata);
if (handled)
break;
mod = mod->next;
}
/* No module is able to handle the request. */
if (!handled) {
PJ_TODO(ENDPT_RESPOND_UNHANDLED_REQUEST);
PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled by"
" any modules",
pjsip_rx_data_get_info(rdata),
rdata->pkt_info.src_name,
rdata->pkt_info.src_port));
}
} else {
pjsip_module *mod;
pj_bool_t handled = PJ_FALSE;
mod = endpt->module_list.next;
while (mod != &endpt->module_list) {
if (mod->on_rx_response)
handled = (*mod->on_rx_response)(rdata);
if (handled)
break;
mod = mod->next;
}
if (!handled) {
PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled"
" by any modules",
pjsip_rx_data_get_info(rdata),
rdata->pkt_info.src_name,
rdata->pkt_info.src_port));
}
/* No module is able to handle the message */
if (!handled) {
PJ_LOG(4,(THIS_FILE, "%s from %s:%d was dropped/unhandled by"
" any modules",
pjsip_rx_data_get_info(rdata),
rdata->pkt_info.src_name,
rdata->pkt_info.src_port));
}
UNLOCK_MODULE_ACCESS(endpt);
/* Must clear mod_data before returning rdata to transport, since
* rdata may be reused.
*/

View File

@ -602,6 +602,87 @@ PJ_DEF(char*) pjsip_rx_data_get_info(pjsip_rx_data *rdata)
return rdata->msg_info.info;
}
/* Clone pjsip_rx_data. */
PJ_DEF(pj_status_t) pjsip_rx_data_clone( const pjsip_rx_data *src,
unsigned flags,
pjsip_rx_data **p_rdata)
{
pj_pool_t *pool;
pjsip_rx_data *dst;
pjsip_hdr *hdr;
PJ_ASSERT_RETURN(src && flags==0 && p_rdata, PJ_EINVAL);
pool = pj_pool_create(src->tp_info.pool->factory,
"rtd%p",
PJSIP_POOL_RDATA_LEN,
PJSIP_POOL_RDATA_INC,
NULL);
if (!pool)
return PJ_ENOMEM;
dst = PJ_POOL_ZALLOC_T(pool, pjsip_rx_data);
/* Parts of tp_info */
dst->tp_info.pool = pool;
dst->tp_info.transport = (pjsip_transport*)src->tp_info.transport;
/* pkt_info can be memcopied */
pj_memcpy(&dst->pkt_info, &src->pkt_info, sizeof(src->pkt_info));
/* msg_info needs deep clone */
dst->msg_info.msg_buf = dst->pkt_info.packet;
dst->msg_info.len = src->msg_info.len;
dst->msg_info.msg = pjsip_msg_clone(pool, src->msg_info.msg);
pj_list_init(&dst->msg_info.parse_err);
#define GET_MSG_HDR2(TYPE, type, var) \
case PJSIP_H_##TYPE: \
dst->msg_info.var = (pjsip_##type##_hdr*)hdr; \
break
#define GET_MSG_HDR(TYPE, var_type) GET_MSG_HDR2(TYPE, var_type, var_type)
hdr = dst->msg_info.msg->hdr.next;
while (hdr != &dst->msg_info.msg->hdr) {
switch (hdr->type) {
GET_MSG_HDR(CALL_ID, cid);
GET_MSG_HDR(FROM, from);
GET_MSG_HDR(TO, to);
GET_MSG_HDR(VIA, via);
GET_MSG_HDR(CSEQ, cseq);
GET_MSG_HDR(MAX_FORWARDS, max_fwd);
GET_MSG_HDR(ROUTE, route);
GET_MSG_HDR2(RECORD_ROUTE, rr, record_route);
GET_MSG_HDR(CONTENT_TYPE, ctype);
GET_MSG_HDR(CONTENT_LENGTH, clen);
GET_MSG_HDR(REQUIRE, require);
GET_MSG_HDR(SUPPORTED, supported);
default:
break;
}
hdr = hdr->next;
}
#undef GET_MSG_HDR
#undef GET_MSG_HDR2
*p_rdata = dst;
/* Finally add transport ref */
return pjsip_transport_add_ref(dst->tp_info.transport);
}
/* Free previously cloned pjsip_rx_data. */
PJ_DEF(pj_status_t) pjsip_rx_data_free_cloned(pjsip_rx_data *rdata)
{
PJ_ASSERT_RETURN(rdata, PJ_EINVAL);
pjsip_transport_dec_ref(rdata->tp_info.transport);
pj_pool_release(rdata->tp_info.pool);
return PJ_SUCCESS;
}
/*****************************************************************************
*
* TRANSPORT KEY