Merge "stasis: Improve topic/subscription names and statistics." into 16

This commit is contained in:
Joshua C. Colp 2019-03-14 09:19:37 -05:00 committed by Gerrit Code Review
commit 3e5dcc9dcf
23 changed files with 214 additions and 63 deletions

View File

@ -13334,6 +13334,7 @@ static void mwi_unsub_event_cb(struct stasis_subscription_change *change)
static void mwi_sub_event_cb(struct stasis_subscription_change *change)
{
struct mwi_sub_task *mwist;
const char *topic;
char *context;
char *mailbox;
@ -13342,7 +13343,9 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change)
return;
}
if (separate_mailbox(ast_strdupa(stasis_topic_name(change->topic)), &mailbox, &context)) {
/* The topic name is prefixed with "mwi:all/" as this is a pool topic */
topic = stasis_topic_name(change->topic) + 8;
if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) {
ast_free(mwist);
return;
}

View File

@ -514,6 +514,8 @@ struct stasis_topic;
* from a topic and destroy it. As a result the topic can persist until
* the last subscriber unsubscribes itself even if there is no
* publisher.
*
* \note Topic names should be in the form of <subsystem>:<functionality>[/<object>]
*/
struct stasis_topic *stasis_topic_create(const char *name);

View File

@ -3345,7 +3345,7 @@ int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct a
stasis_publish(mailbox_specific_topic, clear_msg);
}
stasis_topic_pool_delete_topic(mwi_topic_pool, stasis_topic_name(mailbox_specific_topic));
stasis_topic_pool_delete_topic(mwi_topic_pool, mwi_state->uniqueid);
ao2_cleanup(clear_msg);
return 0;
@ -3438,7 +3438,7 @@ int app_init(void)
if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_vm_app_type) != 0) {
return -1;
}
mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
mwi_topic_all = stasis_topic_create("mwi:all");
if (!mwi_topic_all) {
return -1;
}
@ -3454,7 +3454,7 @@ int app_init(void)
if (!mwi_topic_pool) {
return -1;
}
queue_topic_all = stasis_topic_create("stasis_queue_topic");
queue_topic_all = stasis_topic_create("queue:all");
if (!queue_topic_all) {
return -1;
}

View File

@ -4522,7 +4522,7 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
cdr_topic = stasis_topic_create("cdr_engine");
cdr_topic = stasis_topic_create("cdr:aggregator");
if (!cdr_topic) {
return AST_MODULE_LOAD_FAILURE;
}

View File

@ -1442,12 +1442,12 @@ static int unload_module(void)
*/
static int create_subscriptions(void)
{
cel_aggregation_topic = stasis_topic_create("cel_aggregation_topic");
cel_aggregation_topic = stasis_topic_create("cel:aggregator");
if (!cel_aggregation_topic) {
return -1;
}
cel_topic = stasis_topic_create("cel_topic");
cel_topic = stasis_topic_create("cel:misc");
if (!cel_topic) {
return -1;
}

View File

@ -1480,15 +1480,24 @@ int ast_channel_forward_endpoint(struct ast_channel *chan,
int ast_channel_internal_setup_topics(struct ast_channel *chan)
{
const char *topic_name = chan->uniqueid.unique_id;
char *topic_name;
int ret;
ast_assert(chan->topics == NULL);
if (ast_strlen_zero(topic_name)) {
topic_name = "<dummy-channel>";
if (ast_strlen_zero(chan->uniqueid.unique_id)) {
static int dummy_id;
ret = ast_asprintf(&topic_name, "channel:dummy-%d", ast_atomic_fetchadd_int(&dummy_id, +1));
} else {
ret = ast_asprintf(&topic_name, "channel:%s", chan->uniqueid.unique_id);
}
if (ret < 0) {
return -1;
}
chan->topics = stasis_cp_single_create(
ast_channel_cache_all(), topic_name);
ast_free(topic_name);
if (!chan->topics) {
return -1;
}

View File

@ -902,7 +902,7 @@ int devstate_init(void)
if (STASIS_MESSAGE_TYPE_INIT(ast_device_state_message_type) != 0) {
return -1;
}
device_state_topic_all = stasis_topic_create("ast_device_state_topic");
device_state_topic_all = stasis_topic_create("devicestate:all");
if (!device_state_topic_all) {
return -1;
}

View File

@ -257,9 +257,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
}
if (!ast_strlen_zero(resource)) {
char *topic_name;
int ret;
ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
if (ret < 0) {
return NULL;
}
endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
endpoint->id);
topic_name);
ast_free(topic_name);
if (!endpoint->topics) {
return NULL;
}
@ -286,8 +294,17 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
endpoint_publish_snapshot(endpoint);
ao2_link(endpoints, endpoint);
} else {
char *topic_name;
int ret;
ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
if (ret < 0) {
return NULL;
}
endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
endpoint->id);
topic_name);
ast_free(topic_name);
if (!endpoint->topics) {
return NULL;
}

View File

@ -9001,7 +9001,7 @@ static int __init_manager(int reload, int by_external_config)
if (res != 0) {
return -1;
}
manager_topic = stasis_topic_create("manager_topic");
manager_topic = stasis_topic_create("manager:core");
if (!manager_topic) {
return -1;
}

View File

@ -56,7 +56,7 @@ int ast_parking_stasis_init(void)
return -1;
}
parking_topic = stasis_topic_create("ast_parking");
parking_topic = stasis_topic_create("parking:all");
if (!parking_topic) {
return -1;
}

View File

@ -500,7 +500,7 @@ int ast_presence_state_engine_init(void)
return -1;
}
presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all");
presence_state_topic_all = stasis_topic_create("presence_state:all");
if (!presence_state_topic_all) {
return -1;
}

View File

@ -3539,7 +3539,7 @@ int ast_rtp_engine_init(void)
ast_rwlock_init(&mime_types_lock);
ast_rwlock_init(&static_RTP_PT_lock);
rtp_topic = stasis_topic_create("rtp_topic");
rtp_topic = stasis_topic_create("rtp:all");
if (!rtp_topic) {
return -1;
}

View File

@ -484,7 +484,7 @@ int ast_security_stasis_init(void)
{
ast_register_cleanup(security_stasis_cleanup);
security_topic = stasis_topic_create("ast_security");
security_topic = stasis_topic_create("security:all");
if (!security_topic) {
return -1;
}

View File

@ -349,6 +349,8 @@ struct stasis_topic_statistics {
int messages_dispatched;
/*! \brief The ids of the subscribers to this topic */
struct ao2_container *subscribers;
/*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
struct stasis_topic *topic;
/*! \brief Name of the topic */
char name[0];
};
@ -366,6 +368,9 @@ struct stasis_topic {
struct stasis_topic_statistics *statistics;
#endif
/*! Unique incrementing integer for subscriber ids */
int subscriber_id;
/*! Name of the topic */
char name[0];
};
@ -412,11 +417,11 @@ static void topic_statistics_destroy(void *obj)
ao2_cleanup(statistics->subscribers);
}
static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
{
struct stasis_topic_statistics *statistics;
statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, topic_statistics_destroy);
statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
if (!statistics) {
return NULL;
}
@ -427,7 +432,9 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(const char
return NULL;
}
strcpy(statistics->name, name); /* SAFE */
/* This is strictly used for the pointer address when showing the topic */
statistics->topic = topic;
strcpy(statistics->name, topic->name); /* SAFE */
ao2_link(topic_statistics, statistics);
return statistics;
@ -448,7 +455,7 @@ struct stasis_topic *stasis_topic_create(const char *name)
res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
#ifdef AST_DEVMODE
topic->statistics = stasis_topic_statistics_create(name);
topic->statistics = stasis_topic_statistics_create(topic);
if (!topic->name || !topic->statistics || res)
#else
if (!topic->name || res)
@ -477,8 +484,8 @@ struct stasis_subscription_statistics {
const char *file;
/*! \brief The function where the subscription originates */
const char *func;
/*! \brief Name of the topic we subscribed to */
char *topic;
/*! \brief Names of the topics we are subscribed to */
struct ao2_container *topics;
/*! \brief The message type that currently took the longest to process */
struct stasis_message_type *highest_time_message_type;
/*! \brief Highest time spent invoking a message */
@ -495,6 +502,8 @@ struct stasis_subscription_statistics {
int uses_threadpool;
/*! \brief The line number where the subscription originates */
int lineno;
/*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
struct stasis_subscription *sub;
/*! \brief Unique ID of the subscription */
char uniqueid[0];
};
@ -503,7 +512,7 @@ struct stasis_subscription_statistics {
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
char uniqueid[AST_UUID_STR_LEN];
char *uniqueid;
/*! Topic subscribed to. */
struct stasis_topic *topic;
/*! Mailbox for processing incoming messages. */
@ -546,6 +555,7 @@ static void subscription_dtor(void *obj)
* be bad. */
ast_assert(stasis_subscription_is_done(sub));
ast_free(sub->uniqueid);
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
@ -628,26 +638,37 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
}
#ifdef AST_DEVMODE
static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
static void subscription_statistics_destroy(void *obj)
{
struct stasis_subscription_statistics *statistics = obj;
ao2_cleanup(statistics->topics);
}
static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
int needs_mailbox, int use_thread_pool, const char *file, int lineno,
const char *func)
{
struct stasis_subscription_statistics *statistics;
size_t uniqueid_len = strlen(uniqueid) + 1;
statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
if (!statistics) {
return NULL;
}
statistics->topics = ast_str_container_alloc(1);
if (!statistics->topics) {
ao2_ref(statistics, -1);
return NULL;
}
statistics->file = file;
statistics->lineno = lineno;
statistics->func = func;
statistics->uses_mailbox = needs_mailbox;
statistics->uses_threadpool = use_thread_pool;
strcpy(statistics->uniqueid, uniqueid); /* SAFE */
statistics->topic = statistics->uniqueid + uniqueid_len;
strcpy(statistics->topic, topic); /* SAFE */
strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
statistics->sub = sub;
ao2_link(subscription_statistics, statistics);
return statistics;
@ -665,6 +686,7 @@ struct stasis_subscription *internal_stasis_subscribe(
const char *func)
{
struct stasis_subscription *sub;
int ret;
if (!topic) {
return NULL;
@ -675,12 +697,17 @@ struct stasis_subscription *internal_stasis_subscribe(
if (!sub) {
return NULL;
}
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
#ifdef AST_DEVMODE
sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
use_thread_pool, file, lineno, func);
if (!sub->statistics) {
ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
if (ret < 0 || !sub->statistics) {
ao2_ref(sub, -1);
return NULL;
}
#else
ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
if (ret < 0) {
ao2_ref(sub, -1);
return NULL;
}
@ -1012,6 +1039,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
#ifdef AST_DEVMODE
ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
#endif
ao2_unlock(topic);
@ -1035,6 +1063,7 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
#ifdef AST_DEVMODE
if (!res) {
ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
}
#endif
@ -1498,6 +1527,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
struct topic_pool_entry {
struct stasis_forward *forward;
struct stasis_topic *topic;
char name[0];
};
static void topic_pool_entry_dtor(void *obj)
@ -1509,10 +1539,19 @@ static void topic_pool_entry_dtor(void *obj)
entry->topic = NULL;
}
static struct topic_pool_entry *topic_pool_entry_alloc(void)
static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
{
return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
struct topic_pool_entry *topic_pool_entry;
topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!topic_pool_entry) {
return NULL;
}
strcpy(topic_pool_entry->name, topic_name); /* Safe */
return topic_pool_entry;
}
struct stasis_topic_pool {
@ -1550,7 +1589,7 @@ static int topic_pool_entry_hash(const void *obj, const int flags)
break;
case OBJ_SEARCH_OBJECT:
object = obj;
key = stasis_topic_name(object->topic);
key = object->name;
break;
default:
/* Hash can only work on something with a full key. */
@ -1569,10 +1608,10 @@ static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = stasis_topic_name(object_right->topic);
right_key = object_right->name;
/* Fall through */
case OBJ_SEARCH_KEY:
cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
cmp = strcasecmp(object_left->name, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
/* Not supported by container */
@ -1649,18 +1688,29 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
{
RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
char *new_topic_name;
int ret;
topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (topic_pool_entry) {
return topic_pool_entry->topic;
}
topic_pool_entry = topic_pool_entry_alloc();
topic_pool_entry = topic_pool_entry_alloc(topic_name);
if (!topic_pool_entry) {
return NULL;
}
topic_pool_entry->topic = stasis_topic_create(topic_name);
/* To provide further detail and to ensure that the topic is unique within the scope of the
* system we prefix it with the pooling topic name, which should itself already be unique.
*/
ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
if (ret < 0) {
return NULL;
}
topic_pool_entry->topic = stasis_topic_create(new_topic_name);
ast_free(new_topic_name);
if (!topic_pool_entry->topic) {
return NULL;
}
@ -2082,12 +2132,15 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
#ifdef AST_DEVMODE
AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show subscriptions'
*/
static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct ao2_container *sorted_subscriptions;
struct ao2_iterator iter;
struct stasis_subscription_statistics *statistics;
int count = 0;
@ -2112,9 +2165,22 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
return CLI_SHOWUSAGE;
}
sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
stasis_subscription_statistics_sort_fn, NULL);
if (!sorted_subscriptions) {
ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
return CLI_SUCCESS;
}
if (ao2_container_dup(sorted_subscriptions, subscription_statistics, 0)) {
ao2_ref(sorted_subscriptions, -1);
ast_cli(a->fd, "Could not sort subscription statistics\n");
return CLI_SUCCESS;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
iter = ao2_iterator_init(subscription_statistics, 0);
iter = ao2_iterator_init(sorted_subscriptions, 0);
while ((statistics = ao2_iterator_next(&iter))) {
ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
statistics->lowest_time_invoked, statistics->highest_time_invoked);
@ -2125,6 +2191,8 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
}
ao2_iterator_destroy(&iter);
ao2_ref(sorted_subscriptions, -1);
ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
ast_cli(a->fd, "\n%d subscriptions\n\n", count);
@ -2169,6 +2237,8 @@ static char *subscription_statistics_complete_name(const char *word, int state)
static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct stasis_subscription_statistics *statistics;
struct ao2_iterator i;
char *name;
switch (cmd) {
case CLI_INIT:
@ -2196,7 +2266,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
}
ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
ast_cli(a->fd, "Topic: %s\n", statistics->topic);
ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
@ -2213,25 +2283,38 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
}
ao2_unlock(statistics);
ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
ast_cli(a->fd, "Subscribed topics:\n");
i = ao2_iterator_init(statistics->topics, 0);
while ((name = ao2_iterator_next(&i))) {
ast_cli(a->fd, "\t%s\n", name);
ao2_ref(name, -1);
}
ao2_iterator_destroy(&i);
ao2_ref(statistics, -1);
return CLI_SUCCESS;
}
AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show topics'
*/
static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct ao2_container *sorted_topics;
struct ao2_iterator iter;
struct stasis_topic_statistics *statistics;
int count = 0;
int not_dispatched = 0;
int dispatched = 0;
#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
#define FMT_FIELDS2 "%-64s %10d %10d\n"
#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
switch (cmd) {
case CLI_INIT:
@ -2248,11 +2331,25 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
return CLI_SHOWUSAGE;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
sorted_topics = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
stasis_topic_statistics_sort_fn, NULL);
if (!sorted_topics) {
ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
return CLI_SUCCESS;
}
iter = ao2_iterator_init(topic_statistics, 0);
if (ao2_container_dup(sorted_topics, topic_statistics, 0)) {
ao2_ref(sorted_topics, -1);
ast_cli(a->fd, "Could not sort topic statistics\n");
return CLI_SUCCESS;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
iter = ao2_iterator_init(sorted_topics, 0);
while ((statistics = ao2_iterator_next(&iter))) {
ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
statistics->messages_not_dispatched, statistics->messages_dispatched,
statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
not_dispatched += statistics->messages_not_dispatched;
dispatched += statistics->messages_dispatched;
@ -2261,7 +2358,9 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
}
ao2_iterator_destroy(&iter);
ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
ao2_ref(sorted_topics, -1);
ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
ast_cli(a->fd, "\n%d topics\n\n", count);
#undef FMT_HEADERS
@ -2334,6 +2433,7 @@ static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_
}
ast_cli(a->fd, "Topic: %s\n", statistics->name);
ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);

View File

@ -192,12 +192,22 @@ struct stasis_topic *ast_bridge_topic_all_cached(void)
int bridge_topics_init(struct ast_bridge *bridge)
{
char *topic_name;
int ret;
if (ast_strlen_zero(bridge->uniqueid)) {
ast_log(LOG_ERROR, "Bridge id initialization required\n");
return -1;
}
ret = ast_asprintf(&topic_name, "bridge:%s", bridge->uniqueid);
if (ret < 0) {
return -1;
}
bridge->topics = stasis_cp_single_create(bridge_cache_all,
bridge->uniqueid);
topic_name);
ast_free(topic_name);
if (!bridge->topics) {
return -1;
}
@ -1302,7 +1312,7 @@ int ast_stasis_bridging_init(void)
ast_register_cleanup(stasis_bridging_cleanup);
bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
bridge_cache_all = stasis_cp_all_create("bridge:all",
bridge_snapshot_get_id);
if (!bridge_cache_all) {

View File

@ -948,10 +948,11 @@ static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
{
struct stasis_caching_topic *caching_topic;
static int caching_id;
char *new_name;
int ret;
ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
if (ret < 0) {
return NULL;
}

View File

@ -67,13 +67,14 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name,
{
char *cached_name = NULL;
struct stasis_cp_all *all;
static int cache_id;
all = ao2_t_alloc(sizeof(*all), all_dtor, name);
if (!all) {
return NULL;
}
ast_asprintf(&cached_name, "%s-cached", name);
ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
if (!cached_name) {
ao2_ref(all, -1);

View File

@ -1367,7 +1367,7 @@ int ast_stasis_channels_init(void)
ast_register_cleanup(stasis_channels_cleanup);
channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
channel_cache_all = stasis_cp_all_create("channel:all",
channel_snapshot_get_id);
if (!channel_cache_all) {
return -1;

View File

@ -460,7 +460,7 @@ int ast_endpoint_stasis_init(void)
int res = 0;
ast_register_cleanup(endpoints_stasis_cleanup);
endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
endpoint_cache_all = stasis_cp_all_create("endpoint:all",
endpoint_snapshot_get_id);
if (!endpoint_cache_all) {
return -1;

View File

@ -374,7 +374,7 @@ int ast_stasis_system_init(void)
{
ast_register_cleanup(stasis_system_cleanup);
system_topic = stasis_topic_create("ast_system");
system_topic = stasis_topic_create("system:all");
if (!system_topic) {
return 1;
}

View File

@ -1224,7 +1224,7 @@ int ast_test_init(void)
ast_register_cleanup(test_cleanup);
/* Create stasis topic */
test_suite_topic = stasis_topic_create("test_suite_topic");
test_suite_topic = stasis_topic_create("testsuite:all");
if (!test_suite_topic) {
return -1;
}

View File

@ -1131,7 +1131,7 @@ static int load_module(void)
goto failed;
}
corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
if (!corosync_aggregate_topic) {
ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
goto failed;

View File

@ -959,6 +959,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
int res = 0;
size_t context_size = strlen("stasis-") + strlen(name) + 1;
char context_name[context_size];
char *topic_name;
int ret;
ast_assert(name != NULL);
ast_assert(handler != NULL);
@ -979,7 +981,13 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return NULL;
}
app->topic = stasis_topic_create(name);
ret = ast_asprintf(&topic_name, "ari:application/%s", name);
if (ret < 0) {
return NULL;
}
app->topic = stasis_topic_create(topic_name);
ast_free(topic_name);
if (!app->topic) {
return NULL;
}