diff --git a/pjsip/include/pjsip/sip_endpoint.h b/pjsip/include/pjsip/sip_endpoint.h index 86dc9befc..265b7f238 100644 --- a/pjsip/include/pjsip/sip_endpoint.h +++ b/pjsip/include/pjsip/sip_endpoint.h @@ -213,6 +213,77 @@ PJ_DECL(pj_status_t) pjsip_endpt_register_module( pjsip_endpoint *endpt, PJ_DECL(pj_status_t) pjsip_endpt_unregister_module( pjsip_endpoint *endpt, pjsip_module *module ); +/** + * This describes additional parameters to pjsip_endpt_process_rx_data() + * function. Application MUST call pjsip_process_rdata_param_default() to + * initialize this structure. + */ +typedef struct pjsip_process_rdata_param +{ + /** + * Specify the minimum priority number of the modules that are allowed + * to process the message. Default is zero to allow all modules to + * process the message. + */ + unsigned start_prio; + + /** + * Specify the pointer of the module where processing will start. + * The default is NULL, meaning processing will start from the start + * of the module list. + */ + void *start_mod; + + /** + * Set to N, then processing will start at Nth module after start + * module (where start module can be an explicit module as specified + * by \a start_mod or the start of module list when \a start_mod is + * NULL). For example, if set to 1, then processing will start from + * the next module after start module. Default is zero. + */ + unsigned idx_after_start; + + /** + * Print nothing to log. Default is PJ_FALSE. + */ + pj_bool_t silent; + +} pjsip_process_rdata_param; + +/** + * Initialize with default. + * + * @param p The param. + */ +PJ_DECL(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p); + +/** + * Manually distribute the specified pjsip_rx_data to registered modules. + * Normally application does not need to call this function because received + * messages will be given to endpoint automatically by transports. + * + * Application can use this function when it has postponed the processing of + * an incoming message, for example to perform long operations such as + * database operation or to consult other servers to decide what to do with + * the message. In this case, application clones the original rdata, return + * from the callback, and perform the long operation. Upon completing the + * long operation, it resumes pjsip's module processing by calling this + * function, and then free the cloned rdata. + * + * @param endpt The endpoint instance. + * @param rdata The rdata to be distributed. + * @param p Optional pointer to param to specify from which module + * the processing should start. + * @param p_handled Optional pointer to receive last return value of + * module's \a on_rx_request() or \a on_rx_response() + * callback. + * + * @return PJ_SUCCESS on success. + */ +PJ_DECL(pj_status_t) pjsip_endpt_process_rx_data(pjsip_endpoint *endpt, + pjsip_rx_data *rdata, + pjsip_process_rdata_param *p, + pj_bool_t *p_handled); /** * Create pool from the endpoint. All SIP components should allocate their diff --git a/pjsip/include/pjsip/sip_transport.h b/pjsip/include/pjsip/sip_transport.h index e3e448842..4ff3597cd 100644 --- a/pjsip/include/pjsip/sip_transport.h +++ b/pjsip/include/pjsip/sip_transport.h @@ -427,6 +427,42 @@ struct pjsip_rx_data */ PJ_DECL(char*) pjsip_rx_data_get_info(pjsip_rx_data *rdata); +/** + * Clone pjsip_rx_data. This will duplicate the contents of + * pjsip_rx_data and add reference count to the transport. + * Once application has finished using the cloned pjsip_rx_data, + * it must release it by calling #pjsip_rx_data_free_cloned(). + * + * By default (if flags is set to zero), this function copies the + * transport pointer in \a tp_info, duplicates the \a pkt_info, + * perform deep clone of the \a msg_info parts of the rdata, and + * fills the \a endpt_info (i.e. the \a mod_data) with zeros. + * + * @param src The source to be cloned. + * @param flags Optional flags. Must be zero for now. + * @param p_rdata Pointer to receive the cloned rdata. + * + * @return PJ_SUCCESS on success or the appropriate error. + */ +PJ_DECL(pj_status_t) pjsip_rx_data_clone(const pjsip_rx_data *src, + unsigned flags, + pjsip_rx_data **p_rdata); + +/** + * Free cloned pjsip_rx_data. This function must be and must only + * be called for a cloned pjsip_rx_data. Specifically, it must NOT + * be called for the original pjsip_rx_data that is returned by + * transports. + * + * This function will free the memory used by the pjsip_rx_data and + * decrement the transport reference counter. + * + * @param rdata The receive data buffer. + * + * @return PJ_SUCCESS on success or the appropriate error. + */ +PJ_DECL(pj_status_t) pjsip_rx_data_free_cloned(pjsip_rx_data *rdata); + /***************************************************************************** * diff --git a/pjsip/src/pjsip/sip_endpoint.c b/pjsip/src/pjsip/sip_endpoint.c index b1d7f7d2f..0f7b05fdc 100644 --- a/pjsip/src/pjsip/sip_endpoint.c +++ b/pjsip/src/pjsip/sip_endpoint.c @@ -812,6 +812,103 @@ PJ_DEF(pj_timer_heap_t*) pjsip_endpt_get_timer_heap(pjsip_endpoint *endpt) return endpt->timer_heap; } +/* Init with default */ +PJ_DEF(void) pjsip_process_rdata_param_default(pjsip_process_rdata_param *p) +{ + pj_bzero(p, sizeof(*p)); +} + +/* Distribute rdata */ +PJ_DEF(pj_status_t) pjsip_endpt_process_rx_data( pjsip_endpoint *endpt, + pjsip_rx_data *rdata, + pjsip_process_rdata_param *p, + pj_bool_t *p_handled) +{ + pjsip_msg *msg; + pjsip_process_rdata_param def_prm; + pjsip_module *mod; + pj_bool_t handled = PJ_FALSE; + unsigned i; + pj_status_t status; + + PJ_ASSERT_RETURN(endpt && rdata, PJ_EINVAL); + + if (p==NULL) { + p = &def_prm; + pjsip_process_rdata_param_default(p); + } + + msg = rdata->msg_info.msg; + + if (p_handled) + *p_handled = PJ_FALSE; + + if (!p->silent) { + PJ_LOG(5, (THIS_FILE, "Distributing rdata to modules: %s", + pjsip_rx_data_get_info(rdata))); + pj_log_push_indent(); + } + + LOCK_MODULE_ACCESS(endpt); + + /* Find start module */ + if (p->start_mod) { + mod = pj_list_find_node(&endpt->module_list, p->start_mod); + if (!mod) { + status = PJ_ENOTFOUND; + goto on_return; + } + } else { + mod = endpt->module_list.next; + } + + /* Start after the specified index */ + for (i=0; i < p->idx_after_start && mod != &endpt->module_list; ++i) { + mod = mod->next; + } + + /* Start with the specified priority */ + while (mod != &endpt->module_list && mod->priority < p->start_prio) { + mod = mod->next; + } + + if (mod == &endpt->module_list) { + status = PJ_ENOTFOUND; + goto on_return; + } + + /* Distribute */ + if (msg->type == PJSIP_REQUEST_MSG) { + do { + if (mod->on_rx_request) + handled = (*mod->on_rx_request)(rdata); + if (handled) + break; + mod = mod->next; + } while (mod != &endpt->module_list); + } else { + do { + if (mod->on_rx_response) + handled = (*mod->on_rx_response)(rdata); + if (handled) + break; + mod = mod->next; + } while (mod != &endpt->module_list); + } + + status = PJ_SUCCESS; + +on_return: + if (p_handled) + *p_handled = handled; + + UNLOCK_MODULE_ACCESS(endpt); + if (!p->silent) { + pj_log_pop_indent(); + } + return status; +} + /* * This is the callback that is called by the transport manager when it * receives a message from the network. @@ -820,7 +917,8 @@ static void endpt_on_rx_msg( pjsip_endpoint *endpt, pj_status_t status, pjsip_rx_data *rdata ) { - pjsip_msg *msg = rdata->msg_info.msg; + pjsip_process_rdata_param proc_prm; + pj_bool_t handled = PJ_FALSE; if (status != PJ_SUCCESS) { char info[30]; @@ -927,57 +1025,20 @@ static void endpt_on_rx_msg( pjsip_endpoint *endpt, } #endif + pjsip_process_rdata_param_default(&proc_prm); + proc_prm.silent = PJ_TRUE; - /* Distribute to modules, starting from modules with highest priority */ - LOCK_MODULE_ACCESS(endpt); + pjsip_endpt_process_rx_data(endpt, rdata, &proc_prm, &handled); - if (msg->type == PJSIP_REQUEST_MSG) { - pjsip_module *mod; - pj_bool_t handled = PJ_FALSE; - - mod = endpt->module_list.next; - while (mod != &endpt->module_list) { - if (mod->on_rx_request) - handled = (*mod->on_rx_request)(rdata); - if (handled) - break; - mod = mod->next; - } - - /* No module is able to handle the request. */ - if (!handled) { - PJ_TODO(ENDPT_RESPOND_UNHANDLED_REQUEST); - PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled by" - " any modules", - pjsip_rx_data_get_info(rdata), - rdata->pkt_info.src_name, - rdata->pkt_info.src_port)); - } - - } else { - pjsip_module *mod; - pj_bool_t handled = PJ_FALSE; - - mod = endpt->module_list.next; - while (mod != &endpt->module_list) { - if (mod->on_rx_response) - handled = (*mod->on_rx_response)(rdata); - if (handled) - break; - mod = mod->next; - } - - if (!handled) { - PJ_LOG(4,(THIS_FILE, "Message %s from %s:%d was dropped/unhandled" - " by any modules", - pjsip_rx_data_get_info(rdata), - rdata->pkt_info.src_name, - rdata->pkt_info.src_port)); - } + /* No module is able to handle the message */ + if (!handled) { + PJ_LOG(4,(THIS_FILE, "%s from %s:%d was dropped/unhandled by" + " any modules", + pjsip_rx_data_get_info(rdata), + rdata->pkt_info.src_name, + rdata->pkt_info.src_port)); } - UNLOCK_MODULE_ACCESS(endpt); - /* Must clear mod_data before returning rdata to transport, since * rdata may be reused. */ diff --git a/pjsip/src/pjsip/sip_transport.c b/pjsip/src/pjsip/sip_transport.c index 0215a7b04..81587b03f 100644 --- a/pjsip/src/pjsip/sip_transport.c +++ b/pjsip/src/pjsip/sip_transport.c @@ -602,6 +602,87 @@ PJ_DEF(char*) pjsip_rx_data_get_info(pjsip_rx_data *rdata) return rdata->msg_info.info; } +/* Clone pjsip_rx_data. */ +PJ_DEF(pj_status_t) pjsip_rx_data_clone( const pjsip_rx_data *src, + unsigned flags, + pjsip_rx_data **p_rdata) +{ + pj_pool_t *pool; + pjsip_rx_data *dst; + pjsip_hdr *hdr; + + PJ_ASSERT_RETURN(src && flags==0 && p_rdata, PJ_EINVAL); + + pool = pj_pool_create(src->tp_info.pool->factory, + "rtd%p", + PJSIP_POOL_RDATA_LEN, + PJSIP_POOL_RDATA_INC, + NULL); + if (!pool) + return PJ_ENOMEM; + + dst = PJ_POOL_ZALLOC_T(pool, pjsip_rx_data); + + /* Parts of tp_info */ + dst->tp_info.pool = pool; + dst->tp_info.transport = (pjsip_transport*)src->tp_info.transport; + + /* pkt_info can be memcopied */ + pj_memcpy(&dst->pkt_info, &src->pkt_info, sizeof(src->pkt_info)); + + /* msg_info needs deep clone */ + dst->msg_info.msg_buf = dst->pkt_info.packet; + dst->msg_info.len = src->msg_info.len; + dst->msg_info.msg = pjsip_msg_clone(pool, src->msg_info.msg); + pj_list_init(&dst->msg_info.parse_err); + +#define GET_MSG_HDR2(TYPE, type, var) \ + case PJSIP_H_##TYPE: \ + dst->msg_info.var = (pjsip_##type##_hdr*)hdr; \ + break +#define GET_MSG_HDR(TYPE, var_type) GET_MSG_HDR2(TYPE, var_type, var_type) + + hdr = dst->msg_info.msg->hdr.next; + while (hdr != &dst->msg_info.msg->hdr) { + switch (hdr->type) { + GET_MSG_HDR(CALL_ID, cid); + GET_MSG_HDR(FROM, from); + GET_MSG_HDR(TO, to); + GET_MSG_HDR(VIA, via); + GET_MSG_HDR(CSEQ, cseq); + GET_MSG_HDR(MAX_FORWARDS, max_fwd); + GET_MSG_HDR(ROUTE, route); + GET_MSG_HDR2(RECORD_ROUTE, rr, record_route); + GET_MSG_HDR(CONTENT_TYPE, ctype); + GET_MSG_HDR(CONTENT_LENGTH, clen); + GET_MSG_HDR(REQUIRE, require); + GET_MSG_HDR(SUPPORTED, supported); + default: + break; + } + hdr = hdr->next; + } + +#undef GET_MSG_HDR +#undef GET_MSG_HDR2 + + *p_rdata = dst; + + /* Finally add transport ref */ + return pjsip_transport_add_ref(dst->tp_info.transport); +} + +/* Free previously cloned pjsip_rx_data. */ +PJ_DEF(pj_status_t) pjsip_rx_data_free_cloned(pjsip_rx_data *rdata) +{ + PJ_ASSERT_RETURN(rdata, PJ_EINVAL); + + pjsip_transport_dec_ref(rdata->tp_info.transport); + pj_pool_release(rdata->tp_info.pool); + + return PJ_SUCCESS; +} + /***************************************************************************** * * TRANSPORT KEY