Merge "stasis: Add internal filtering of messages." into 16

This commit is contained in:
Joshua Colp 2018-11-19 08:37:05 -06:00 committed by Gerrit Code Review
commit cc0157d0d3
33 changed files with 457 additions and 17 deletions

View File

@ -11336,6 +11336,8 @@ static int load_module(void)
if (!device_state_sub) {
err = -1;
}
stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
manager_topic = ast_manager_get_topic();
queue_topic = ast_queue_topic_all();

View File

@ -12594,6 +12594,8 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
* knows that we care about it. Then, chan_dahdi will get the MWI from the
* event cache instead of checking the mailbox directly. */
tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());
stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
#ifdef HAVE_DAHDI_LINEREVERSE_VMWI

View File

@ -1456,6 +1456,8 @@ static void network_change_stasis_subscribe(void)
if (!network_change_sub) {
network_change_sub = stasis_subscribe(ast_system_topic(),
network_change_stasis_cb, NULL);
stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
@ -1469,6 +1471,8 @@ static void acl_change_stasis_subscribe(void)
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
@ -13072,6 +13076,8 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
* mailboxes. However, we just grab the events out of the cache when it
* is time to send MWI, since it is only sent with a REGACK. */
peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());
stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}

View File

@ -4242,6 +4242,8 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
* knows that we care about it. Then, chan_mgcp will get the MWI from the
* event cache instead of checking the mailbox directly. */
e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());
stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());

View File

@ -17494,6 +17494,8 @@ static void network_change_stasis_subscribe(void)
if (!network_change_sub) {
network_change_sub = stasis_subscribe(ast_system_topic(),
network_change_stasis_cb, NULL);
stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
@ -17507,6 +17509,8 @@ static void acl_change_stasis_subscribe(void)
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
@ -28385,6 +28389,9 @@ static void add_peer_mwi_subs(struct sip_peer *peer)
mailbox_specific_topic = ast_mwi_topic(mailbox->id);
if (mailbox_specific_topic) {
mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);
stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());
stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());
stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}
}

View File

@ -8334,6 +8334,8 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
mailbox_specific_topic = ast_mwi_topic(l->mailbox);
if (mailbox_specific_topic) {
l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());
stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}

View File

@ -9130,6 +9130,9 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
if (!pri->mbox[i].sub) {
ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);
} else {
stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());
stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
#if defined(HAVE_PRI_MWI_V2)
if (ast_strlen_zero(pri->mbox[i].vm_number)) {

View File

@ -291,6 +291,15 @@ enum stasis_message_type_result {
STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
};
/*!
* \brief Stasis subscription message filters
*/
enum stasis_subscription_message_filter {
STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
};
/*!
* \brief Create a new message type.
*
@ -326,6 +335,14 @@ const char *stasis_message_type_name(const struct stasis_message_type *type);
*/
unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
/*!
* \brief Gets the id of a given message type
* \param type The type to get the id of.
* \return The id
* \since 17.0.0
*/
int stasis_message_type_id(const struct stasis_message_type *type);
/*!
* \brief Check whether a message type is declined
*
@ -494,6 +511,14 @@ struct stasis_topic *stasis_topic_create(const char *name);
*/
const char *stasis_topic_name(const struct stasis_topic *topic);
/*!
* \brief Return the number of subscribers of a topic.
* \param topic Topic.
* \return Number of subscribers of the topic.
* \since 17.0.0
*/
size_t stasis_topic_subscribers(const struct stasis_topic *topic);
/*!
* \brief Publish a message to a topic's subscribers.
* \param topic Topic.
@ -559,6 +584,10 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12
*
* \note This callback will receive a callback with a message indicating it
* has been subscribed. This occurs immediately before accepted message
* types can be set and the callback must expect to receive it.
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
@ -584,10 +613,68 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12.8.0
*
* \note This callback will receive a callback with a message indicating it
* has been subscribed. This occurs immediately before accepted message
* types can be set and the callback must expect to receive it.
*/
struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
/*!
* \brief Indicate to a subscription that we are interested in a message type.
*
* This will cause the subscription to allow the given message type to be
* raised to our subscription callback. This enables internal filtering in
* the stasis message bus to reduce messages.
*
* \param subscription Subscription to add message type to.
* \param type The message type we wish to receive.
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*
* \note If you are wanting to use stasis_final_message you will need to accept
* \ref stasis_subscription_change_type as a message type.
*
* \note Until the subscription is set to selective filtering it is possible for it
* to receive messages of message types that would not normally be accepted.
*/
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
const struct stasis_message_type *type);
/*!
* \brief Indicate to a subscription that we are not interested in a message type.
*
* \param subscription Subscription to remove message type from.
* \param type The message type we don't wish to receive.
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*/
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
const struct stasis_message_type *type);
/*!
* \brief Set the message type filtering level on a subscription
*
* This will cause the subscription to filter messages according to the
* provided filter level. For example if selective is used then only
* messages matching those provided to \ref stasis_subscription_accept_message_type
* will be raised to the subscription callback.
*
* \param subscription Subscription that should receive all messages.
* \param filter What filter to use
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*/
int stasis_subscription_set_filter(struct stasis_subscription *subscription,
enum stasis_subscription_message_filter filter);
/*!
* \brief Cancel a subscription.
*
@ -1036,6 +1123,41 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
struct stasis_topic *stasis_caching_get_topic(
struct stasis_caching_topic *caching_topic);
/*!
* \brief Indicate to a caching topic that we are interested in a message type.
*
* This will cause the caching topic to receive messages of the given message
* type. This enables internal filtering in the stasis message bus to reduce
* messages.
*
* \param caching_topic The caching topic.
* \param type The message type we wish to receive.
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*/
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
struct stasis_message_type *type);
/*!
* \brief Set the message type filtering level on a cache
*
* This will cause the underlying subscription to filter messages according to the
* provided filter level. For example if selective is used then only
* messages matching those provided to \ref stasis_subscription_accept_message_type
* will be raised to the subscription callback.
*
* \param caching_topic The caching topic.
* \param filter What filter to use
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*/
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
enum stasis_subscription_message_filter filter);
/*!
* \brief A message which instructs the caching topic to remove an entry from
* its cache.

View File

@ -169,4 +169,39 @@ struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one);
struct stasis_topic *stasis_cp_single_topic_cached(
struct stasis_cp_single *one);
/*!
* \brief Indicate to an instance that we are interested in a message type.
*
* This will cause the caching topic to receive messages of the given message
* type. This enables internal filtering in the stasis message bus to reduce
* messages.
*
* \param one One side of the cache pattern.
* \param type The message type we wish to receive.
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*/
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
struct stasis_message_type *type);
/*!
* \brief Set the message type filtering level on a cache
*
* This will cause the underlying subscription to filter messages according to the
* provided filter level. For example if selective is used then only
* messages matching those provided to \ref stasis_subscription_accept_message_type
* will be raised to the subscription callback.
*
* \param one One side of the cache pattern.
* \param filter What filter to use
* \retval 0 on success
* \retval -1 failure
*
* \since 17.0.0
*/
int stasis_cp_single_set_filter(struct stasis_cp_single *one,
enum stasis_subscription_message_filter filter);
#endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */

View File

@ -233,6 +233,10 @@ void stasis_message_router_remove_cache_update(
* \retval -1 on failure
*
* \since 12
*
* \note Setting a default callback will automatically cause the underlying
* subscription to receive all messages and not be filtered. If filtering is
* desired then a specific route for each message type should be provided.
*/
int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback,

View File

@ -1433,6 +1433,8 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_
cc_unref(generic_list, "Failed to subscribe to device state");
return NULL;
}
stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());
stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
generic_list->current_state = ast_device_state(monitor->interface->device_name);
ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");
return generic_list;
@ -2804,6 +2806,9 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent)
if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {
return -1;
}
stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());
stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());
stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cc_ref(agent, "Ref agent for subscription");
return 0;
}

View File

@ -920,6 +920,8 @@ int devstate_init(void)
if (!device_state_topic_cached) {
return -1;
}
stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());
stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
devstate_change_cb, NULL);
@ -927,6 +929,8 @@ int devstate_init(void)
ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
return -1;
}
stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());
stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return 0;
}

View File

@ -202,7 +202,7 @@ static void endpoint_cache_clear(void *data,
endpoint_publish_snapshot(endpoint);
}
static void endpoint_default(void *data,
static void endpoint_subscription_change(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
@ -263,6 +263,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
if (!endpoint->topics) {
return NULL;
}
stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
@ -271,8 +273,9 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
r |= stasis_message_router_add(endpoint->router,
stasis_cache_clear_type(), endpoint_cache_clear,
endpoint);
r |= stasis_message_router_set_default(endpoint->router,
endpoint_default, endpoint);
r |= stasis_message_router_add(endpoint->router,
stasis_subscription_change_type(), endpoint_subscription_change,
endpoint);
if (r) {
return NULL;
}
@ -288,6 +291,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
if (!endpoint->topics) {
return NULL;
}
stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
ao2_link(tech_endpoints, endpoint);
}

View File

@ -1527,6 +1527,8 @@ static void acl_change_stasis_subscribe(void)
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}

View File

@ -8416,10 +8416,15 @@ int load_pbx(void)
if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
return -1;
}
stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());
stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {
return -1;
}
stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());
stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return 0;
}

View File

@ -514,6 +514,8 @@ int ast_presence_state_engine_init(void)
if (!presence_state_topic_cached) {
return -1;
}
stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());
stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
AST_TEST_REGISTER(test_presence_chan);

View File

@ -370,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
return topic->name;
}
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
{
return AST_VECTOR_SIZE(&topic->subscribers);
}
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
@ -391,6 +396,11 @@ struct stasis_subscription {
/*! Flag set when final message for sub has been processed.
* Be sure join_lock is held before reading/setting. */
int final_message_processed;
/*! The message types this subscription is accepting */
AST_VECTOR(, char) accepted_message_types;
/*! The message filter currently in use */
enum stasis_subscription_message_filter filter;
};
static void subscription_dtor(void *obj)
@ -409,6 +419,8 @@ static void subscription_dtor(void *obj)
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
ast_cond_destroy(&sub->join_cond);
AST_VECTOR_FREE(&sub->accepted_message_types);
}
/*!
@ -420,19 +432,25 @@ static void subscription_dtor(void *obj)
static void subscription_invoke(struct stasis_subscription *sub,
struct stasis_message *message)
{
unsigned int final = stasis_subscription_final_message(sub, message);
int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
/* Notify that the final message has been received */
if (stasis_subscription_final_message(sub, message)) {
if (final) {
ao2_lock(sub);
sub->final_message_rxed = 1;
ast_cond_signal(&sub->join_cond);
ao2_unlock(sub);
}
/* Since sub is mostly immutable, no need to lock sub */
sub->callback(sub->data, sub, message);
if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
/* Since sub is mostly immutable, no need to lock sub */
sub->callback(sub->data, sub, message);
}
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
if (final) {
ao2_lock(sub);
sub->final_message_processed = 1;
ast_cond_signal(&sub->join_cond);
@ -500,6 +518,8 @@ struct stasis_subscription *internal_stasis_subscribe(
sub->callback = callback;
sub->data = data;
ast_cond_init(&sub->join_cond, NULL);
sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
AST_VECTOR_INIT(&sub->accepted_message_types, 0);
if (topic_add_subscription(topic, sub) != 0) {
ao2_ref(sub, -1);
@ -586,6 +606,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
return res;
}
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
const struct stasis_message_type *type)
{
if (!subscription) {
return -1;
}
ast_assert(type != NULL);
ast_assert(stasis_message_type_name(type) != NULL);
if (!type || !stasis_message_type_name(type)) {
/* Filtering is unreliable as this message type is not yet initialized
* so force all messages through.
*/
subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
return 0;
}
ao2_lock(subscription->topic);
if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
/* We do this for the same reason as above. The subscription can still operate, so allow
* it to do so by forcing all messages through.
*/
subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
}
ao2_unlock(subscription->topic);
return 0;
}
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
const struct stasis_message_type *type)
{
if (!subscription) {
return -1;
}
ast_assert(type != NULL);
ast_assert(stasis_message_type_name(type) != NULL);
if (!type || !stasis_message_type_name(type)) {
return 0;
}
ao2_lock(subscription->topic);
if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
/* The memory is already allocated so this can't fail */
AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
}
ao2_unlock(subscription->topic);
return 0;
}
int stasis_subscription_set_filter(struct stasis_subscription *subscription,
enum stasis_subscription_message_filter filter)
{
if (!subscription) {
return -1;
}
ao2_lock(subscription->topic);
if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
subscription->filter = filter;
}
ao2_unlock(subscription->topic);
return 0;
}
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
@ -781,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub,
struct stasis_message *message,
int synchronous)
{
/* Determine if this subscription is interested in this message. Note that final
* messages are special and are always invoked on the subscription.
*/
if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
int message_type_id = stasis_message_type_id(stasis_message_type(message));
if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
!AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
!stasis_subscription_final_message(sub, message)) {
return;
}
}
if (!sub->mailbox) {
/* Dispatch directly */
subscription_invoke(sub, message);
@ -840,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic,
ast_assert(topic != NULL);
ast_assert(message != NULL);
/* If there are no subscribers don't bother */
if (!stasis_topic_subscribers(topic)) {
return;
}
/*
* The topic may be unref'ed by the subscription invocation.
* Make sure we hold onto a reference while dispatching.

View File

@ -87,6 +87,35 @@ struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *cachi
return caching_topic->topic;
}
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
struct stasis_message_type *type)
{
int res;
if (!caching_topic) {
return -1;
}
/* We wait to accept the stasis specific message types until now so that by default everything
* will flow to us.
*/
res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());
res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());
res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
return res;
}
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
enum stasis_subscription_message_filter filter)
{
if (!caching_topic) {
return -1;
}
return stasis_subscription_set_filter(caching_topic->sub, filter);
}
struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
{
if (!caching_topic) {
@ -856,11 +885,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
/* Update the cache */
snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
if (snapshots.old || msg_put) {
update = update_create(snapshots.old, msg_put);
if (update) {
stasis_publish(caching_topic->topic, update);
if (stasis_topic_subscribers(caching_topic->topic)) {
update = update_create(snapshots.old, msg_put);
if (update) {
stasis_publish(caching_topic->topic, update);
ao2_ref(update, -1);
}
}
ao2_cleanup(update);
} else {
ast_debug(1,
"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
@ -873,11 +904,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
snapshots.aggregate_new);
}
update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
if (update) {
stasis_publish(caching_topic->topic, update);
if (stasis_topic_subscribers(caching_topic->topic)) {
update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
if (update) {
stasis_publish(caching_topic->topic, update);
ao2_ref(update, -1);
}
}
ao2_cleanup(update);
}
ao2_cleanup(snapshots.old);

View File

@ -217,3 +217,21 @@ struct stasis_topic *stasis_cp_single_topic_cached(
}
return stasis_caching_get_topic(one->topic_cached);
}
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
struct stasis_message_type *type)
{
if (!one) {
return -1;
}
return stasis_caching_accept_message_type(one->topic_cached, type);
}
int stasis_cp_single_set_filter(struct stasis_cp_single *one,
enum stasis_subscription_message_filter filter)
{
if (!one) {
return -1;
}
return stasis_caching_set_filter(one->topic_cached, filter);
}

View File

@ -39,9 +39,11 @@ struct stasis_message_type {
struct stasis_message_vtable *vtable;
char *name;
unsigned int hash;
int id;
};
static struct stasis_message_vtable null_vtable = {};
static int message_type_id;
static void message_type_dtor(void *obj)
{
@ -78,6 +80,7 @@ int stasis_message_type_create(const char *name,
}
type->hash = ast_hashtab_hash_string(name);
type->vtable = vtable;
type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
*result = type;
return STASIS_MESSAGE_TYPE_SUCCESS;
@ -93,6 +96,11 @@ unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
return type->hash;
}
int stasis_message_type_id(const struct stasis_message_type *type)
{
return type->id;
}
/*! \internal */
struct stasis_message {
/*! Time the message was created */

View File

@ -235,6 +235,9 @@ static struct stasis_message_router *stasis_message_router_create_internal(
return NULL;
}
/* We need to receive subscription change messages so we know when our subscription goes away */
stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
return router;
}
@ -316,6 +319,14 @@ int stasis_message_router_add(struct stasis_message_router *router,
}
ao2_lock(router);
res = route_table_add(&router->routes, message_type, callback, data);
if (!res) {
stasis_subscription_accept_message_type(router->subscription, message_type);
/* Until a specific message type was added we would already drop the message, so being
* selective now doesn't harm us. If we have a default route then we are already forced
* to filter nothing and messages will come in regardless.
*/
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
ao2_unlock(router);
return res;
}
@ -334,6 +345,10 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
}
ao2_lock(router);
res = route_table_add(&router->cache_routes, message_type, callback, data);
if (!res) {
stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
ao2_unlock(router);
return res;
}
@ -378,6 +393,9 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
router->default_route.callback = callback;
router->default_route.data = data;
ao2_unlock(router);
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
/* While this implementation can never fail, it used to be able to */
return 0;
}

View File

@ -868,6 +868,10 @@ static void park_announce_update_cb(void *data, struct stasis_subscription *sub,
return;
}
if (ast_parked_call_type() != stasis_message_type(message)) {
return;
}
if (payload->event_type != PARKED_CALL) {
/* We are only concerned with calls parked */
return;
@ -954,6 +958,10 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data
return -1;
}
stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());
stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());
stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
/* Now for the fun part... park it! */
ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);

View File

@ -213,6 +213,9 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char
if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
return -1;
}
stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());
stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type());
stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
datastore->data = parked_datastore;

View File

@ -686,6 +686,8 @@ static void parking_manager_enable_stasis(void)
{
if (!parking_sub) {
parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);
stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());
stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}

View File

@ -167,6 +167,9 @@ static int load_module(void)
if (!stasis_rtp_subscription) {
return AST_MODULE_LOAD_DECLINE;
}
stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());
stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());
stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return AST_MODULE_LOAD_SUCCESS;
}

View File

@ -269,6 +269,9 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char
ao2_ref(mwi_sub, -1);
mwi_stasis_sub = NULL;
}
stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());
stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());
stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return mwi_stasis_sub;
}
@ -1364,7 +1367,11 @@ static int load_module(void)
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, send_initial_notify_all, NULL);
} else {
stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
struct stasis_subscription *sub;
sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}

View File

@ -2282,6 +2282,8 @@ static int load_module(void)
network_change_sub = stasis_subscribe(ast_system_topic(),
network_change_stasis_cb, NULL);
stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return AST_MODULE_LOAD_SUCCESS;
}

View File

@ -360,6 +360,9 @@ static int asterisk_start_devicestate_publishing(struct ast_sip_outbound_publish
ao2_ref(datastore, -1);
return -1;
}
stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());
stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());
stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);
@ -435,6 +438,9 @@ static int asterisk_start_mwi_publishing(struct ast_sip_outbound_publish *config
ao2_ref(datastore, -1);
return -1;
}
stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());
stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());
stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);

View File

@ -5567,7 +5567,11 @@ static int load_module(void)
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, subscription_persistence_load, NULL);
} else {
stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
struct stasis_subscription *sub;
sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());
stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,

View File

@ -686,6 +686,10 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann
ast_channel_unlock(chan);
ao2_cleanup(refer->progress);
} else {
stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());
stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type());
stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
}
}

View File

@ -141,6 +141,8 @@ static int load_module(void)
LOG_SECURITY = -1;
return AST_MODULE_LOAD_DECLINE;
}
stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());
stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
ast_verb(3, "Security Logging Enabled\n");

View File

@ -394,6 +394,9 @@ static int subscribe_device_state(struct stasis_app *app, void *obj)
ao2_ref(sub, -1);
return -1;
}
stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());
stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());
stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
ao2_unlock(device_state_subscriptions);

View File

@ -1626,11 +1626,15 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
return;
}
stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());
stasis_subscription_set_filter(client->mwi_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
return;
}
stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());
stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);