diff --git a/apps/app_queue.c b/apps/app_queue.c index 80c253f224..b29988961f 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -11336,6 +11336,8 @@ static int load_module(void) if (!device_state_sub) { err = -1; } + stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type()); + stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); manager_topic = ast_manager_get_topic(); queue_topic = ast_queue_topic_all(); diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index f4f651487d..1eb618bd45 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -12594,6 +12594,8 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf, * knows that we care about it. Then, chan_dahdi will get the MWI from the * event cache instead of checking the mailbox directly. */ tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL); + stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } #ifdef HAVE_DAHDI_LINEREVERSE_VMWI diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 01d42b57fa..0ca4234d7d 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -1456,6 +1456,8 @@ static void network_change_stasis_subscribe(void) if (!network_change_sub) { network_change_sub = stasis_subscribe(ast_system_topic(), network_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type()); + stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -1469,6 +1471,8 @@ static void acl_change_stasis_subscribe(void) if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), acl_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type()); + stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -13072,6 +13076,8 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st * mailboxes. However, we just grab the events out of the cache when it * is time to send MWI, since it is only sent with a REGACK. */ peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL); + stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index 2ac7690a6e..46342ce312 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -4242,6 +4242,8 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v) * knows that we care about it. Then, chan_mgcp will get the MWI from the * event cache instead of checking the mailbox directly. */ e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL); + stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random()); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 55da37d35c..cb81901d11 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -17494,6 +17494,8 @@ static void network_change_stasis_subscribe(void) if (!network_change_sub) { network_change_sub = stasis_subscribe(ast_system_topic(), network_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type()); + stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -17507,6 +17509,8 @@ static void acl_change_stasis_subscribe(void) if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), acl_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type()); + stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -28385,6 +28389,9 @@ static void add_peer_mwi_subs(struct sip_peer *peer) mailbox_specific_topic = ast_mwi_topic(mailbox->id); if (mailbox_specific_topic) { mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer); + stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type()); + stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } } diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index 2b13e5eaaa..910b7b8117 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -8334,6 +8334,8 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v mailbox_specific_topic = ast_mwi_topic(l->mailbox); if (mailbox_specific_topic) { l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l); + stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/channels/sig_pri.c b/channels/sig_pri.c index fbc4e40f05..ec6d666aea 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -9130,6 +9130,9 @@ int sig_pri_start_pri(struct sig_pri_span *pri) if (!pri->mbox[i].sub) { ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n", sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id); + } else { + stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type()); + stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } #if defined(HAVE_PRI_MWI_V2) if (ast_strlen_zero(pri->mbox[i].vm_number)) { diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 2b56b53f81..ebd00ee23d 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -291,6 +291,15 @@ enum stasis_message_type_result { STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */ }; +/*! + * \brief Stasis subscription message filters + */ +enum stasis_subscription_message_filter { + STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */ + STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */ + STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */ +}; + /*! * \brief Create a new message type. * @@ -326,6 +335,14 @@ const char *stasis_message_type_name(const struct stasis_message_type *type); */ unsigned int stasis_message_type_hash(const struct stasis_message_type *type); +/*! + * \brief Gets the id of a given message type + * \param type The type to get the id of. + * \return The id + * \since 17.0.0 + */ +int stasis_message_type_id(const struct stasis_message_type *type); + /*! * \brief Check whether a message type is declined * @@ -494,6 +511,14 @@ struct stasis_topic *stasis_topic_create(const char *name); */ const char *stasis_topic_name(const struct stasis_topic *topic); +/*! + * \brief Return the number of subscribers of a topic. + * \param topic Topic. + * \return Number of subscribers of the topic. + * \since 17.0.0 + */ +size_t stasis_topic_subscribers(const struct stasis_topic *topic); + /*! * \brief Publish a message to a topic's subscribers. * \param topic Topic. @@ -559,6 +584,10 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st * \return New \ref stasis_subscription object. * \return \c NULL on error. * \since 12 + * + * \note This callback will receive a callback with a message indicating it + * has been subscribed. This occurs immediately before accepted message + * types can be set and the callback must expect to receive it. */ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); @@ -584,10 +613,68 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, * \return New \ref stasis_subscription object. * \return \c NULL on error. * \since 12.8.0 + * + * \note This callback will receive a callback with a message indicating it + * has been subscribed. This occurs immediately before accepted message + * types can be set and the callback must expect to receive it. */ struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); +/*! + * \brief Indicate to a subscription that we are interested in a message type. + * + * This will cause the subscription to allow the given message type to be + * raised to our subscription callback. This enables internal filtering in + * the stasis message bus to reduce messages. + * + * \param subscription Subscription to add message type to. + * \param type The message type we wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + * + * \note If you are wanting to use stasis_final_message you will need to accept + * \ref stasis_subscription_change_type as a message type. + * + * \note Until the subscription is set to selective filtering it is possible for it + * to receive messages of message types that would not normally be accepted. + */ +int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type); + +/*! + * \brief Indicate to a subscription that we are not interested in a message type. + * + * \param subscription Subscription to remove message type from. + * \param type The message type we don't wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type); + +/*! + * \brief Set the message type filtering level on a subscription + * + * This will cause the subscription to filter messages according to the + * provided filter level. For example if selective is used then only + * messages matching those provided to \ref stasis_subscription_accept_message_type + * will be raised to the subscription callback. + * + * \param subscription Subscription that should receive all messages. + * \param filter What filter to use + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_subscription_set_filter(struct stasis_subscription *subscription, + enum stasis_subscription_message_filter filter); + /*! * \brief Cancel a subscription. * @@ -1036,6 +1123,41 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( struct stasis_topic *stasis_caching_get_topic( struct stasis_caching_topic *caching_topic); +/*! + * \brief Indicate to a caching topic that we are interested in a message type. + * + * This will cause the caching topic to receive messages of the given message + * type. This enables internal filtering in the stasis message bus to reduce + * messages. + * + * \param caching_topic The caching topic. + * \param type The message type we wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, + struct stasis_message_type *type); + +/*! + * \brief Set the message type filtering level on a cache + * + * This will cause the underlying subscription to filter messages according to the + * provided filter level. For example if selective is used then only + * messages matching those provided to \ref stasis_subscription_accept_message_type + * will be raised to the subscription callback. + * + * \param caching_topic The caching topic. + * \param filter What filter to use + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, + enum stasis_subscription_message_filter filter); + /*! * \brief A message which instructs the caching topic to remove an entry from * its cache. diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h index e61d3e931c..514d62e695 100644 --- a/include/asterisk/stasis_cache_pattern.h +++ b/include/asterisk/stasis_cache_pattern.h @@ -169,4 +169,39 @@ struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one); struct stasis_topic *stasis_cp_single_topic_cached( struct stasis_cp_single *one); +/*! + * \brief Indicate to an instance that we are interested in a message type. + * + * This will cause the caching topic to receive messages of the given message + * type. This enables internal filtering in the stasis message bus to reduce + * messages. + * + * \param one One side of the cache pattern. + * \param type The message type we wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, + struct stasis_message_type *type); + +/*! + * \brief Set the message type filtering level on a cache + * + * This will cause the underlying subscription to filter messages according to the + * provided filter level. For example if selective is used then only + * messages matching those provided to \ref stasis_subscription_accept_message_type + * will be raised to the subscription callback. + * + * \param one One side of the cache pattern. + * \param filter What filter to use + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_cp_single_set_filter(struct stasis_cp_single *one, + enum stasis_subscription_message_filter filter); + #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */ diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 50270a788b..8dcdfcc913 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -233,6 +233,10 @@ void stasis_message_router_remove_cache_update( * \retval -1 on failure * * \since 12 + * + * \note Setting a default callback will automatically cause the underlying + * subscription to receive all messages and not be filtered. If filtering is + * desired then a specific route for each message type should be provided. */ int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, diff --git a/main/ccss.c b/main/ccss.c index 5758574f66..52ec58647b 100644 --- a/main/ccss.c +++ b/main/ccss.c @@ -1433,6 +1433,8 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_ cc_unref(generic_list, "Failed to subscribe to device state"); return NULL; } + stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type()); + stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); generic_list->current_state = ast_device_state(monitor->interface->device_name); ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list"); return generic_list; @@ -2804,6 +2806,9 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent) if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) { return -1; } + stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type()); + stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cc_ref(agent, "Ref agent for subscription"); return 0; } diff --git a/main/devicestate.c b/main/devicestate.c index 7dcbe82900..b6c740ce26 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -920,6 +920,8 @@ int devstate_init(void) if (!device_state_topic_cached) { return -1; } + stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type()); + stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(), devstate_change_cb, NULL); @@ -927,6 +929,8 @@ int devstate_init(void) ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n"); return -1; } + stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type()); + stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return 0; } diff --git a/main/endpoints.c b/main/endpoints.c index f1608f3a00..3129fb49f7 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -202,7 +202,7 @@ static void endpoint_cache_clear(void *data, endpoint_publish_snapshot(endpoint); } -static void endpoint_default(void *data, +static void endpoint_subscription_change(void *data, struct stasis_subscription *sub, struct stasis_message *message) { @@ -263,6 +263,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha if (!endpoint->topics) { return NULL; } + stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); + stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { @@ -271,8 +273,9 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha r |= stasis_message_router_add(endpoint->router, stasis_cache_clear_type(), endpoint_cache_clear, endpoint); - r |= stasis_message_router_set_default(endpoint->router, - endpoint_default, endpoint); + r |= stasis_message_router_add(endpoint->router, + stasis_subscription_change_type(), endpoint_subscription_change, + endpoint); if (r) { return NULL; } @@ -288,6 +291,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha if (!endpoint->topics) { return NULL; } + stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); + stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); ao2_link(tech_endpoints, endpoint); } diff --git a/main/manager.c b/main/manager.c index 7accaa15f7..0da023a518 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1527,6 +1527,8 @@ static void acl_change_stasis_subscribe(void) if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), acl_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type()); + stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/main/pbx.c b/main/pbx.c index f9612957e5..0a23735c05 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -8416,10 +8416,15 @@ int load_pbx(void) if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) { return -1; } + stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type()); + stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type()); + stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) { return -1; } + stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type()); + stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return 0; } diff --git a/main/presencestate.c b/main/presencestate.c index 4121bf5b69..65b7f69270 100644 --- a/main/presencestate.c +++ b/main/presencestate.c @@ -514,6 +514,8 @@ int ast_presence_state_engine_init(void) if (!presence_state_topic_cached) { return -1; } + stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type()); + stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); AST_TEST_REGISTER(test_presence_chan); diff --git a/main/stasis.c b/main/stasis.c index ed838733b9..93112d98eb 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -370,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic) return topic->name; } +size_t stasis_topic_subscribers(const struct stasis_topic *topic) +{ + return AST_VECTOR_SIZE(&topic->subscribers); +} + /*! \internal */ struct stasis_subscription { /*! Unique ID for this subscription */ @@ -391,6 +396,11 @@ struct stasis_subscription { /*! Flag set when final message for sub has been processed. * Be sure join_lock is held before reading/setting. */ int final_message_processed; + + /*! The message types this subscription is accepting */ + AST_VECTOR(, char) accepted_message_types; + /*! The message filter currently in use */ + enum stasis_subscription_message_filter filter; }; static void subscription_dtor(void *obj) @@ -409,6 +419,8 @@ static void subscription_dtor(void *obj) ast_taskprocessor_unreference(sub->mailbox); sub->mailbox = NULL; ast_cond_destroy(&sub->join_cond); + + AST_VECTOR_FREE(&sub->accepted_message_types); } /*! @@ -420,19 +432,25 @@ static void subscription_dtor(void *obj) static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message) { + unsigned int final = stasis_subscription_final_message(sub, message); + int message_type_id = stasis_message_type_id(stasis_subscription_change_type()); + /* Notify that the final message has been received */ - if (stasis_subscription_final_message(sub, message)) { + if (final) { ao2_lock(sub); sub->final_message_rxed = 1; ast_cond_signal(&sub->join_cond); ao2_unlock(sub); } - /* Since sub is mostly immutable, no need to lock sub */ - sub->callback(sub->data, sub, message); + if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE || + (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) { + /* Since sub is mostly immutable, no need to lock sub */ + sub->callback(sub->data, sub, message); + } /* Notify that the final message has been processed */ - if (stasis_subscription_final_message(sub, message)) { + if (final) { ao2_lock(sub); sub->final_message_processed = 1; ast_cond_signal(&sub->join_cond); @@ -500,6 +518,8 @@ struct stasis_subscription *internal_stasis_subscribe( sub->callback = callback; sub->data = data; ast_cond_init(&sub->join_cond, NULL); + sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE; + AST_VECTOR_INIT(&sub->accepted_message_types, 0); if (topic_add_subscription(topic, sub) != 0) { ao2_ref(sub, -1); @@ -586,6 +606,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr return res; } +int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type) +{ + if (!subscription) { + return -1; + } + + ast_assert(type != NULL); + ast_assert(stasis_message_type_name(type) != NULL); + + if (!type || !stasis_message_type_name(type)) { + /* Filtering is unreliable as this message type is not yet initialized + * so force all messages through. + */ + subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE; + return 0; + } + + ao2_lock(subscription->topic); + if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) { + /* We do this for the same reason as above. The subscription can still operate, so allow + * it to do so by forcing all messages through. + */ + subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE; + } + ao2_unlock(subscription->topic); + + return 0; +} + +int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type) +{ + if (!subscription) { + return -1; + } + + ast_assert(type != NULL); + ast_assert(stasis_message_type_name(type) != NULL); + + if (!type || !stasis_message_type_name(type)) { + return 0; + } + + ao2_lock(subscription->topic); + if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) { + /* The memory is already allocated so this can't fail */ + AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0); + } + ao2_unlock(subscription->topic); + + return 0; +} + +int stasis_subscription_set_filter(struct stasis_subscription *subscription, + enum stasis_subscription_message_filter filter) +{ + if (!subscription) { + return -1; + } + + ao2_lock(subscription->topic); + if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) { + subscription->filter = filter; + } + ao2_unlock(subscription->topic); + + return 0; +} + void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { @@ -781,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous) { + /* Determine if this subscription is interested in this message. Note that final + * messages are special and are always invoked on the subscription. + */ + if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) { + int message_type_id = stasis_message_type_id(stasis_message_type(message)); + if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) || + !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) && + !stasis_subscription_final_message(sub, message)) { + return; + } + } + if (!sub->mailbox) { /* Dispatch directly */ subscription_invoke(sub, message); @@ -840,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic, ast_assert(topic != NULL); ast_assert(message != NULL); + /* If there are no subscribers don't bother */ + if (!stasis_topic_subscribers(topic)) { + return; + } + /* * The topic may be unref'ed by the subscription invocation. * Make sure we hold onto a reference while dispatching. diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 3d353b311e..bc975fd3da 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -87,6 +87,35 @@ struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *cachi return caching_topic->topic; } +int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, + struct stasis_message_type *type) +{ + int res; + + if (!caching_topic) { + return -1; + } + + /* We wait to accept the stasis specific message types until now so that by default everything + * will flow to us. + */ + res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type()); + res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type()); + res |= stasis_subscription_accept_message_type(caching_topic->sub, type); + + return res; +} + +int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, + enum stasis_subscription_message_filter filter) +{ + if (!caching_topic) { + return -1; + } + return stasis_subscription_set_filter(caching_topic->sub, filter); +} + + struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic) { if (!caching_topic) { @@ -856,11 +885,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, /* Update the cache */ snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put); if (snapshots.old || msg_put) { - update = update_create(snapshots.old, msg_put); - if (update) { - stasis_publish(caching_topic->topic, update); + if (stasis_topic_subscribers(caching_topic->topic)) { + update = update_create(snapshots.old, msg_put); + if (update) { + stasis_publish(caching_topic->topic, update); + ao2_ref(update, -1); + } } - ao2_cleanup(update); } else { ast_debug(1, "Attempting to remove an item from the %s cache that isn't there: %s %s\n", @@ -873,11 +904,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic, snapshots.aggregate_new); } - update = update_create(snapshots.aggregate_old, snapshots.aggregate_new); - if (update) { - stasis_publish(caching_topic->topic, update); + if (stasis_topic_subscribers(caching_topic->topic)) { + update = update_create(snapshots.aggregate_old, snapshots.aggregate_new); + if (update) { + stasis_publish(caching_topic->topic, update); + ao2_ref(update, -1); + } } - ao2_cleanup(update); } ao2_cleanup(snapshots.old); diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index f0e34b9558..04d816463c 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -217,3 +217,21 @@ struct stasis_topic *stasis_cp_single_topic_cached( } return stasis_caching_get_topic(one->topic_cached); } + +int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, + struct stasis_message_type *type) +{ + if (!one) { + return -1; + } + return stasis_caching_accept_message_type(one->topic_cached, type); +} + +int stasis_cp_single_set_filter(struct stasis_cp_single *one, + enum stasis_subscription_message_filter filter) +{ + if (!one) { + return -1; + } + return stasis_caching_set_filter(one->topic_cached, filter); +} diff --git a/main/stasis_message.c b/main/stasis_message.c index 19f4a928fd..1fdbe858e3 100644 --- a/main/stasis_message.c +++ b/main/stasis_message.c @@ -39,9 +39,11 @@ struct stasis_message_type { struct stasis_message_vtable *vtable; char *name; unsigned int hash; + int id; }; static struct stasis_message_vtable null_vtable = {}; +static int message_type_id; static void message_type_dtor(void *obj) { @@ -78,6 +80,7 @@ int stasis_message_type_create(const char *name, } type->hash = ast_hashtab_hash_string(name); type->vtable = vtable; + type->id = ast_atomic_fetchadd_int(&message_type_id, +1); *result = type; return STASIS_MESSAGE_TYPE_SUCCESS; @@ -93,6 +96,11 @@ unsigned int stasis_message_type_hash(const struct stasis_message_type *type) return type->hash; } +int stasis_message_type_id(const struct stasis_message_type *type) +{ + return type->id; +} + /*! \internal */ struct stasis_message { /*! Time the message was created */ diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 41d426beca..41ebc7ea8a 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -235,6 +235,9 @@ static struct stasis_message_router *stasis_message_router_create_internal( return NULL; } + /* We need to receive subscription change messages so we know when our subscription goes away */ + stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type()); + return router; } @@ -316,6 +319,14 @@ int stasis_message_router_add(struct stasis_message_router *router, } ao2_lock(router); res = route_table_add(&router->routes, message_type, callback, data); + if (!res) { + stasis_subscription_accept_message_type(router->subscription, message_type); + /* Until a specific message type was added we would already drop the message, so being + * selective now doesn't harm us. If we have a default route then we are already forced + * to filter nothing and messages will come in regardless. + */ + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + } ao2_unlock(router); return res; } @@ -334,6 +345,10 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router, } ao2_lock(router); res = route_table_add(&router->cache_routes, message_type, callback, data); + if (!res) { + stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type()); + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + } ao2_unlock(router); return res; } @@ -378,6 +393,9 @@ int stasis_message_router_set_default(struct stasis_message_router *router, router->default_route.callback = callback; router->default_route.data = data; ao2_unlock(router); + + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); + /* While this implementation can never fail, it used to be able to */ return 0; } diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c index dd2fb75879..f9b3e85d26 100644 --- a/res/parking/parking_applications.c +++ b/res/parking/parking_applications.c @@ -868,6 +868,10 @@ static void park_announce_update_cb(void *data, struct stasis_subscription *sub, return; } + if (ast_parked_call_type() != stasis_message_type(message)) { + return; + } + if (payload->event_type != PARKED_CALL) { /* We are only concerned with calls parked */ return; @@ -954,6 +958,10 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data return -1; } + stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type()); + stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + /* Now for the fun part... park it! */ ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0); diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c index f73f37157e..1d3b9e4da8 100644 --- a/res/parking/parking_bridge_features.c +++ b/res/parking/parking_bridge_features.c @@ -213,6 +213,9 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) { return -1; } + stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type()); + stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); datastore->data = parked_datastore; diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c index 6d0a4c06cb..83558ba74e 100644 --- a/res/parking/parking_manager.c +++ b/res/parking/parking_manager.c @@ -686,6 +686,8 @@ static void parking_manager_enable_stasis(void) { if (!parking_sub) { parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL); + stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type()); + stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/res/res_hep_rtcp.c b/res/res_hep_rtcp.c index c3abbc164b..f73cd44e4e 100644 --- a/res/res_hep_rtcp.c +++ b/res/res_hep_rtcp.c @@ -167,6 +167,9 @@ static int load_module(void) if (!stasis_rtp_subscription) { return AST_MODULE_LOAD_DECLINE; } + stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type()); + stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type()); + stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 4cd892c05b..83bff8893c 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -269,6 +269,9 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char ao2_ref(mwi_sub, -1); mwi_stasis_sub = NULL; } + stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type()); + stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return mwi_stasis_sub; } @@ -1364,7 +1367,11 @@ static int load_module(void) if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, send_initial_notify_all, NULL); } else { - stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + struct stasis_subscription *sub; + + sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type()); + stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 648deee596..33129c828c 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -2534,6 +2534,8 @@ static int load_module(void) network_change_sub = stasis_subscribe(ast_system_topic(), network_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type()); + stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/res_pjsip_publish_asterisk.c b/res/res_pjsip_publish_asterisk.c index 220ba0bc54..692f9a7479 100644 --- a/res/res_pjsip_publish_asterisk.c +++ b/res/res_pjsip_publish_asterisk.c @@ -360,6 +360,9 @@ static int asterisk_start_devicestate_publishing(struct ast_sip_outbound_publish ao2_ref(datastore, -1); return -1; } + stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type()); + stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore); @@ -435,6 +438,9 @@ static int asterisk_start_mwi_publishing(struct ast_sip_outbound_publish *config ao2_ref(datastore, -1); return -1; } + stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type()); + stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cached = stasis_cache_dump(ast_mwi_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore); diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index b5ee15923d..9e8a32bf79 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -5567,7 +5567,11 @@ static int load_module(void) if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, subscription_persistence_load, NULL); } else { - stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + struct stasis_subscription *sub; + + sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type()); + stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 1e6ca7f46e..3dfaabc445 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -686,6 +686,10 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann ast_channel_unlock(chan); ao2_cleanup(refer->progress); + } else { + stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type()); + stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/res/res_security_log.c b/res/res_security_log.c index 555ba23c18..95429cad3f 100644 --- a/res/res_security_log.c +++ b/res/res_security_log.c @@ -141,6 +141,8 @@ static int load_module(void) LOG_SECURITY = -1; return AST_MODULE_LOAD_DECLINE; } + stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type()); + stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); ast_verb(3, "Security Logging Enabled\n"); diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c index be09b15ad0..1c80f9efaa 100644 --- a/res/res_stasis_device_state.c +++ b/res/res_stasis_device_state.c @@ -394,6 +394,9 @@ static int subscribe_device_state(struct stasis_app *app, void *obj) ao2_ref(sub, -1); return -1; } + stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type()); + stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK); ao2_unlock(device_state_subscriptions); diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 8366a8cbba..838bed86bd 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1599,11 +1599,15 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { return; } + stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(client->mwi_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) { client->mwi_sub = stasis_unsubscribe(client->mwi_sub); return; } + stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type()); + stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);