diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index a5a369148c..a151f0cb96 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -13334,6 +13334,7 @@ static void mwi_unsub_event_cb(struct stasis_subscription_change *change) static void mwi_sub_event_cb(struct stasis_subscription_change *change) { struct mwi_sub_task *mwist; + const char *topic; char *context; char *mailbox; @@ -13342,7 +13343,9 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change) return; } - if (separate_mailbox(ast_strdupa(stasis_topic_name(change->topic)), &mailbox, &context)) { + /* The topic name is prefixed with "mwi:all/" as this is a pool topic */ + topic = stasis_topic_name(change->topic) + 8; + if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) { ast_free(mwist); return; } diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 09db9ea912..0b229bf7fd 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -514,6 +514,8 @@ struct stasis_topic; * from a topic and destroy it. As a result the topic can persist until * the last subscriber unsubscribes itself even if there is no * publisher. + * + * \note Topic names should be in the form of :[/] */ struct stasis_topic *stasis_topic_create(const char *name); diff --git a/main/app.c b/main/app.c index ec74490656..e8a4d2f456 100644 --- a/main/app.c +++ b/main/app.c @@ -3337,7 +3337,7 @@ int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct a stasis_publish(mailbox_specific_topic, clear_msg); } - stasis_topic_pool_delete_topic(mwi_topic_pool, stasis_topic_name(mailbox_specific_topic)); + stasis_topic_pool_delete_topic(mwi_topic_pool, mwi_state->uniqueid); ao2_cleanup(clear_msg); return 0; @@ -3430,7 +3430,7 @@ int app_init(void) if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_vm_app_type) != 0) { return -1; } - mwi_topic_all = stasis_topic_create("stasis_mwi_topic"); + mwi_topic_all = stasis_topic_create("mwi:all"); if (!mwi_topic_all) { return -1; } @@ -3446,7 +3446,7 @@ int app_init(void) if (!mwi_topic_pool) { return -1; } - queue_topic_all = stasis_topic_create("stasis_queue_topic"); + queue_topic_all = stasis_topic_create("queue:all"); if (!queue_topic_all) { return -1; } diff --git a/main/cdr.c b/main/cdr.c index 53f3362827..f8f038c138 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -4504,7 +4504,7 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - cdr_topic = stasis_topic_create("cdr_engine"); + cdr_topic = stasis_topic_create("cdr:aggregator"); if (!cdr_topic) { return AST_MODULE_LOAD_FAILURE; } diff --git a/main/cel.c b/main/cel.c index 95376db6e3..1e77d25895 100644 --- a/main/cel.c +++ b/main/cel.c @@ -1431,12 +1431,12 @@ static int unload_module(void) */ static int create_subscriptions(void) { - cel_aggregation_topic = stasis_topic_create("cel_aggregation_topic"); + cel_aggregation_topic = stasis_topic_create("cel:aggregator"); if (!cel_aggregation_topic) { return -1; } - cel_topic = stasis_topic_create("cel_topic"); + cel_topic = stasis_topic_create("cel:misc"); if (!cel_topic) { return -1; } diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index 22a2bb6b31..be8fd7c024 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -1520,14 +1520,23 @@ int ast_channel_forward_endpoint(struct ast_channel *chan, int ast_channel_internal_setup_topics(struct ast_channel *chan) { - const char *topic_name = chan->uniqueid.unique_id; + char *topic_name; + int ret; ast_assert(chan->topic == NULL); - if (ast_strlen_zero(topic_name)) { - topic_name = ""; + if (ast_strlen_zero(chan->uniqueid.unique_id)) { + static int dummy_id; + ret = ast_asprintf(&topic_name, "channel:dummy-%d", ast_atomic_fetchadd_int(&dummy_id, +1)); + } else { + ret = ast_asprintf(&topic_name, "channel:%s", chan->uniqueid.unique_id); + } + + if (ret < 0) { + return -1; } chan->topic = stasis_topic_create(topic_name); + ast_free(topic_name); if (!chan->topic) { return -1; } diff --git a/main/devicestate.c b/main/devicestate.c index b6c740ce26..ecf255fefe 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -902,7 +902,7 @@ int devstate_init(void) if (STASIS_MESSAGE_TYPE_INIT(ast_device_state_message_type) != 0) { return -1; } - device_state_topic_all = stasis_topic_create("ast_device_state_topic"); + device_state_topic_all = stasis_topic_create("devicestate:all"); if (!device_state_topic_all) { return -1; } diff --git a/main/endpoints.c b/main/endpoints.c index b95893270b..c53e31d49b 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -255,9 +255,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha } if (!ast_strlen_zero(resource)) { + char *topic_name; + int ret; + + ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id); + if (ret < 0) { + return NULL; + } endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), - endpoint->id); + topic_name); + ast_free(topic_name); if (!endpoint->topics) { return NULL; } @@ -284,8 +292,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha endpoint_publish_snapshot(endpoint); ao2_link(endpoints, endpoint); } else { + char *topic_name; + int ret; + + ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id); + if (ret < 0) { + return NULL; + } + endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(), - endpoint->id); + topic_name); + ast_free(topic_name); if (!endpoint->topics) { return NULL; } diff --git a/main/manager.c b/main/manager.c index 0c715e45f2..8e7a8b203f 100644 --- a/main/manager.c +++ b/main/manager.c @@ -8996,7 +8996,7 @@ static int __init_manager(int reload, int by_external_config) if (res != 0) { return -1; } - manager_topic = stasis_topic_create("manager_topic"); + manager_topic = stasis_topic_create("manager:core"); if (!manager_topic) { return -1; } diff --git a/main/parking.c b/main/parking.c index bf0d0b6b78..d77a767b0f 100644 --- a/main/parking.c +++ b/main/parking.c @@ -56,7 +56,7 @@ int ast_parking_stasis_init(void) return -1; } - parking_topic = stasis_topic_create("ast_parking"); + parking_topic = stasis_topic_create("parking:all"); if (!parking_topic) { return -1; } diff --git a/main/presencestate.c b/main/presencestate.c index 65b7f69270..45433b1891 100644 --- a/main/presencestate.c +++ b/main/presencestate.c @@ -500,7 +500,7 @@ int ast_presence_state_engine_init(void) return -1; } - presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all"); + presence_state_topic_all = stasis_topic_create("presence_state:all"); if (!presence_state_topic_all) { return -1; } diff --git a/main/rtp_engine.c b/main/rtp_engine.c index fd1613cab8..403b663ca3 100644 --- a/main/rtp_engine.c +++ b/main/rtp_engine.c @@ -3539,7 +3539,7 @@ int ast_rtp_engine_init(void) ast_rwlock_init(&mime_types_lock); ast_rwlock_init(&static_RTP_PT_lock); - rtp_topic = stasis_topic_create("rtp_topic"); + rtp_topic = stasis_topic_create("rtp:all"); if (!rtp_topic) { return -1; } diff --git a/main/security_events.c b/main/security_events.c index 37dce02940..0328eca359 100644 --- a/main/security_events.c +++ b/main/security_events.c @@ -484,7 +484,7 @@ int ast_security_stasis_init(void) { ast_register_cleanup(security_stasis_cleanup); - security_topic = stasis_topic_create("ast_security"); + security_topic = stasis_topic_create("security:all"); if (!security_topic) { return -1; } diff --git a/main/stasis.c b/main/stasis.c index fa92eeb134..5f4a147dee 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -349,6 +349,8 @@ struct stasis_topic_statistics { int messages_dispatched; /*! \brief The ids of the subscribers to this topic */ struct ao2_container *subscribers; + /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */ + struct stasis_topic *topic; /*! \brief Name of the topic */ char name[0]; }; @@ -366,6 +368,9 @@ struct stasis_topic { struct stasis_topic_statistics *statistics; #endif + /*! Unique incrementing integer for subscriber ids */ + int subscriber_id; + /*! Name of the topic */ char name[0]; }; @@ -412,11 +417,11 @@ static void topic_statistics_destroy(void *obj) ao2_cleanup(statistics->subscribers); } -static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name) +static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic) { struct stasis_topic_statistics *statistics; - statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, topic_statistics_destroy); + statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy); if (!statistics) { return NULL; } @@ -427,7 +432,9 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(const char return NULL; } - strcpy(statistics->name, name); /* SAFE */ + /* This is strictly used for the pointer address when showing the topic */ + statistics->topic = topic; + strcpy(statistics->name, topic->name); /* SAFE */ ao2_link(topic_statistics, statistics); return statistics; @@ -448,7 +455,7 @@ struct stasis_topic *stasis_topic_create(const char *name) res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX); res |= AST_VECTOR_INIT(&topic->upstream_topics, 0); #ifdef AST_DEVMODE - topic->statistics = stasis_topic_statistics_create(name); + topic->statistics = stasis_topic_statistics_create(topic); if (!topic->name || !topic->statistics || res) #else if (!topic->name || res) @@ -477,8 +484,8 @@ struct stasis_subscription_statistics { const char *file; /*! \brief The function where the subscription originates */ const char *func; - /*! \brief Name of the topic we subscribed to */ - char *topic; + /*! \brief Names of the topics we are subscribed to */ + struct ao2_container *topics; /*! \brief The message type that currently took the longest to process */ struct stasis_message_type *highest_time_message_type; /*! \brief Highest time spent invoking a message */ @@ -495,6 +502,8 @@ struct stasis_subscription_statistics { int uses_threadpool; /*! \brief The line number where the subscription originates */ int lineno; + /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */ + struct stasis_subscription *sub; /*! \brief Unique ID of the subscription */ char uniqueid[0]; }; @@ -503,7 +512,7 @@ struct stasis_subscription_statistics { /*! \internal */ struct stasis_subscription { /*! Unique ID for this subscription */ - char uniqueid[AST_UUID_STR_LEN]; + char *uniqueid; /*! Topic subscribed to. */ struct stasis_topic *topic; /*! Mailbox for processing incoming messages. */ @@ -546,6 +555,7 @@ static void subscription_dtor(void *obj) * be bad. */ ast_assert(stasis_subscription_is_done(sub)); + ast_free(sub->uniqueid); ao2_cleanup(sub->topic); sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); @@ -628,26 +638,37 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st } #ifdef AST_DEVMODE -static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid, - const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno, +static void subscription_statistics_destroy(void *obj) +{ + struct stasis_subscription_statistics *statistics = obj; + + ao2_cleanup(statistics->topics); +} + +static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub, + int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func) { struct stasis_subscription_statistics *statistics; - size_t uniqueid_len = strlen(uniqueid) + 1; - statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL); + statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy); if (!statistics) { return NULL; } + statistics->topics = ast_str_container_alloc(1); + if (!statistics->topics) { + ao2_ref(statistics, -1); + return NULL; + } + statistics->file = file; statistics->lineno = lineno; statistics->func = func; statistics->uses_mailbox = needs_mailbox; statistics->uses_threadpool = use_thread_pool; - strcpy(statistics->uniqueid, uniqueid); /* SAFE */ - statistics->topic = statistics->uniqueid + uniqueid_len; - strcpy(statistics->topic, topic); /* SAFE */ + strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */ + statistics->sub = sub; ao2_link(subscription_statistics, statistics); return statistics; @@ -665,6 +686,7 @@ struct stasis_subscription *internal_stasis_subscribe( const char *func) { struct stasis_subscription *sub; + int ret; if (!topic) { return NULL; @@ -675,12 +697,17 @@ struct stasis_subscription *internal_stasis_subscribe( if (!sub) { return NULL; } - ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); #ifdef AST_DEVMODE - sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox, - use_thread_pool, file, lineno, func); - if (!sub->statistics) { + ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1)); + sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func); + if (ret < 0 || !sub->statistics) { + ao2_ref(sub, -1); + return NULL; + } +#else + ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1)); + if (ret < 0) { ao2_ref(sub, -1); return NULL; } @@ -1012,6 +1039,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs #ifdef AST_DEVMODE ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub)); + ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic)); #endif ao2_unlock(topic); @@ -1035,6 +1063,7 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s #ifdef AST_DEVMODE if (!res) { ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub)); + ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic)); } #endif @@ -1498,6 +1527,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic, struct topic_pool_entry { struct stasis_forward *forward; struct stasis_topic *topic; + char name[0]; }; static void topic_pool_entry_dtor(void *obj) @@ -1509,10 +1539,19 @@ static void topic_pool_entry_dtor(void *obj) entry->topic = NULL; } -static struct topic_pool_entry *topic_pool_entry_alloc(void) +static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name) { - return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor, - AO2_ALLOC_OPT_LOCK_NOLOCK); + struct topic_pool_entry *topic_pool_entry; + + topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1, + topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!topic_pool_entry) { + return NULL; + } + + strcpy(topic_pool_entry->name, topic_name); /* Safe */ + + return topic_pool_entry; } struct stasis_topic_pool { @@ -1550,7 +1589,7 @@ static int topic_pool_entry_hash(const void *obj, const int flags) break; case OBJ_SEARCH_OBJECT: object = obj; - key = stasis_topic_name(object->topic); + key = object->name; break; default: /* Hash can only work on something with a full key. */ @@ -1569,10 +1608,10 @@ static int topic_pool_entry_cmp(void *obj, void *arg, int flags) switch (flags & OBJ_SEARCH_MASK) { case OBJ_SEARCH_OBJECT: - right_key = stasis_topic_name(object_right->topic); + right_key = object_right->name; /* Fall through */ case OBJ_SEARCH_KEY: - cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key); + cmp = strcasecmp(object_left->name, right_key); break; case OBJ_SEARCH_PARTIAL_KEY: /* Not supported by container */ @@ -1649,18 +1688,29 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, { RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup); SCOPED_AO2LOCK(topic_container_lock, pool->pool_container); + char *new_topic_name; + int ret; topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (topic_pool_entry) { return topic_pool_entry->topic; } - topic_pool_entry = topic_pool_entry_alloc(); + topic_pool_entry = topic_pool_entry_alloc(topic_name); if (!topic_pool_entry) { return NULL; } - topic_pool_entry->topic = stasis_topic_create(topic_name); + /* To provide further detail and to ensure that the topic is unique within the scope of the + * system we prefix it with the pooling topic name, which should itself already be unique. + */ + ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name); + if (ret < 0) { + return NULL; + } + + topic_pool_entry->topic = stasis_topic_create(new_topic_name); + ast_free(new_topic_name); if (!topic_pool_entry->topic) { return NULL; } @@ -2082,12 +2132,15 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, #ifdef AST_DEVMODE +AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid); + /*! * \internal * \brief CLI command implementation for 'stasis statistics show subscriptions' */ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { + struct ao2_container *sorted_subscriptions; struct ao2_iterator iter; struct stasis_subscription_statistics *statistics; int count = 0; @@ -2112,9 +2165,22 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str return CLI_SHOWUSAGE; } + sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, + stasis_subscription_statistics_sort_fn, NULL); + if (!sorted_subscriptions) { + ast_cli(a->fd, "Could not create container for sorting subscription statistics\n"); + return CLI_SUCCESS; + } + + if (ao2_container_dup(sorted_subscriptions, subscription_statistics, 0)) { + ao2_ref(sorted_subscriptions, -1); + ast_cli(a->fd, "Could not sort subscription statistics\n"); + return CLI_SUCCESS; + } + ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke"); - iter = ao2_iterator_init(subscription_statistics, 0); + iter = ao2_iterator_init(sorted_subscriptions, 0); while ((statistics = ao2_iterator_next(&iter))) { ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed, statistics->lowest_time_invoked, statistics->highest_time_invoked); @@ -2125,6 +2191,8 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str } ao2_iterator_destroy(&iter); + ao2_ref(sorted_subscriptions, -1); + ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed); ast_cli(a->fd, "\n%d subscriptions\n\n", count); @@ -2169,6 +2237,8 @@ static char *subscription_statistics_complete_name(const char *word, int state) static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { struct stasis_subscription_statistics *statistics; + struct ao2_iterator i; + char *name; switch (cmd) { case CLI_INIT: @@ -2196,7 +2266,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru } ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid); - ast_cli(a->fd, "Topic: %s\n", statistics->topic); + ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub); ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "")); ast_cli(a->fd, "Source line number: %d\n", statistics->lineno); ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "")); @@ -2213,25 +2283,38 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru } ao2_unlock(statistics); + ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics)); + + ast_cli(a->fd, "Subscribed topics:\n"); + i = ao2_iterator_init(statistics->topics, 0); + while ((name = ao2_iterator_next(&i))) { + ast_cli(a->fd, "\t%s\n", name); + ao2_ref(name, -1); + } + ao2_iterator_destroy(&i); + ao2_ref(statistics, -1); return CLI_SUCCESS; } +AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name); + /*! * \internal * \brief CLI command implementation for 'stasis statistics show topics' */ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { + struct ao2_container *sorted_topics; struct ao2_iterator iter; struct stasis_topic_statistics *statistics; int count = 0; int not_dispatched = 0; int dispatched = 0; -#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n" -#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n" -#define FMT_FIELDS2 "%-64s %10d %10d\n" +#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n" +#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n" +#define FMT_FIELDS2 "%-64s %10s %10d %10d\n" switch (cmd) { case CLI_INIT: @@ -2248,11 +2331,25 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast return CLI_SHOWUSAGE; } - ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch"); + sorted_topics = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, + stasis_topic_statistics_sort_fn, NULL); + if (!sorted_topics) { + ast_cli(a->fd, "Could not create container for sorting topic statistics\n"); + return CLI_SUCCESS; + } - iter = ao2_iterator_init(topic_statistics, 0); + if (ao2_container_dup(sorted_topics, topic_statistics, 0)) { + ao2_ref(sorted_topics, -1); + ast_cli(a->fd, "Could not sort topic statistics\n"); + return CLI_SUCCESS; + } + + ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch"); + + iter = ao2_iterator_init(sorted_topics, 0); while ((statistics = ao2_iterator_next(&iter))) { - ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched, + ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers), + statistics->messages_not_dispatched, statistics->messages_dispatched, statistics->lowest_time_dispatched, statistics->highest_time_dispatched); not_dispatched += statistics->messages_not_dispatched; dispatched += statistics->messages_dispatched; @@ -2261,7 +2358,9 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast } ao2_iterator_destroy(&iter); - ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched); + ao2_ref(sorted_topics, -1); + + ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched); ast_cli(a->fd, "\n%d topics\n\n", count); #undef FMT_HEADERS @@ -2334,6 +2433,7 @@ static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_ } ast_cli(a->fd, "Topic: %s\n", statistics->name); + ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic); ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched); ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched); ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched); diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c index cfdf117c4e..31d3eac398 100644 --- a/main/stasis_bridges.c +++ b/main/stasis_bridges.c @@ -294,12 +294,21 @@ static struct ast_bridge_snapshot_update *bridge_snapshot_update_create( int bridge_topics_init(struct ast_bridge *bridge) { + char *topic_name; + int ret; + if (ast_strlen_zero(bridge->uniqueid)) { ast_log(LOG_ERROR, "Bridge id initialization required\n"); return -1; } - bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid); + ret = ast_asprintf(&topic_name, "bridge:%s", bridge->uniqueid); + if (ret < 0) { + return -1; + } + + bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, topic_name); + ast_free(topic_name); if (!bridge->topic) { return -1; } @@ -1365,7 +1374,7 @@ int ast_stasis_bridging_init(void) ast_register_cleanup(stasis_bridging_cleanup); - bridge_topic_all = stasis_topic_create("ast_bridge_topic_all"); + bridge_topic_all = stasis_topic_create("bridge:all"); if (!bridge_topic_all) { return -1; } diff --git a/main/stasis_cache.c b/main/stasis_cache.c index ee8a1dd4b7..6be4bf1916 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -948,10 +948,11 @@ static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt) struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache) { struct stasis_caching_topic *caching_topic; + static int caching_id; char *new_name; int ret; - ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic)); + ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic)); if (ret < 0) { return NULL; } diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 04d816463c..463be69ac5 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -67,13 +67,14 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name, { char *cached_name = NULL; struct stasis_cp_all *all; + static int cache_id; all = ao2_t_alloc(sizeof(*all), all_dtor, name); if (!all) { return NULL; } - ast_asprintf(&cached_name, "%s-cached", name); + ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name); if (!cached_name) { ao2_ref(all, -1); diff --git a/main/stasis_channels.c b/main/stasis_channels.c index d39fb08fbf..e8842c1bfe 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -1658,7 +1658,7 @@ int ast_stasis_channels_init(void) ast_register_cleanup(stasis_channels_cleanup); - channel_topic_all = stasis_topic_create("ast_channel_topic_all"); + channel_topic_all = stasis_topic_create("channel:all"); if (!channel_topic_all) { return -1; } diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index b3a837b166..289a90e14f 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -460,7 +460,7 @@ int ast_endpoint_stasis_init(void) int res = 0; ast_register_cleanup(endpoints_stasis_cleanup); - endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all", + endpoint_cache_all = stasis_cp_all_create("endpoint:all", endpoint_snapshot_get_id); if (!endpoint_cache_all) { return -1; diff --git a/main/stasis_system.c b/main/stasis_system.c index 961a2b06ac..4c84f57e6c 100644 --- a/main/stasis_system.c +++ b/main/stasis_system.c @@ -374,7 +374,7 @@ int ast_stasis_system_init(void) { ast_register_cleanup(stasis_system_cleanup); - system_topic = stasis_topic_create("ast_system"); + system_topic = stasis_topic_create("system:all"); if (!system_topic) { return 1; } diff --git a/main/test.c b/main/test.c index 2abe6988db..32df829d9f 100644 --- a/main/test.c +++ b/main/test.c @@ -1224,7 +1224,7 @@ int ast_test_init(void) ast_register_cleanup(test_cleanup); /* Create stasis topic */ - test_suite_topic = stasis_topic_create("test_suite_topic"); + test_suite_topic = stasis_topic_create("testsuite:all"); if (!test_suite_topic) { return -1; } diff --git a/res/res_corosync.c b/res/res_corosync.c index bf172e3183..6e66c4fef0 100644 --- a/res/res_corosync.c +++ b/res/res_corosync.c @@ -1131,7 +1131,7 @@ static int load_module(void) goto failed; } - corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic"); + corosync_aggregate_topic = stasis_topic_create("corosync:aggregator"); if (!corosync_aggregate_topic) { ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n"); goto failed; diff --git a/res/stasis/app.c b/res/stasis/app.c index 0f0923ae97..a69ca5562c 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -919,6 +919,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat int res = 0; size_t context_size = strlen("stasis-") + strlen(name) + 1; char context_name[context_size]; + char *topic_name; + int ret; ast_assert(name != NULL); ast_assert(handler != NULL); @@ -939,7 +941,13 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat return NULL; } - app->topic = stasis_topic_create(name); + ret = ast_asprintf(&topic_name, "ari:application/%s", name); + if (ret < 0) { + return NULL; + } + + app->topic = stasis_topic_create(topic_name); + ast_free(topic_name); if (!app->topic) { return NULL; }