res_pjsip_pubsub: Add new pubsub module capabilities. (#82)

The existing res_pjsip_pubsub APIs are somewhat limited in
what they can do. This adds a few API extensions that make
it possible for PJSIP pubsub modules to implement richer
features than is currently possible.

* Allow pubsub modules to get a handle to pjsip_rx_data on subscription
* Allow pubsub modules to run a callback when a subscription is renewed
* Allow pubsub modules to run a callback for outgoing NOTIFYs, with
  a handle to the tdata, so that modules can append their own headers
  to the NOTIFYs

This change does not add any features directly, but makes possible
several new features that will be added in future changes.

Resolves: #81
ASTERISK-30485 #close

Master-Only: True
This commit is contained in:
InterLinked1 2023-05-18 13:41:38 -04:00 committed by GitHub
parent cd2865175c
commit 659f2aae3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 39 deletions

View File

@ -232,6 +232,8 @@ enum ast_sip_subscription_notify_reason {
#define AST_SIP_EXTEN_STATE_DATA "ast_sip_exten_state_data"
/*! Type used for conveying mailbox state */
#define AST_SIP_MESSAGE_ACCUMULATOR "ast_sip_message_accumulator"
/*! Type used for device feature synchronization */
#define AST_SIP_DEVICE_FEATURE_SYNC_DATA "ast_sip_device_feature_sync_data"
/*!
* \brief Data used to create bodies for NOTIFY/PUBLISH requests.
@ -268,6 +270,18 @@ struct ast_sip_notifier {
* \return The response code to send to the SUBSCRIBE.
*/
int (*new_subscribe)(struct ast_sip_endpoint *endpoint, const char *resource);
/*!
* \brief Same as new_subscribe, but also pass a handle to the pjsip_rx_data
*
* \note If this callback exists, it will be executed, otherwise new_subscribe will be.
* Only use this if you need the rdata. Otherwise, use new_subscribe.
*
* \param endpoint The endpoint from which we received the SUBSCRIBE
* \param resource The name of the resource to which the subscription is being made
* \param rdata The pjsip_rx_data for incoming subscription
* \return The response code to send to the SUBSCRIBE.
*/
int (*new_subscribe_with_rdata)(struct ast_sip_endpoint *endpoint, const char *resource, pjsip_rx_data *rdata);
/*!
* \brief Called when an inbound subscription has been accepted.
*
@ -282,6 +296,25 @@ struct ast_sip_notifier {
* \retval -1 Failure
*/
int (*subscription_established)(struct ast_sip_subscription *sub);
/*!
* \brief Called when a SUBSCRIBE arrives for an already active subscription.
*
* \param sub The existing subscription
* \retval 0 Success
* \retval -1 Failure
*/
int (*refresh_subscribe)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata);
/*!
* \brief Optional callback to execute before sending outgoing NOTIFY requests.
* Because res_pjsip_pubsub creates the tdata internally, this allows modules
* to access the tdata if needed, e.g. to add custom headers.
*
* \param sub The existing subscription
* \param tdata The pjsip_tx_data to use for the outgoing NOTIFY
* \retval 0 Success
* \retval -1 Failure
*/
int (*notify_created)(struct ast_sip_subscription *sub, pjsip_tx_data *tdata);
/*!
* \brief Supply data needed to create a NOTIFY body.
*

View File

@ -1018,6 +1018,8 @@ static int have_visited(const char *resource, struct resources *visited)
return 0;
}
#define NEW_SUBSCRIBE(notifier, endpoint, resource, rdata) notifier->new_subscribe_with_rdata ? notifier->new_subscribe_with_rdata(endpoint, resource, rdata) : notifier->new_subscribe(endpoint, resource)
/*!
* \brief Build child nodes for a given parent.
*
@ -1040,7 +1042,7 @@ static int have_visited(const char *resource, struct resources *visited)
* \param visited The resources that have already been visited.
*/
static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
struct resource_list *list, struct tree_node *parent, struct resources *visited)
struct resource_list *list, struct tree_node *parent, struct resources *visited, pjsip_rx_data *rdata)
{
int i;
@ -1056,7 +1058,7 @@ static void build_node_children(struct ast_sip_endpoint *endpoint, const struct
child_list = retrieve_resource_list(resource, list->event);
if (!child_list) {
int resp = handler->notifier->new_subscribe(endpoint, resource);
int resp = NEW_SUBSCRIBE(handler->notifier, endpoint, resource, rdata);
if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
char display_name[AST_MAX_EXTENSION] = "";
if (list->resource_display_name && handler->notifier->get_resource_display_name) {
@ -1085,7 +1087,7 @@ static void build_node_children(struct ast_sip_endpoint *endpoint, const struct
ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
continue;
}
build_node_children(endpoint, handler, child_list, current, visited);
build_node_children(endpoint, handler, child_list, current, visited, rdata);
if (AST_VECTOR_SIZE(&current->children) > 0) {
ast_debug(1, "List %s had no successful children.\n", resource);
if (AST_VECTOR_APPEND(&parent->children, current)) {
@ -1158,19 +1160,21 @@ static void resource_tree_destroy(struct resource_tree *tree)
* \retval 300-699 Failure to subscribe to requested resource.
*/
static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
const char *resource, struct resource_tree *tree, int has_eventlist_support)
const char *resource, struct resource_tree *tree, int has_eventlist_support, pjsip_rx_data *rdata)
{
RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
struct resources visited;
if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
int not_eventlist_but_needs_children = !strcmp(handler->body_type, AST_SIP_DEVICE_FEATURE_SYNC_DATA);
if ((!has_eventlist_support && !not_eventlist_but_needs_children) || !(list = retrieve_resource_list(resource, handler->event_name))) {
ast_debug(2, "Subscription '%s->%s' is not to a list\n",
ast_sorcery_object_get_id(endpoint), resource);
tree->root = tree_node_alloc(resource, NULL, 0, NULL);
if (!tree->root) {
return 500;
}
return handler->notifier->new_subscribe(endpoint, resource);
return NEW_SUBSCRIBE(handler->notifier, endpoint, resource, rdata);
}
ast_debug(2, "Subscription '%s->%s' is a list\n",
@ -1187,7 +1191,7 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a
tree->notification_batch_interval = list->notification_batch_interval;
build_node_children(endpoint, handler, list, tree->root, &visited);
build_node_children(endpoint, handler, list, tree->root, &visited, rdata);
AST_VECTOR_FREE(&visited);
if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
@ -1380,6 +1384,7 @@ static void shutdown_subscriptions(struct ast_sip_subscription *sub)
sub->handler->subscription_shutdown(sub);
}
}
static int subscription_unreference_dialog(void *obj)
{
struct sip_subscription_tree *sub_tree = obj;
@ -1674,7 +1679,7 @@ static int sub_persistence_recreate(void *obj)
memset(&tree, 0, sizeof(tree));
resp = build_resource_tree(endpoint, handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata));
ast_sip_pubsub_has_eventlist_support(rdata), rdata);
if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
pj_status_t dlg_status;
@ -2454,6 +2459,16 @@ static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
return require;
}
static void set_state_terminated(struct ast_sip_subscription *sub)
{
int i;
sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
set_state_terminated(AST_VECTOR_GET(&sub->children, i));
}
}
/*!
* \brief Send a NOTIFY request to a subscriber
*
@ -2491,6 +2506,12 @@ static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int forc
pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
}
if (sub_tree->root->handler->notifier->notify_created) {
/* The module for this event wants a callback to the pjsip_tx_data,
* e.g. so it can add custom headers or do something custom to the response. */
sub_tree->root->handler->notifier->notify_created(sub_tree->root, tdata);
}
if (sip_subscription_send_request(sub_tree, tdata)) {
/* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
return -1;
@ -2954,6 +2975,7 @@ static int generate_initial_notify(struct ast_sip_subscription *sub)
notify_data = sub->handler->notifier->get_notify_data(sub);
if (!notify_data) {
ast_debug(3, "No notify data, not generating any body content\n");
return -1;
}
@ -3085,7 +3107,7 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
memset(&tree, 0, sizeof(tree));
resp = build_resource_tree(endpoint, handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata));
ast_sip_pubsub_has_eventlist_support(rdata), rdata);
if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
resource_tree_destroy(&tree);
@ -3095,6 +3117,7 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status, NULL);
if (!sub_tree) {
if (dlg_status != PJ_EEXISTS) {
ast_debug(3, "No dialog exists, rejecting\n");
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
}
} else {
@ -3331,6 +3354,7 @@ static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoi
publication->handler = handler;
if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
AST_SIP_PUBLISH_STATE_INITIALIZED)) {
ast_debug(3, "Publication state change failed\n");
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
ao2_cleanup(publication);
return NULL;
@ -3760,16 +3784,6 @@ static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
return PJ_FALSE;
}
static void set_state_terminated(struct ast_sip_subscription *sub)
{
int i;
sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
set_state_terminated(AST_VECTOR_GET(&sub->children, i));
}
}
/*!
* \brief Callback sequence for subscription terminate:
*
@ -3852,7 +3866,8 @@ static void set_state_terminated(struct ast_sip_subscription *sub)
/* The code in this function was previously in pubsub_on_evsub_state. */
static void clean_sub_tree(pjsip_evsub *evsub){
static void clean_sub_tree(pjsip_evsub *evsub)
{
struct sip_subscription_tree *sub_tree;
sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
@ -3917,7 +3932,6 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
return;
}
/* It's easier to write this as what we WANT to process, then negate it. */
if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS
|| (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL)
@ -3932,9 +3946,8 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
This was previously handled by pubsub_on_rx_refresh setting:
'sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING' */
if (event->body.tsx_state.type == PJSIP_EVENT_RX_MSG &&
!pjsip_method_cmp(&event->body.tsx_state.tsx->method, &pjsip_subscribe_method) &&
pjsip_evsub_get_expires(evsub) == 0) {
!pjsip_method_cmp(&event->body.tsx_state.tsx->method, &pjsip_subscribe_method) &&
pjsip_evsub_get_expires(evsub) == 0) {
ast_debug(3, "Subscription ending, do nothing.\n");
return;
}
@ -4063,6 +4076,7 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
{
struct sip_subscription_tree *sub_tree;
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
ast_debug(3, "evsub %p sub_tree %p sub_tree state %s\n", evsub, sub_tree,
@ -4090,27 +4104,33 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
}
endpoint = ast_pjsip_rdata_get_endpoint(rdata);
/* If the handler wants a callback on refresh, then do it (some protocols require this). */
if (sub_tree->state == SIP_SUB_TREE_NORMAL && sub_tree->root->handler->notifier->refresh_subscribe) {
if (!sub_tree->root->handler->notifier->refresh_subscribe(sub_tree->root, rdata)) {
return; /* If the callback handled it, we're done. */
}
}
if (sub_tree->state == SIP_SUB_TREE_NORMAL && sub_tree->is_list) {
/* update RLS */
const char *resource = sub_tree->root->resource;
struct ast_sip_subscription *old_root = sub_tree->root;
struct ast_sip_subscription *new_root = NULL;
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
struct ast_sip_subscription_handler *handler = NULL;
struct ast_sip_pubsub_body_generator *generator = NULL;
if ((endpoint = ast_pjsip_rdata_get_endpoint(rdata))
&& (handler = subscription_get_handler_from_rdata(rdata, ast_sorcery_object_get_id(endpoint)))
&& (generator = subscription_get_generator_from_rdata(rdata, handler))) {
if (endpoint && (generator = subscription_get_generator_from_rdata(rdata, sub_tree->root->handler))) {
struct resource_tree tree;
int resp;
memset(&tree, 0, sizeof(tree));
resp = build_resource_tree(endpoint, handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata));
resp = build_resource_tree(endpoint, sub_tree->root->handler, resource, &tree,
ast_sip_pubsub_has_eventlist_support(rdata), rdata);
if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
new_root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree.root);
new_root = create_virtual_subscriptions(sub_tree->root->handler, resource, generator, sub_tree, tree.root);
if (new_root) {
if (cmp_subscription_childrens(old_root, new_root)) {
ast_debug(1, "RLS '%s->%s' was modified, regenerate it\n", ast_sorcery_object_get_id(endpoint), old_root->resource);
@ -5335,7 +5355,7 @@ AST_TEST_DEFINE(resource_tree)
}
tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
@ -5405,7 +5425,7 @@ AST_TEST_DEFINE(complex_resource_tree)
}
tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
@ -5466,7 +5486,7 @@ AST_TEST_DEFINE(bad_resource)
}
tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
@ -5535,7 +5555,7 @@ AST_TEST_DEFINE(bad_branch)
}
tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
@ -5608,7 +5628,7 @@ AST_TEST_DEFINE(duplicate_resource)
}
tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
@ -5680,7 +5700,7 @@ AST_TEST_DEFINE(loop)
}
tree = ast_calloc(1, sizeof(*tree));
resp = build_resource_tree(NULL, &test_handler, "herp", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "herp", tree, 1, NULL);
if (resp == 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;
@ -5727,7 +5747,7 @@ AST_TEST_DEFINE(bad_event)
/* Since the test_handler is for event "test", this should not build a list, but
* instead result in a single resource being created, called "foo"
*/
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1);
resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1, NULL);
if (resp != 200) {
ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp);
return AST_TEST_FAIL;