|
|
|
@ -41,6 +41,9 @@
|
|
|
|
|
#include "asterisk/stasis_bridges.h"
|
|
|
|
|
#include "asterisk/stasis_endpoints.h"
|
|
|
|
|
#include "asterisk/config_options.h"
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
#include "asterisk/cli.h"
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/*** DOCUMENTATION
|
|
|
|
|
<managerEvent language="en_US" name="UserEvent">
|
|
|
|
@ -304,14 +307,67 @@ static struct ast_threadpool *pool;
|
|
|
|
|
|
|
|
|
|
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
|
|
|
|
|
/*! The number of buckets to use for topic statistics */
|
|
|
|
|
#define TOPIC_STATISTICS_BUCKETS 57
|
|
|
|
|
|
|
|
|
|
/*! The number of buckets to use for subscription statistics */
|
|
|
|
|
#define SUBSCRIPTION_STATISTICS_BUCKETS 57
|
|
|
|
|
|
|
|
|
|
/*! Container which stores statistics for topics */
|
|
|
|
|
static struct ao2_container *topic_statistics;
|
|
|
|
|
|
|
|
|
|
/*! Container which stores statistics for subscriptions */
|
|
|
|
|
static struct ao2_container *subscription_statistics;
|
|
|
|
|
|
|
|
|
|
/*! \internal */
|
|
|
|
|
struct stasis_message_type_statistics {
|
|
|
|
|
/*! \brief The number of messages of this published */
|
|
|
|
|
int published;
|
|
|
|
|
/*! \brief The number of messages of this that did not reach a subscriber */
|
|
|
|
|
int unused;
|
|
|
|
|
/*! \brief The stasis message type */
|
|
|
|
|
struct stasis_message_type *message_type;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/*! Lock to protect the message types vector */
|
|
|
|
|
AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
|
|
|
|
|
|
|
|
|
|
/*! Vector containing message type information */
|
|
|
|
|
static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
|
|
|
|
|
|
|
|
|
|
/*! \internal */
|
|
|
|
|
struct stasis_topic_statistics {
|
|
|
|
|
/*! \brief The number of messages that were not dispatched to any subscriber */
|
|
|
|
|
int messages_not_dispatched;
|
|
|
|
|
/*! \brief The number of messages that were dispatched to at least 1 subscriber */
|
|
|
|
|
int messages_dispatched;
|
|
|
|
|
/*! \brief Highest time spent dispatching messages to subscribers */
|
|
|
|
|
int64_t highest_time_dispatched;
|
|
|
|
|
/*! \brief Lowest time spent dispatching messages to subscribers */
|
|
|
|
|
int64_t lowest_time_dispatched;
|
|
|
|
|
/*! \brief The number of subscribers to this topic */
|
|
|
|
|
int subscriber_count;
|
|
|
|
|
/*! \brief Name of the topic */
|
|
|
|
|
char name[0];
|
|
|
|
|
};
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/*! \internal */
|
|
|
|
|
struct stasis_topic {
|
|
|
|
|
char *name;
|
|
|
|
|
/*! Variable length array of the subscribers */
|
|
|
|
|
AST_VECTOR(, struct stasis_subscription *) subscribers;
|
|
|
|
|
|
|
|
|
|
/*! Topics forwarding into this topic */
|
|
|
|
|
AST_VECTOR(, struct stasis_topic *) upstream_topics;
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
struct stasis_topic_statistics *statistics;
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/*! Name of the topic */
|
|
|
|
|
char name[0];
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* Forward declarations for the tightly-coupled subscription object */
|
|
|
|
@ -337,28 +393,54 @@ static void topic_dtor(void *obj)
|
|
|
|
|
* unsubscribed before we get here. */
|
|
|
|
|
ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
|
|
|
|
|
|
|
|
|
|
ast_free(topic->name);
|
|
|
|
|
topic->name = NULL;
|
|
|
|
|
|
|
|
|
|
AST_VECTOR_FREE(&topic->subscribers);
|
|
|
|
|
AST_VECTOR_FREE(&topic->upstream_topics);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
if (topic->statistics) {
|
|
|
|
|
ao2_unlink(topic_statistics, topic->statistics);
|
|
|
|
|
ao2_ref(topic->statistics, -1);
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic_statistics *statistics;
|
|
|
|
|
|
|
|
|
|
statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, NULL);
|
|
|
|
|
if (!statistics) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
strcpy(statistics->name, name); /* SAFE */
|
|
|
|
|
ao2_link(topic_statistics, statistics);
|
|
|
|
|
|
|
|
|
|
return statistics;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
struct stasis_topic *stasis_topic_create(const char *name)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic *topic;
|
|
|
|
|
int res = 0;
|
|
|
|
|
|
|
|
|
|
topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
|
|
|
|
|
topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name);
|
|
|
|
|
if (!topic) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
topic->name = ast_strdup(name);
|
|
|
|
|
strcpy(topic->name, name); /* SAFE */
|
|
|
|
|
res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
|
|
|
|
|
res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
topic->statistics = stasis_topic_statistics_create(name);
|
|
|
|
|
if (!topic->name || !topic->statistics || res) {
|
|
|
|
|
#else
|
|
|
|
|
if (!topic->name || res) {
|
|
|
|
|
ao2_cleanup(topic);
|
|
|
|
|
#endif
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -375,6 +457,35 @@ size_t stasis_topic_subscribers(const struct stasis_topic *topic)
|
|
|
|
|
return AST_VECTOR_SIZE(&topic->subscribers);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
struct stasis_subscription_statistics {
|
|
|
|
|
/*! \brief The filename where the subscription originates */
|
|
|
|
|
const char *file;
|
|
|
|
|
/*! \brief The line number where the subscription originates */
|
|
|
|
|
int lineno;
|
|
|
|
|
/*! \brief The function where the subscription originates */
|
|
|
|
|
const char *func;
|
|
|
|
|
/*! \brief The number of messages that were filtered out */
|
|
|
|
|
int messages_dropped;
|
|
|
|
|
/*! \brief The number of messages that passed filtering */
|
|
|
|
|
int messages_passed;
|
|
|
|
|
/*! \brief Highest time spent invoking a message */
|
|
|
|
|
int64_t highest_time_invoked;
|
|
|
|
|
/*! \brief The message type that currently took the longest to process */
|
|
|
|
|
struct stasis_message_type *highest_time_message_type;
|
|
|
|
|
/*! \brief Lowest time spent invoking a message */
|
|
|
|
|
int64_t lowest_time_invoked;
|
|
|
|
|
/*! \brief Using a mailbox to queue messages */
|
|
|
|
|
int uses_mailbox;
|
|
|
|
|
/*! \brief Using stasis threadpool for handling messages */
|
|
|
|
|
int uses_threadpool;
|
|
|
|
|
/*! \brief Name of the topic we subscribed to */
|
|
|
|
|
char *topic;
|
|
|
|
|
/*! \brief Unique ID of the subscription */
|
|
|
|
|
char uniqueid[0];
|
|
|
|
|
};
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/*! \internal */
|
|
|
|
|
struct stasis_subscription {
|
|
|
|
|
/*! Unique ID for this subscription */
|
|
|
|
@ -403,6 +514,11 @@ struct stasis_subscription {
|
|
|
|
|
enum stasis_subscription_message_formatters accepted_formatters;
|
|
|
|
|
/*! The message filter currently in use */
|
|
|
|
|
enum stasis_subscription_message_filter filter;
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
/*! Statistics information */
|
|
|
|
|
struct stasis_subscription_statistics *statistics;
|
|
|
|
|
#endif
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static void subscription_dtor(void *obj)
|
|
|
|
@ -423,6 +539,13 @@ static void subscription_dtor(void *obj)
|
|
|
|
|
ast_cond_destroy(&sub->join_cond);
|
|
|
|
|
|
|
|
|
|
AST_VECTOR_FREE(&sub->accepted_message_types);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
if (sub->statistics) {
|
|
|
|
|
ao2_unlink(subscription_statistics, sub->statistics);
|
|
|
|
|
ao2_ref(sub->statistics, -1);
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
@ -436,6 +559,12 @@ static void subscription_invoke(struct stasis_subscription *sub,
|
|
|
|
|
{
|
|
|
|
|
unsigned int final = stasis_subscription_final_message(sub, message);
|
|
|
|
|
int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
struct timeval start;
|
|
|
|
|
int elapsed;
|
|
|
|
|
|
|
|
|
|
start = ast_tvnow();
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/* Notify that the final message has been received */
|
|
|
|
|
if (final) {
|
|
|
|
@ -462,6 +591,19 @@ static void subscription_invoke(struct stasis_subscription *sub,
|
|
|
|
|
ast_cond_signal(&sub->join_cond);
|
|
|
|
|
ao2_unlock(sub);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
elapsed = ast_tvdiff_ms(ast_tvnow(), start);
|
|
|
|
|
if (elapsed > sub->statistics->highest_time_invoked) {
|
|
|
|
|
sub->statistics->highest_time_invoked = elapsed;
|
|
|
|
|
ao2_lock(sub->statistics);
|
|
|
|
|
sub->statistics->highest_time_message_type = stasis_message_type(message);
|
|
|
|
|
ao2_unlock(sub->statistics);
|
|
|
|
|
}
|
|
|
|
|
if (elapsed < sub->statistics->lowest_time_invoked) {
|
|
|
|
|
sub->statistics->lowest_time_invoked = elapsed;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
|
|
|
|
@ -471,12 +613,51 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
|
|
|
|
|
const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
|
|
|
|
|
const char *func)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_subscription_statistics *statistics;
|
|
|
|
|
size_t uniqueid_len = strlen(uniqueid) + 1;
|
|
|
|
|
|
|
|
|
|
statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
|
|
|
|
|
if (!statistics) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
statistics->file = file;
|
|
|
|
|
statistics->lineno = lineno;
|
|
|
|
|
statistics->func = func;
|
|
|
|
|
statistics->uses_mailbox = needs_mailbox;
|
|
|
|
|
statistics->uses_threadpool = use_thread_pool;
|
|
|
|
|
strcpy(statistics->uniqueid, uniqueid); /* SAFE */
|
|
|
|
|
statistics->topic = statistics->uniqueid + uniqueid_len;
|
|
|
|
|
strcpy(statistics->topic, topic); /* SAFE */
|
|
|
|
|
ao2_link(subscription_statistics, statistics);
|
|
|
|
|
|
|
|
|
|
return statistics;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
struct stasis_subscription *internal_stasis_subscribe(
|
|
|
|
|
struct stasis_topic *topic,
|
|
|
|
|
stasis_subscription_cb callback,
|
|
|
|
|
void *data,
|
|
|
|
|
int needs_mailbox,
|
|
|
|
|
int use_thread_pool,
|
|
|
|
|
const char *file,
|
|
|
|
|
int lineno,
|
|
|
|
|
const char *func)
|
|
|
|
|
#else
|
|
|
|
|
struct stasis_subscription *internal_stasis_subscribe(
|
|
|
|
|
struct stasis_topic *topic,
|
|
|
|
|
stasis_subscription_cb callback,
|
|
|
|
|
void *data,
|
|
|
|
|
int needs_mailbox,
|
|
|
|
|
int use_thread_pool)
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
struct stasis_subscription *sub;
|
|
|
|
|
|
|
|
|
@ -491,6 +672,15 @@ struct stasis_subscription *internal_stasis_subscribe(
|
|
|
|
|
}
|
|
|
|
|
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
|
|
|
|
|
use_thread_pool, file, lineno, func);
|
|
|
|
|
if (!sub->statistics) {
|
|
|
|
|
ao2_ref(sub, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (needs_mailbox) {
|
|
|
|
|
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
|
|
|
|
|
|
|
|
|
@ -538,6 +728,18 @@ struct stasis_subscription *internal_stasis_subscribe(
|
|
|
|
|
return sub;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
struct stasis_subscription *__stasis_subscribe(
|
|
|
|
|
struct stasis_topic *topic,
|
|
|
|
|
stasis_subscription_cb callback,
|
|
|
|
|
void *data,
|
|
|
|
|
const char *file,
|
|
|
|
|
int lineno,
|
|
|
|
|
const char *func)
|
|
|
|
|
{
|
|
|
|
|
return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
struct stasis_subscription *stasis_subscribe(
|
|
|
|
|
struct stasis_topic *topic,
|
|
|
|
|
stasis_subscription_cb callback,
|
|
|
|
@ -545,7 +747,20 @@ struct stasis_subscription *stasis_subscribe(
|
|
|
|
|
{
|
|
|
|
|
return internal_stasis_subscribe(topic, callback, data, 1, 0);
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
struct stasis_subscription *__stasis_subscribe_pool(
|
|
|
|
|
struct stasis_topic *topic,
|
|
|
|
|
stasis_subscription_cb callback,
|
|
|
|
|
void *data,
|
|
|
|
|
const char *file,
|
|
|
|
|
int lineno,
|
|
|
|
|
const char *func)
|
|
|
|
|
{
|
|
|
|
|
return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
struct stasis_subscription *stasis_subscribe_pool(
|
|
|
|
|
struct stasis_topic *topic,
|
|
|
|
|
stasis_subscription_cb callback,
|
|
|
|
@ -553,6 +768,7 @@ struct stasis_subscription *stasis_subscribe_pool(
|
|
|
|
|
{
|
|
|
|
|
return internal_stasis_subscribe(topic, callback, data, 1, 1);
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
static int sub_cleanup(void *data)
|
|
|
|
|
{
|
|
|
|
@ -808,6 +1024,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
|
|
|
|
|
topic_add_subscription(
|
|
|
|
|
AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
topic->statistics->subscriber_count += 1;
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
ao2_unlock(topic);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
@ -825,6 +1046,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
|
|
|
|
|
}
|
|
|
|
|
res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
|
|
|
|
|
AST_VECTOR_ELEM_CLEANUP_NOOP);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
if (!res) {
|
|
|
|
|
topic->statistics->subscriber_count -= 1;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
ao2_unlock(topic);
|
|
|
|
|
|
|
|
|
|
return res;
|
|
|
|
@ -885,8 +1113,10 @@ static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
|
|
|
|
|
* \param message The message to send
|
|
|
|
|
* \param synchronous If non-zero, synchronize on the subscriber receiving
|
|
|
|
|
* the message
|
|
|
|
|
* \retval 0 if message was not dispatched
|
|
|
|
|
* \retval 1 if message was dispatched
|
|
|
|
|
*/
|
|
|
|
|
static void dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
static unsigned int dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
struct stasis_message *message,
|
|
|
|
|
int synchronous)
|
|
|
|
|
{
|
|
|
|
@ -938,14 +1168,22 @@ static void dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
} while (0);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (!sub->mailbox) {
|
|
|
|
|
/* Dispatch directly */
|
|
|
|
|
subscription_invoke(sub, message);
|
|
|
|
|
return;
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Bump the message for the taskprocessor push. This will get de-ref'd
|
|
|
|
@ -957,6 +1195,7 @@ static void dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
/* Push failed; ugh. */
|
|
|
|
|
ast_log(LOG_ERROR, "Dropping async dispatch\n");
|
|
|
|
|
ao2_cleanup(message);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
struct sync_task_data std;
|
|
|
|
@ -972,7 +1211,7 @@ static void dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
ao2_cleanup(message);
|
|
|
|
|
ast_mutex_destroy(&std.lock);
|
|
|
|
|
ast_cond_destroy(&std.cond);
|
|
|
|
|
return;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_mutex_lock(&std.lock);
|
|
|
|
@ -984,6 +1223,8 @@ static void dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
ast_mutex_destroy(&std.lock);
|
|
|
|
|
ast_cond_destroy(&std.cond);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
@ -997,12 +1238,41 @@ static void publish_msg(struct stasis_topic *topic,
|
|
|
|
|
struct stasis_message *message, struct stasis_subscription *sync_sub)
|
|
|
|
|
{
|
|
|
|
|
size_t i;
|
|
|
|
|
unsigned int dispatched = 0;
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
int message_type_id = stasis_message_type_id(stasis_message_type(message));
|
|
|
|
|
struct stasis_message_type_statistics *statistics;
|
|
|
|
|
struct timeval start;
|
|
|
|
|
int elapsed;
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
ast_assert(topic != NULL);
|
|
|
|
|
ast_assert(message != NULL);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
ast_mutex_lock(&message_type_statistics_lock);
|
|
|
|
|
if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
|
|
|
|
|
struct stasis_message_type_statistics new_statistics = {
|
|
|
|
|
.published = 0,
|
|
|
|
|
};
|
|
|
|
|
if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
|
|
|
|
|
ast_mutex_unlock(&message_type_statistics_lock);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
|
|
|
|
|
statistics->message_type = stasis_message_type(message);
|
|
|
|
|
ast_mutex_unlock(&message_type_statistics_lock);
|
|
|
|
|
|
|
|
|
|
ast_atomic_fetchadd_int(&statistics->published, +1);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/* If there are no subscribers don't bother */
|
|
|
|
|
if (!stasis_topic_subscribers(topic)) {
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
ast_atomic_fetchadd_int(&statistics->unused, +1);
|
|
|
|
|
ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
|
|
|
|
|
#endif
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1011,15 +1281,35 @@ static void publish_msg(struct stasis_topic *topic,
|
|
|
|
|
* Make sure we hold onto a reference while dispatching.
|
|
|
|
|
*/
|
|
|
|
|
ao2_ref(topic, +1);
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
start = ast_tvnow();
|
|
|
|
|
#endif
|
|
|
|
|
ao2_lock(topic);
|
|
|
|
|
for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
|
|
|
|
|
struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
|
|
|
|
|
|
|
|
|
|
ast_assert(sub != NULL);
|
|
|
|
|
|
|
|
|
|
dispatch_message(sub, message, (sub == sync_sub));
|
|
|
|
|
dispatched += dispatch_message(sub, message, (sub == sync_sub));
|
|
|
|
|
}
|
|
|
|
|
ao2_unlock(topic);
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
elapsed = ast_tvdiff_ms(ast_tvnow(), start);
|
|
|
|
|
if (elapsed > topic->statistics->highest_time_dispatched) {
|
|
|
|
|
topic->statistics->highest_time_dispatched = elapsed;
|
|
|
|
|
}
|
|
|
|
|
if (elapsed < topic->statistics->lowest_time_dispatched) {
|
|
|
|
|
topic->statistics->lowest_time_dispatched = elapsed;
|
|
|
|
|
}
|
|
|
|
|
if (dispatched) {
|
|
|
|
|
ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
|
|
|
|
|
} else {
|
|
|
|
|
ast_atomic_fetchadd_int(&statistics->unused, +1);
|
|
|
|
|
ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1805,9 +2095,458 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
|
|
|
|
|
|
|
|
|
|
/*! @} */
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis statistics show subscriptions'
|
|
|
|
|
*/
|
|
|
|
|
static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
struct ao2_iterator iter;
|
|
|
|
|
struct stasis_subscription_statistics *statistics;
|
|
|
|
|
int count = 0;
|
|
|
|
|
int dropped = 0;
|
|
|
|
|
int passed = 0;
|
|
|
|
|
#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
|
|
|
|
|
#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
|
|
|
|
|
#define FMT_FIELDS2 "%-64s %10d %10d\n"
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis statistics show subscriptions";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis statistics show subscriptions\n"
|
|
|
|
|
" Shows a list of subscriptions and their general statistics\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != e->args) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
|
|
|
|
|
|
|
|
|
|
iter = ao2_iterator_init(subscription_statistics, 0);
|
|
|
|
|
while ((statistics = ao2_iterator_next(&iter))) {
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
|
|
|
|
|
statistics->lowest_time_invoked, statistics->highest_time_invoked);
|
|
|
|
|
dropped += statistics->messages_dropped;
|
|
|
|
|
passed += statistics->messages_passed;
|
|
|
|
|
ao2_ref(statistics, -1);
|
|
|
|
|
++count;
|
|
|
|
|
}
|
|
|
|
|
ao2_iterator_destroy(&iter);
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
|
|
|
|
|
ast_cli(a->fd, "\n%d subscriptions\n\n", count);
|
|
|
|
|
|
|
|
|
|
#undef FMT_HEADERS
|
|
|
|
|
#undef FMT_FIELDS
|
|
|
|
|
#undef FMT_FIELDS2
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI tab completion for subscription statistics names
|
|
|
|
|
*/
|
|
|
|
|
static char *subscription_statistics_complete_name(const char *word, int state)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_subscription_statistics *statistics;
|
|
|
|
|
struct ao2_iterator it_statistics;
|
|
|
|
|
int wordlen = strlen(word);
|
|
|
|
|
int which = 0;
|
|
|
|
|
char *result = NULL;
|
|
|
|
|
|
|
|
|
|
it_statistics = ao2_iterator_init(subscription_statistics, 0);
|
|
|
|
|
while ((statistics = ao2_iterator_next(&it_statistics))) {
|
|
|
|
|
if (!strncasecmp(word, statistics->uniqueid, wordlen)
|
|
|
|
|
&& ++which > state) {
|
|
|
|
|
result = ast_strdup(statistics->uniqueid);
|
|
|
|
|
}
|
|
|
|
|
ao2_ref(statistics, -1);
|
|
|
|
|
if (result) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ao2_iterator_destroy(&it_statistics);
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis statistics show subscription'
|
|
|
|
|
*/
|
|
|
|
|
static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_subscription_statistics *statistics;
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis statistics show subscription";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis statistics show subscription <uniqueid>\n"
|
|
|
|
|
" Show stasis subscription statistics.\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
if (a->pos == 4) {
|
|
|
|
|
return subscription_statistics_complete_name(a->word, a->n);
|
|
|
|
|
} else {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != 5) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
statistics = ao2_find(subscription_statistics, a->argv[4], OBJ_SEARCH_KEY);
|
|
|
|
|
if (!statistics) {
|
|
|
|
|
ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
|
|
|
|
|
return CLI_FAILURE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
|
|
|
|
|
ast_cli(a->fd, "Topic: %s\n", statistics->topic);
|
|
|
|
|
ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
|
|
|
|
|
ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
|
|
|
|
|
ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
|
|
|
|
|
ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
|
|
|
|
|
ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
|
|
|
|
|
ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
|
|
|
|
|
ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
|
|
|
|
|
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
|
|
|
|
|
ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
|
|
|
|
|
|
|
|
|
|
ao2_lock(statistics);
|
|
|
|
|
if (statistics->highest_time_message_type) {
|
|
|
|
|
ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
|
|
|
|
|
}
|
|
|
|
|
ao2_unlock(statistics);
|
|
|
|
|
|
|
|
|
|
ao2_ref(statistics, -1);
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis statistics show topics'
|
|
|
|
|
*/
|
|
|
|
|
static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
struct ao2_iterator iter;
|
|
|
|
|
struct stasis_topic_statistics *statistics;
|
|
|
|
|
int count = 0;
|
|
|
|
|
int not_dispatched = 0;
|
|
|
|
|
int dispatched = 0;
|
|
|
|
|
#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
|
|
|
|
|
#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
|
|
|
|
|
#define FMT_FIELDS2 "%-64s %10d %10d\n"
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis statistics show topics";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis statistics show topics\n"
|
|
|
|
|
" Shows a list of topics and their general statistics\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != e->args) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
|
|
|
|
|
|
|
|
|
|
iter = ao2_iterator_init(topic_statistics, 0);
|
|
|
|
|
while ((statistics = ao2_iterator_next(&iter))) {
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
|
|
|
|
|
statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
|
|
|
|
|
not_dispatched += statistics->messages_not_dispatched;
|
|
|
|
|
dispatched += statistics->messages_dispatched;
|
|
|
|
|
ao2_ref(statistics, -1);
|
|
|
|
|
++count;
|
|
|
|
|
}
|
|
|
|
|
ao2_iterator_destroy(&iter);
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
|
|
|
|
|
ast_cli(a->fd, "\n%d topics\n\n", count);
|
|
|
|
|
|
|
|
|
|
#undef FMT_HEADERS
|
|
|
|
|
#undef FMT_FIELDS
|
|
|
|
|
#undef FMT_FIELDS2
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI tab completion for topic statistics names
|
|
|
|
|
*/
|
|
|
|
|
static char *topic_statistics_complete_name(const char *word, int state)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic_statistics *statistics;
|
|
|
|
|
struct ao2_iterator it_statistics;
|
|
|
|
|
int wordlen = strlen(word);
|
|
|
|
|
int which = 0;
|
|
|
|
|
char *result = NULL;
|
|
|
|
|
|
|
|
|
|
it_statistics = ao2_iterator_init(topic_statistics, 0);
|
|
|
|
|
while ((statistics = ao2_iterator_next(&it_statistics))) {
|
|
|
|
|
if (!strncasecmp(word, statistics->name, wordlen)
|
|
|
|
|
&& ++which > state) {
|
|
|
|
|
result = ast_strdup(statistics->name);
|
|
|
|
|
}
|
|
|
|
|
ao2_ref(statistics, -1);
|
|
|
|
|
if (result) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ao2_iterator_destroy(&it_statistics);
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis statistics show topic'
|
|
|
|
|
*/
|
|
|
|
|
static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic_statistics *statistics;
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis statistics show topic";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis statistics show topic <name>\n"
|
|
|
|
|
" Show stasis topic statistics.\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
if (a->pos == 4) {
|
|
|
|
|
return topic_statistics_complete_name(a->word, a->n);
|
|
|
|
|
} else {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != 5) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
statistics = ao2_find(topic_statistics, a->argv[4], OBJ_SEARCH_KEY);
|
|
|
|
|
if (!statistics) {
|
|
|
|
|
ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
|
|
|
|
|
return CLI_FAILURE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "Topic: %s\n", statistics->name);
|
|
|
|
|
ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
|
|
|
|
|
ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
|
|
|
|
|
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
|
|
|
|
|
ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
|
|
|
|
|
ast_cli(a->fd, "Number of subscribers: %d\n", statistics->subscriber_count);
|
|
|
|
|
|
|
|
|
|
ao2_ref(statistics, -1);
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis statistics show messages'
|
|
|
|
|
*/
|
|
|
|
|
static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
int i;
|
|
|
|
|
int count = 0;
|
|
|
|
|
int published = 0;
|
|
|
|
|
int unused = 0;
|
|
|
|
|
#define FMT_HEADERS "%-64s %10s %10s\n"
|
|
|
|
|
#define FMT_FIELDS "%-64s %10d %10d\n"
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis statistics show messages";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis statistics show messages\n"
|
|
|
|
|
" Shows a list of message types and their general statistics\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != e->args) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
|
|
|
|
|
|
|
|
|
|
ast_mutex_lock(&message_type_statistics_lock);
|
|
|
|
|
for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
|
|
|
|
|
struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
|
|
|
|
|
|
|
|
|
|
if (!statistics->message_type) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
|
|
|
|
|
statistics->unused);
|
|
|
|
|
published += statistics->published;
|
|
|
|
|
unused += statistics->unused;
|
|
|
|
|
++count;
|
|
|
|
|
}
|
|
|
|
|
ast_mutex_unlock(&message_type_statistics_lock);
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
|
|
|
|
|
ast_cli(a->fd, "\n%d seen message types\n\n", count);
|
|
|
|
|
|
|
|
|
|
#undef FMT_HEADERS
|
|
|
|
|
#undef FMT_FIELDS
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static struct ast_cli_entry cli_stasis_statistics[] = {
|
|
|
|
|
AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
|
|
|
|
|
AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
|
|
|
|
|
AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
|
|
|
|
|
AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
|
|
|
|
|
AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static int subscription_statistics_hash(const void *obj, const int flags)
|
|
|
|
|
{
|
|
|
|
|
const struct stasis_subscription_statistics *object;
|
|
|
|
|
const char *key;
|
|
|
|
|
|
|
|
|
|
switch (flags & OBJ_SEARCH_MASK) {
|
|
|
|
|
case OBJ_SEARCH_KEY:
|
|
|
|
|
key = obj;
|
|
|
|
|
break;
|
|
|
|
|
case OBJ_SEARCH_OBJECT:
|
|
|
|
|
object = obj;
|
|
|
|
|
key = object->uniqueid;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
/* Hash can only work on something with a full key. */
|
|
|
|
|
ast_assert(0);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return ast_str_case_hash(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int subscription_statistics_cmp(void *obj, void *arg, int flags)
|
|
|
|
|
{
|
|
|
|
|
const struct stasis_subscription_statistics *object_left = obj;
|
|
|
|
|
const struct stasis_subscription_statistics *object_right = arg;
|
|
|
|
|
const char *right_key = arg;
|
|
|
|
|
int cmp;
|
|
|
|
|
|
|
|
|
|
switch (flags & OBJ_SEARCH_MASK) {
|
|
|
|
|
case OBJ_SEARCH_OBJECT:
|
|
|
|
|
right_key = object_right->uniqueid;
|
|
|
|
|
/* Fall through */
|
|
|
|
|
case OBJ_SEARCH_KEY:
|
|
|
|
|
cmp = strcasecmp(object_left->uniqueid, right_key);
|
|
|
|
|
break;
|
|
|
|
|
case OBJ_SEARCH_PARTIAL_KEY:
|
|
|
|
|
/* Not supported by container */
|
|
|
|
|
ast_assert(0);
|
|
|
|
|
cmp = -1;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
/*
|
|
|
|
|
* What arg points to is specific to this traversal callback
|
|
|
|
|
* and has no special meaning to astobj2.
|
|
|
|
|
*/
|
|
|
|
|
cmp = 0;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (cmp) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
* At this point the traversal callback is identical to a sorted
|
|
|
|
|
* container.
|
|
|
|
|
*/
|
|
|
|
|
return CMP_MATCH;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int topic_statistics_hash(const void *obj, const int flags)
|
|
|
|
|
{
|
|
|
|
|
const struct stasis_topic_statistics *object;
|
|
|
|
|
const char *key;
|
|
|
|
|
|
|
|
|
|
switch (flags & OBJ_SEARCH_MASK) {
|
|
|
|
|
case OBJ_SEARCH_KEY:
|
|
|
|
|
key = obj;
|
|
|
|
|
break;
|
|
|
|
|
case OBJ_SEARCH_OBJECT:
|
|
|
|
|
object = obj;
|
|
|
|
|
key = object->name;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
/* Hash can only work on something with a full key. */
|
|
|
|
|
ast_assert(0);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return ast_str_case_hash(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int topic_statistics_cmp(void *obj, void *arg, int flags)
|
|
|
|
|
{
|
|
|
|
|
const struct stasis_topic_statistics *object_left = obj;
|
|
|
|
|
const struct stasis_topic_statistics *object_right = arg;
|
|
|
|
|
const char *right_key = arg;
|
|
|
|
|
int cmp;
|
|
|
|
|
|
|
|
|
|
switch (flags & OBJ_SEARCH_MASK) {
|
|
|
|
|
case OBJ_SEARCH_OBJECT:
|
|
|
|
|
right_key = object_right->name;
|
|
|
|
|
/* Fall through */
|
|
|
|
|
case OBJ_SEARCH_KEY:
|
|
|
|
|
cmp = strcasecmp(object_left->name, right_key);
|
|
|
|
|
break;
|
|
|
|
|
case OBJ_SEARCH_PARTIAL_KEY:
|
|
|
|
|
/* Not supported by container */
|
|
|
|
|
ast_assert(0);
|
|
|
|
|
cmp = -1;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
/*
|
|
|
|
|
* What arg points to is specific to this traversal callback
|
|
|
|
|
* and has no special meaning to astobj2.
|
|
|
|
|
*/
|
|
|
|
|
cmp = 0;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (cmp) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
* At this point the traversal callback is identical to a sorted
|
|
|
|
|
* container.
|
|
|
|
|
*/
|
|
|
|
|
return CMP_MATCH;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/*! \brief Cleanup function for graceful shutdowns */
|
|
|
|
|
static void stasis_cleanup(void)
|
|
|
|
|
{
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
|
|
|
|
|
AST_VECTOR_FREE(&message_type_statistics);
|
|
|
|
|
ao2_cleanup(subscription_statistics);
|
|
|
|
|
ao2_cleanup(topic_statistics);
|
|
|
|
|
#endif
|
|
|
|
|
ast_threadpool_shutdown(pool);
|
|
|
|
|
pool = NULL;
|
|
|
|
|
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
|
|
|
|
@ -1902,5 +2641,28 @@ int stasis_init(void)
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
|
|
|
|
|
* topic or subscripton.
|
|
|
|
|
*/
|
|
|
|
|
subscription_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
|
|
|
|
|
subscription_statistics_hash, 0, subscription_statistics_cmp);
|
|
|
|
|
if (!subscription_statistics) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
topic_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
|
|
|
|
|
topic_statistics_hash, 0, topic_statistics_cmp);
|
|
|
|
|
if (!topic_statistics) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
AST_VECTOR_INIT(&message_type_statistics, 0);
|
|
|
|
|
|
|
|
|
|
if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|