Merge "bridges: Remove reliance on stasis caching"

This commit is contained in:
George Joseph 2018-11-29 15:05:33 -06:00 committed by Gerrit Code Review
commit 945451af90
15 changed files with 397 additions and 340 deletions

36
CHANGES
View File

@ -35,6 +35,42 @@ Channels
ast_channel_snapshot_update structure as it's data.
ast_channel_snapshot_get_latest() still returns the latest snapshot.
Bridging
------------------
* The bridging core no longer uses the stasis cache for bridge
snapshots. The latest bridge snapshot is now stored on the
ast_bridge structure itself.
* The following APIs are no longer available since the stasis cache
is no longer used:
ast_bridge_topic_cached()
ast_bridge_topic_all_cached()
* A topic pool is now used for individual bridge topics.
* The ast_bridge_cache() function was removed since there's no
longer a separate container of snapshots.
* A new function "ast_bridges()" was created to retrieve the
container of all bridges. Users formerly calling
ast_bridge_cache() can use the new function to iterate over
bridges and retrieve the latest snapshot directly from the
bridge.
* The ast_bridge_snapshot_get_latest() function was renamed to
ast_bridge_get_snapshot_by_uniqueid().
* A new function "ast_bridge_get_snapshot()" was created to retrieve
the bridge snapshot directly from the bridge structure.
* The ast_bridge_topic_all() function now returns a normal topic
not a cached one so you can't use stasis cache functions on it
either.
* The ast_bridge_snapshot_type() stasis message now has the
ast_bridge_snapshot_update structure as it's data. It contains
the last snapshot and the new one.
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 16.0.0 to Asterisk 16.1.0 ------------
------------------------------------------------------------------------------

View File

@ -58,3 +58,34 @@ Channels:
The ast_channel_snapshot_type() stasis message now has the
ast_channel_snapshot_update structure as it's data.
ast_channel_snapshot_get_latest() still returns the latest snapshot.
Applications
- The JabberStatus application, deprecated in Asterisk 12, has been removed.
Bridging
- The bridging core no longer uses the stasis cache for bridge
snapshots. The latest bridge snapshot is now stored on the
ast_bridge structure itself.
- The following APIs are no longer available since the stasis cache
is no longer used:
ast_bridge_topic_cached()
ast_bridge_topic_all_cached()
- A topic pool is now used for individual bridge topics.
- The ast_bridge_cache() function was removed since there's no
longer a separate container of snapshots.
- A new function "ast_bridges()" was created to retrieve the
container of all bridges. Users formerly calling
ast_bridge_cache() can use the new function to iterate over
bridges and retrieve the latest snapshot directly from the
bridge.
- The ast_bridge_snapshot_get_latest() function was renamed to
ast_bridge_get_snapshot_by_uniqueid().
- A new function "ast_bridge_get_snapshot()" was created to retrieve
the bridge snapshot directly from the bridge structure.
- The ast_bridge_topic_all() function now returns a normal topic
not a cached one so you can't use stasis cache functions on it
either.
- The ast_bridge_snapshot_type() stasis message now has the
ast_bridge_snapshot_update structure as it's data. It contains
the last snapshot and the new one.

View File

@ -712,7 +712,7 @@ int manager_confbridge_init(void)
STASIS_MESSAGE_TYPE_INIT(confbridge_welcome_type);
bridge_state_router = stasis_message_router_create(
ast_bridge_topic_all_cached());
ast_bridge_topic_all());
if (!bridge_state_router) {
return -1;

View File

@ -75,6 +75,7 @@ extern "C" {
#include "asterisk/dsp.h"
#include "asterisk/uuid.h"
struct a02_container;
struct ast_bridge_technology;
struct ast_bridge;
struct ast_bridge_tech_optimizations;
@ -299,6 +300,39 @@ struct ast_bridge_softmix {
AST_LIST_HEAD_NOLOCK(ast_bridge_channels_list, ast_bridge_channel);
/*!
* \brief Structure that contains a snapshot of information about a bridge
*/
struct ast_bridge_snapshot {
AST_DECLARE_STRING_FIELDS(
/*! Immutable bridge UUID. */
AST_STRING_FIELD(uniqueid);
/*! Bridge technology that is handling the bridge */
AST_STRING_FIELD(technology);
/*! Bridge subclass that is handling the bridge */
AST_STRING_FIELD(subclass);
/*! Creator of the bridge */
AST_STRING_FIELD(creator);
/*! Name given to the bridge by its creator */
AST_STRING_FIELD(name);
/*! Unique ID of the channel providing video, if one exists */
AST_STRING_FIELD(video_source_id);
);
/*! AO2 container of bare channel uniqueid strings participating in the bridge.
* Allocated from ast_str_container_alloc() */
struct ao2_container *channels;
/*! Bridge flags to tweak behavior */
struct ast_flags feature_flags;
/*! Bridge capabilities */
uint32_t capabilities;
/*! Number of channels participating in the bridge */
unsigned int num_channels;
/*! Number of active channels in the bridge. */
unsigned int num_active;
/*! The video mode of the bridge */
enum ast_bridge_video_mode_type video_mode;
};
/*!
* \brief Structure that contains information about a bridge
*/
@ -312,7 +346,7 @@ struct ast_bridge {
/*! Private information unique to the bridge technology */
void *tech_pvt;
/*! Per-bridge topics */
struct stasis_cp_single *topics;
struct stasis_topic *topic;
/*! Call ID associated with the bridge */
ast_callid callid;
/*! Linked list of channels participating in the bridge */
@ -358,11 +392,26 @@ struct ast_bridge {
/*! Type mapping used for media routing */
struct ast_vector_int media_types;
/*! Current bridge snapshot */
struct ast_bridge_snapshot *current_snapshot;
};
/*! \brief Bridge base class virtual method table. */
extern struct ast_bridge_methods ast_bridge_base_v_table;
/*!
* \brief Returns the global bridges container
* \since 17.0
*
* \retval a pointer to the bridges container success
* \retval NULL on failure
*
* \note You must use ao2_ref(<container>, -1) when done with it
*
* \warning You must not attempt to modify the container returned.
*/
struct ao2_container *ast_bridges(void);
/*!
* \brief Create a new base class bridge
*

View File

@ -31,37 +31,9 @@ extern "C" {
#include "asterisk/bridge.h"
#include "asterisk/pbx.h"
/*!
* \brief Structure that contains a snapshot of information about a bridge
*/
struct ast_bridge_snapshot {
AST_DECLARE_STRING_FIELDS(
/*! Immutable bridge UUID. */
AST_STRING_FIELD(uniqueid);
/*! Bridge technology that is handling the bridge */
AST_STRING_FIELD(technology);
/*! Bridge subclass that is handling the bridge */
AST_STRING_FIELD(subclass);
/*! Creator of the bridge */
AST_STRING_FIELD(creator);
/*! Name given to the bridge by its creator */
AST_STRING_FIELD(name);
/*! Unique ID of the channel providing video, if one exists */
AST_STRING_FIELD(video_source_id);
);
/*! AO2 container of bare channel uniqueid strings participating in the bridge.
* Allocated from ast_str_container_alloc() */
struct ao2_container *channels;
/*! Bridge flags to tweak behavior */
struct ast_flags feature_flags;
/*! Bridge capabilities */
uint32_t capabilities;
/*! Number of channels participating in the bridge */
unsigned int num_channels;
/*! Number of active channels in the bridge. */
unsigned int num_active;
/*! The video mode of the bridge */
enum ast_bridge_video_mode_type video_mode;
struct ast_bridge_snapshot_update {
struct ast_bridge_snapshot *old_snapshot;
struct ast_bridge_snapshot *new_snapshot;
};
/*!
@ -99,22 +71,6 @@ struct stasis_message_type *ast_bridge_snapshot_type(void);
*/
struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge);
/*!
* \since 12
* \brief A topic which publishes the events for a particular bridge.
*
* \ref ast_bridge_snapshot messages are replaced with stasis_cache_update
* messages.
*
* If the given \a bridge is \c NULL, ast_bridge_topic_all_cached() is returned.
*
* \param bridge Bridge for which to get a topic or \c NULL.
*
* \retval Topic for bridge's events.
* \retval ast_bridge_topic_all() if \a bridge is \c NULL.
*/
struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge);
/*!
* \since 12
* \brief A topic which publishes the events for all bridges.
@ -122,22 +78,6 @@ struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge);
*/
struct stasis_topic *ast_bridge_topic_all(void);
/*!
* \since 12
* \brief A caching topic which caches \ref ast_bridge_snapshot messages from
* ast_bridge_events_all(void).
*
* \retval Caching topic for all bridge events.
*/
struct stasis_topic *ast_bridge_topic_all_cached(void);
/*!
* \since 12
* \brief Backend cache for ast_bridge_topic_all_cached().
* \retval Cache of \ref ast_bridge_snapshot.
*/
struct stasis_cache *ast_bridge_cache(void);
/*!
* \since 12
* \brief Publish the state of a bridge
@ -490,17 +430,31 @@ void ast_bridge_publish_attended_transfer(struct ast_attended_transfer_message *
struct stasis_message_type *ast_attended_transfer_type(void);
/*!
* \brief Returns the most recent snapshot for the bridge.
* \brief Returns the current snapshot for the bridge.
* \since 17.0
*
* The returned pointer is AO2 managed, so ao2_cleanup() when you're done.
*
* \param bridge_id Uniqueid of the bridge for which to get the snapshot.
* \param bridge_id Uniqueid of the bridge from which to get the snapshot.
* \return Most recent snapshot. ao2_cleanup() when done.
* \return \c NULL if channel isn't in cache.
* \return \c NULL if bridge or snapshot doesn't exist.
*/
struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(
struct ast_bridge_snapshot *ast_bridge_get_snapshot_by_uniqueid(
const char *bridge_id);
/*!
* \brief Returns the current snapshot for the bridge.
* \since 17.0
*
* The returned pointer is AO2 managed, so ao2_cleanup() when you're done.
*
* \param bridge The bridge from which to get the snapshot.
* \return Most recent snapshot. ao2_cleanup() when done.
* \return \c NULL if there isn't a snapshot.
*/
struct ast_bridge_snapshot *ast_bridge_get_snapshot(
struct ast_bridge *bridge);
/*!
* \internal
* \brief Initialize the topics for a single bridge.
@ -509,6 +463,15 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(
*/
int bridge_topics_init(struct ast_bridge *bridge);
/*!
* \internal
* \since 17.0
* \brief Publish destroy then cleanup topics.
*
* \param bridge The bridge to clean up
*/
void bridge_topics_destroy(struct ast_bridge *bridge);
/*!
* \internal
* \brief Initialize the stasis bridging topic and message types

View File

@ -167,6 +167,7 @@ options.o: _ASTCFLAGS+=$(call get_menuselect_cflags,REF_DEBUG)
sched.o: _ASTCFLAGS+=$(call get_menuselect_cflags,DEBUG_SCHEDULER DUMP_SCHEDULER)
tcptls.o: _ASTCFLAGS+=$(OPENSSL_INCLUDE) -Wno-deprecated-declarations
uuid.o: _ASTCFLAGS+=$(UUID_INCLUDE)
stasis.o: _ASTCFLAGS+=$(call get_menuselect_cflags,AO2_DEBUG)
OBJS:=$(sort $(OBJS))

View File

@ -171,6 +171,11 @@ struct bridge_manager_controller {
/*! Bridge manager controller. */
static struct bridge_manager_controller *bridge_manager;
struct ao2_container *ast_bridges(void)
{
return ao2_bump(bridges);
}
/*!
* \internal
* \brief Request service for a bridge from the bridge manager.
@ -650,25 +655,6 @@ static void bridge_handle_actions(struct ast_bridge *bridge)
}
}
static struct stasis_message *create_bridge_snapshot_message(struct ast_bridge *bridge)
{
RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
if (!ast_bridge_snapshot_type()) {
return NULL;
}
ast_bridge_lock(bridge);
snapshot = ast_bridge_snapshot_create(bridge);
ast_bridge_unlock(bridge);
if (!snapshot) {
return NULL;
}
return stasis_message_create(ast_bridge_snapshot_type(), snapshot);
}
static void destroy_bridge(void *obj)
{
struct ast_bridge *bridge = obj;
@ -677,17 +663,7 @@ static void destroy_bridge(void *obj)
bridge->uniqueid, bridge->v_table->name);
if (bridge->construction_completed) {
RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
clear_msg = create_bridge_snapshot_message(bridge);
if (clear_msg) {
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
msg = stasis_cache_clear_create(clear_msg);
if (msg) {
stasis_publish(ast_bridge_topic(bridge), msg);
}
}
bridge_topics_destroy(bridge);
}
/* Do any pending actions in the context of destruction. */
@ -726,9 +702,8 @@ static void destroy_bridge(void *obj)
cleanup_video_mode(bridge);
stasis_cp_single_unsubscribe(bridge->topics);
ast_string_field_free_memory(bridge);
ao2_cleanup(bridge->current_snapshot);
}
struct ast_bridge *bridge_register(struct ast_bridge *bridge)
@ -2008,6 +1983,9 @@ int ast_bridge_remove(struct ast_bridge *bridge, struct ast_channel *chan)
{
struct ast_bridge_channel *bridge_channel;
ast_debug(1, "Removing channel %s from bridge %s\n",
ast_channel_name(chan), bridge->uniqueid);
ast_bridge_lock(bridge);
/* Try to find the channel that we want to remove */
@ -5071,43 +5049,13 @@ static char *complete_bridge_live(const char *word)
return NULL;
}
static char *complete_bridge_stasis(const char *word)
{
int wordlen = strlen(word);
struct ao2_container *cached_bridges;
struct ao2_iterator iter;
struct stasis_message *msg;
cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
if (!cached_bridges) {
return NULL;
}
iter = ao2_iterator_init(cached_bridges, 0);
for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) {
struct ast_bridge_snapshot *snapshot = stasis_message_data(msg);
if (!strncasecmp(word, snapshot->uniqueid, wordlen)) {
if (ast_cli_completion_add(ast_strdup(snapshot->uniqueid))) {
ao2_ref(msg, -1);
break;
}
}
}
ao2_iterator_destroy(&iter);
ao2_ref(cached_bridges, -1);
return NULL;
}
static char *handle_bridge_show_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
#define FORMAT_HDR "%-36s %5s %-15s %s\n"
#define FORMAT_ROW "%-36s %5u %-15s %s\n"
RAII_VAR(struct ao2_container *, cached_bridges, NULL, ao2_cleanup);
struct ao2_iterator iter;
struct stasis_message *msg;
struct ast_bridge *bridge;
switch (cmd) {
case CLI_INIT:
@ -5120,25 +5068,23 @@ static char *handle_bridge_show_all(struct ast_cli_entry *e, int cmd, struct ast
return NULL;
}
cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
if (!cached_bridges) {
ast_cli(a->fd, "Failed to retrieve cached bridges\n");
return CLI_SUCCESS;
}
ast_cli(a->fd, FORMAT_HDR, "Bridge-ID", "Chans", "Type", "Technology");
iter = ao2_iterator_init(cached_bridges, 0);
for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) {
struct ast_bridge_snapshot *snapshot = stasis_message_data(msg);
iter = ao2_iterator_init(bridges, 0);
for (; (bridge = ao2_iterator_next(&iter)); ao2_ref(bridge, -1)) {
struct ast_bridge_snapshot *snapshot = ast_bridge_get_snapshot(bridge);
ast_cli(a->fd, FORMAT_ROW,
snapshot->uniqueid,
snapshot->num_channels,
S_OR(snapshot->subclass, "<unknown>"),
S_OR(snapshot->technology, "<unknown>"));
if (snapshot) {
ast_cli(a->fd, FORMAT_ROW,
snapshot->uniqueid,
snapshot->num_channels,
S_OR(snapshot->subclass, "<unknown>"),
S_OR(snapshot->technology, "<unknown>"));
ao2_ref(snapshot, -1);
}
}
ao2_iterator_destroy(&iter);
return CLI_SUCCESS;
#undef FORMAT_HDR
@ -5165,7 +5111,6 @@ static int bridge_show_specific_print_channel(void *obj, void *arg, int flags)
static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_bridge_snapshot *snapshot;
switch (cmd) {
@ -5177,7 +5122,7 @@ static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struc
return NULL;
case CLI_GENERATE:
if (a->pos == 2) {
return complete_bridge_stasis(a->word);
return complete_bridge_live(a->word);
}
return NULL;
}
@ -5186,18 +5131,17 @@ static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struc
return CLI_SHOWUSAGE;
}
msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
if (!msg) {
snapshot = ast_bridge_get_snapshot_by_uniqueid(a->argv[2]);
if (!snapshot) {
ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
return CLI_SUCCESS;
}
snapshot = stasis_message_data(msg);
ast_cli(a->fd, "Id: %s\n", snapshot->uniqueid);
ast_cli(a->fd, "Type: %s\n", S_OR(snapshot->subclass, "<unknown>"));
ast_cli(a->fd, "Technology: %s\n", S_OR(snapshot->technology, "<unknown>"));
ast_cli(a->fd, "Num-Channels: %u\n", snapshot->num_channels);
ao2_callback(snapshot->channels, OBJ_NODATA, bridge_show_specific_print_channel, a);
ao2_ref(snapshot, -1);
return CLI_SUCCESS;
}

View File

@ -4288,7 +4288,7 @@ static int create_subscriptions(void)
if (!channel_subscription) {
return -1;
}
bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
bridge_subscription = stasis_forward_all(ast_bridge_topic_all(), cdr_topic);
if (!bridge_subscription) {
return -1;
}

View File

@ -1449,7 +1449,7 @@ static int create_subscriptions(void)
}
cel_bridge_forwarder = stasis_forward_all(
ast_bridge_topic_all_cached(),
ast_bridge_topic_all(),
cel_aggregation_topic);
if (!cel_bridge_forwarder) {
return -1;

View File

@ -330,22 +330,15 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
struct stasis_cache_update *update;
struct ast_bridge_snapshot *old_snapshot;
struct ast_bridge_snapshot *new_snapshot;
struct ast_bridge_snapshot_update *update;
size_t i;
update = stasis_message_data(message);
ast_assert(ast_bridge_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
for (i = 0; i < ARRAY_LEN(bridge_monitors); ++i) {
RAII_VAR(struct ast_manager_event_blob *, event, NULL, ao2_cleanup);
event = bridge_monitors[i](old_snapshot, new_snapshot);
event = bridge_monitors[i](update->old_snapshot, update->new_snapshot);
if (!event) {
continue;
}
@ -354,7 +347,7 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
if (!bridge_event_string) {
bridge_event_string =
ast_manager_build_bridge_state_string(
new_snapshot ? new_snapshot : old_snapshot);
update->new_snapshot ? update->new_snapshot : update->old_snapshot);
if (!bridge_event_string) {
return;
}
@ -446,26 +439,30 @@ static void channel_leave_cb(void *data, struct stasis_subscription *sub,
ast_str_buffer(channel_text));
}
static int filter_bridge_type_cb(void *obj, void *arg, int flags)
{
char *bridge_type = arg;
struct ast_bridge_snapshot *snapshot = stasis_message_data(obj);
/* unlink all the snapshots that do not match the bridge type */
return strcmp(bridge_type, snapshot->technology) ? CMP_MATCH : 0;
}
struct bridge_list_data {
const char *id_text;
struct ast_str *id_text;
const char *type_filter;
int count;
};
static int send_bridge_list_item_cb(void *obj, void *arg, void *data, int flags)
{
struct ast_bridge_snapshot *snapshot = stasis_message_data(obj);
struct ast_bridge *bridge = obj;
RAII_VAR(struct ast_bridge_snapshot *, snapshot, ast_bridge_get_snapshot(bridge), ao2_cleanup);
struct mansession *s = arg;
struct bridge_list_data *list_data = data;
RAII_VAR(struct ast_str *, bridge_info, ast_manager_build_bridge_state_string(snapshot), ast_free);
struct ast_str * bridge_info;
if (!snapshot) {
return 0;
}
if (!ast_strlen_zero(list_data->type_filter)
&& strcmp(list_data->type_filter, snapshot->technology)) {
return 0;
}
bridge_info = ast_manager_build_bridge_state_string(snapshot);
if (!bridge_info) {
return 0;
}
@ -475,9 +472,12 @@ static int send_bridge_list_item_cb(void *obj, void *arg, void *data, int flags)
"%s"
"%s"
"\r\n",
list_data->id_text,
ast_str_buffer(list_data->id_text),
ast_str_buffer(bridge_info));
++list_data->count;
ast_free(bridge_info);
return 0;
}
@ -485,41 +485,37 @@ static int manager_bridges_list(struct mansession *s, const struct message *m)
{
const char *id = astman_get_header(m, "ActionID");
const char *type_filter = astman_get_header(m, "BridgeType");
RAII_VAR(struct ast_str *, id_text, ast_str_create(128), ast_free);
RAII_VAR(struct ao2_container *, bridges, NULL, ao2_cleanup);
struct bridge_list_data list_data;
struct ao2_container *bridges;
struct bridge_list_data list_data = { 0 };
if (!id_text) {
astman_send_error(s, m, "Internal error");
return -1;
}
if (!ast_strlen_zero(id)) {
ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
}
bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
bridges = ast_bridges();
if (!bridges) {
astman_send_error(s, m, "Internal error");
return -1;
}
astman_send_listack(s, m, "Bridge listing will follow", "start");
if (!ast_strlen_zero(type_filter)) {
char *type_filter_dup = ast_strdupa(type_filter);
ao2_callback(bridges, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
filter_bridge_type_cb, type_filter_dup);
list_data.id_text = ast_str_create(128);
if (!list_data.id_text) {
ao2_ref(bridges, -1);
astman_send_error(s, m, "Internal error");
return -1;
}
list_data.id_text = ast_str_buffer(id_text);
list_data.count = 0;
if (!ast_strlen_zero(id)) {
ast_str_set(&list_data.id_text, 0, "ActionID: %s\r\n", id);
}
list_data.type_filter = type_filter;
astman_send_listack(s, m, "Bridge listing will follow", "start");
ao2_callback_data(bridges, OBJ_NODATA, send_bridge_list_item_cb, s, &list_data);
astman_send_list_complete_start(s, m, "BridgeListComplete", list_data.count);
astman_send_list_complete_end(s);
ast_free(list_data.id_text);
ao2_ref(bridges, -1);
return 0;
}
@ -550,7 +546,7 @@ static int send_bridge_info_item_cb(void *obj, void *arg, void *data, int flags)
"%s"
"%s"
"\r\n",
list_data->id_text,
ast_str_buffer(list_data->id_text),
ast_str_buffer(channel_text));
++list_data->count;
return 0;
@ -560,43 +556,39 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
{
const char *id = astman_get_header(m, "ActionID");
const char *bridge_uniqueid = astman_get_header(m, "BridgeUniqueid");
RAII_VAR(struct ast_str *, id_text, ast_str_create(128), ast_free);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
RAII_VAR(struct ast_str *, bridge_info, NULL, ast_free);
struct ast_bridge_snapshot *snapshot;
struct bridge_list_data list_data;
if (!id_text) {
astman_send_error(s, m, "Internal error");
return -1;
}
RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
struct bridge_list_data list_data = { 0 };
if (ast_strlen_zero(bridge_uniqueid)) {
astman_send_error(s, m, "BridgeUniqueid must be provided");
return 0;
}
if (!ast_strlen_zero(id)) {
ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
}
msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid);
if (!msg) {
snapshot = ast_bridge_get_snapshot_by_uniqueid(bridge_uniqueid);
if (!snapshot) {
astman_send_error(s, m, "Specified BridgeUniqueid not found");
return 0;
}
snapshot = stasis_message_data(msg);
bridge_info = ast_manager_build_bridge_state_string(snapshot);
if (!bridge_info) {
astman_send_error(s, m, "Internal error");
return -1;
}
list_data.id_text = ast_str_create(128);
if (!list_data.id_text) {
astman_send_error(s, m, "Internal error");
return -1;
}
if (!ast_strlen_zero(id)) {
ast_str_set(&list_data.id_text, 0, "ActionID: %s\r\n", id);
}
astman_send_listack(s, m, "Bridge channel listing will follow", "start");
list_data.id_text = ast_str_buffer(id_text);
list_data.count = 0;
ao2_callback_data(snapshot->channels, OBJ_NODATA, send_bridge_info_item_cb, s, &list_data);
astman_send_list_complete_start(s, m, "BridgeInfoComplete", list_data.count);
@ -604,6 +596,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
astman_append(s, "%s", ast_str_buffer(bridge_info));
}
astman_send_list_complete_end(s);
ast_free(list_data.id_text);
return 0;
}
@ -703,7 +696,7 @@ int manager_bridging_init(void)
return -1;
}
bridge_topic = ast_bridge_topic_all_cached();
bridge_topic = ast_bridge_topic_all();
if (!bridge_topic) {
return -1;
}
@ -718,7 +711,7 @@ int manager_bridging_init(void)
return -1;
}
ret |= stasis_message_router_add_cache_update(bridge_state_router,
ret |= stasis_message_router_add(bridge_state_router,
ast_bridge_snapshot_type(), bridge_snapshot_update, NULL);
ret |= stasis_message_router_add(bridge_state_router,

View File

@ -155,7 +155,8 @@ static struct ast_json *ast_bridge_merge_message_to_json(
struct stasis_message *msg,
const struct stasis_message_sanitizer *sanitize);
static struct stasis_cp_all *bridge_cache_all;
static struct stasis_topic *bridge_topic_all;
static struct stasis_topic_pool *bridge_topic_pool;
/*!
* @{ \brief Define bridge message types.
@ -175,33 +176,9 @@ STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type,
.to_ami = attended_transfer_to_ami);
/*! @} */
struct stasis_cache *ast_bridge_cache(void)
{
return stasis_cp_all_cache(bridge_cache_all);
}
struct stasis_topic *ast_bridge_topic_all(void)
{
return stasis_cp_all_topic(bridge_cache_all);
}
struct stasis_topic *ast_bridge_topic_all_cached(void)
{
return stasis_cp_all_topic_cached(bridge_cache_all);
}
int bridge_topics_init(struct ast_bridge *bridge)
{
if (ast_strlen_zero(bridge->uniqueid)) {
ast_log(LOG_ERROR, "Bridge id initialization required\n");
return -1;
}
bridge->topics = stasis_cp_single_create(bridge_cache_all,
bridge->uniqueid);
if (!bridge->topics) {
return -1;
}
return 0;
return bridge_topic_all;
}
struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
@ -210,16 +187,7 @@ struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
return ast_bridge_topic_all();
}
return stasis_cp_single_topic(bridge->topics);
}
struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge)
{
if (!bridge) {
return ast_bridge_topic_all_cached();
}
return stasis_cp_single_topic_cached(bridge->topics);
return bridge->topic;
}
/*! \brief Destructor for bridge snapshots */
@ -292,24 +260,106 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge
return snapshot;
}
void ast_bridge_publish_state(struct ast_bridge *bridge)
static void bridge_snapshot_update_dtor(void *obj)
{
struct ast_bridge_snapshot *snapshot;
struct stasis_message *msg;
struct ast_bridge_snapshot_update *update = obj;
if (!ast_bridge_snapshot_type()) {
return;
ast_debug(3, "Update: %p Old: %s New: %s\n", update,
update->old_snapshot ? update->old_snapshot->uniqueid : "<none>",
update->new_snapshot ? update->new_snapshot->uniqueid : "<none>");
ao2_cleanup(update->old_snapshot);
ao2_cleanup(update->new_snapshot);
}
static struct ast_bridge_snapshot_update *bridge_snapshot_update_create(
struct ast_bridge_snapshot *old, struct ast_bridge_snapshot *new)
{
struct ast_bridge_snapshot_update *update;
update = ao2_alloc_options(sizeof(*update), bridge_snapshot_update_dtor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!update) {
return NULL;
}
update->old_snapshot = ao2_bump(old);
update->new_snapshot = ao2_bump(new);
ast_debug(3, "Update: %p Old: %s New: %s\n", update,
update->old_snapshot ? update->old_snapshot->uniqueid : "<none>",
update->new_snapshot ? update->new_snapshot->uniqueid : "<none>");
return update;
}
int bridge_topics_init(struct ast_bridge *bridge)
{
if (ast_strlen_zero(bridge->uniqueid)) {
ast_log(LOG_ERROR, "Bridge id initialization required\n");
return -1;
}
bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
if (!bridge->topic) {
return -1;
}
return 0;
}
void bridge_topics_destroy(struct ast_bridge *bridge)
{
struct ast_bridge_snapshot_update *update;
struct stasis_message *msg;
ast_assert(bridge != NULL);
snapshot = ast_bridge_snapshot_create(bridge);
if (!snapshot) {
if (!bridge->current_snapshot) {
bridge->current_snapshot = ast_bridge_snapshot_create(bridge);
if (!bridge->current_snapshot) {
return;
}
}
update = bridge_snapshot_update_create(bridge->current_snapshot, NULL);
if (!update) {
return;
}
msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot);
ao2_ref(snapshot, -1);
msg = stasis_message_create(ast_bridge_snapshot_type(), update);
ao2_ref(update, -1);
if (!msg) {
return;
}
stasis_publish(ast_bridge_topic(bridge), msg);
ao2_ref(msg, -1);
stasis_topic_pool_delete_topic(bridge_topic_pool, stasis_topic_name(ast_bridge_topic(bridge)));
}
void ast_bridge_publish_state(struct ast_bridge *bridge)
{
struct ast_bridge_snapshot *new_snapshot;
struct ast_bridge_snapshot_update *update;
struct stasis_message *msg;
ast_assert(bridge != NULL);
new_snapshot = ast_bridge_snapshot_create(bridge);
if (!new_snapshot) {
return;
}
update = bridge_snapshot_update_create(bridge->current_snapshot, new_snapshot);
/* There may not have been an old snapshot */
ao2_cleanup(bridge->current_snapshot);
bridge->current_snapshot = new_snapshot;
if (!update) {
return;
}
msg = stasis_message_create(ast_bridge_snapshot_type(), update);
ao2_ref(update, -1);
if (!msg) {
return;
}
@ -321,11 +371,20 @@ void ast_bridge_publish_state(struct ast_bridge *bridge)
static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
struct ast_bridge_blob *obj)
{
struct ast_bridge_snapshot_update *update;
struct stasis_message *msg;
ast_assert(obj != NULL);
msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
update = bridge_snapshot_update_create(bridge->current_snapshot, obj->bridge);
ao2_cleanup(bridge->current_snapshot);
bridge->current_snapshot = ao2_bump(obj->bridge);
if (!update) {
return;
}
msg = stasis_message_create(ast_bridge_snapshot_type(), update);
ao2_ref(update, -1);
if (!msg) {
return;
}
@ -1250,35 +1309,37 @@ void ast_bridge_publish_attended_transfer(struct ast_attended_transfer_message *
ao2_ref(msg, -1);
}
struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
struct ast_bridge_snapshot *ast_bridge_get_snapshot_by_uniqueid(const char *uniqueid)
{
struct stasis_message *message;
struct ast_bridge *bridge;
struct ast_bridge_snapshot *snapshot;
ast_assert(!ast_strlen_zero(uniqueid));
message = stasis_cache_get(ast_bridge_cache(),
ast_bridge_snapshot_type(),
uniqueid);
if (!message) {
bridge = ast_bridge_find_by_id(uniqueid);
if (!bridge) {
return NULL;
}
snapshot = ao2_bump(stasis_message_data(message));
ao2_ref(message, -1);
ast_bridge_lock(bridge);
snapshot = ao2_bump(bridge->current_snapshot);
ast_bridge_unlock(bridge);
ao2_ref(bridge, -1);
return snapshot;
}
/*! \brief snapshot ID getter for caching topic */
static const char *bridge_snapshot_get_id(struct stasis_message *msg)
struct ast_bridge_snapshot *ast_bridge_get_snapshot(struct ast_bridge *bridge)
{
struct ast_bridge_snapshot *snapshot;
if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
if (!bridge) {
return NULL;
}
snapshot = stasis_message_data(msg);
return snapshot->uniqueid;
ast_bridge_lock(bridge);
snapshot = ao2_bump(bridge->current_snapshot);
ast_bridge_unlock(bridge);
return snapshot;
}
static void stasis_bridging_cleanup(void)
@ -1290,8 +1351,10 @@ static void stasis_bridging_cleanup(void)
STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
ao2_cleanup(bridge_cache_all);
bridge_cache_all = NULL;
ao2_cleanup(bridge_topic_pool);
bridge_topic_pool = NULL;
ao2_cleanup(bridge_topic_all);
bridge_topic_all = NULL;
}
int ast_stasis_bridging_init(void)
@ -1300,10 +1363,12 @@ int ast_stasis_bridging_init(void)
ast_register_cleanup(stasis_bridging_cleanup);
bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
bridge_snapshot_get_id);
if (!bridge_cache_all) {
bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
if (!bridge_topic_all) {
return -1;
}
bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
if (!bridge_topic_pool) {
return -1;
}

View File

@ -65,7 +65,7 @@ static struct ast_bridge *find_bridge(
bridge = stasis_app_bridge_find_by_id(bridge_id);
if (bridge == NULL) {
RAII_VAR(struct ast_bridge_snapshot *, snapshot,
ast_bridge_snapshot_get_latest(bridge_id), ao2_cleanup);
ast_bridge_get_snapshot_by_uniqueid(bridge_id), ao2_cleanup);
if (!snapshot) {
ast_ari_response_error(response, 404, "Not found",
"Bridge not found");
@ -856,7 +856,7 @@ void ast_ari_bridges_get(struct ast_variable *headers,
struct ast_ari_bridges_get_args *args,
struct ast_ari_response *response)
{
RAII_VAR(struct ast_bridge_snapshot *, snapshot, ast_bridge_snapshot_get_latest(args->bridge_id), ao2_cleanup);
RAII_VAR(struct ast_bridge_snapshot *, snapshot, ast_bridge_get_snapshot_by_uniqueid(args->bridge_id), ao2_cleanup);
if (!snapshot) {
ast_ari_response_error(
response, 404, "Not Found",
@ -885,23 +885,13 @@ void ast_ari_bridges_list(struct ast_variable *headers,
struct ast_ari_bridges_list_args *args,
struct ast_ari_response *response)
{
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, bridges, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ao2_iterator i;
void *obj;
struct ast_bridge *bridge;
cache = ast_bridge_cache();
if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
ao2_ref(cache, +1);
snapshots = stasis_cache_dump(cache, ast_bridge_snapshot_type());
if (!snapshots) {
bridges = ast_bridges();
if (!bridges) {
ast_ari_response_alloc_failed(response);
return;
}
@ -912,12 +902,14 @@ void ast_ari_bridges_list(struct ast_variable *headers,
return;
}
i = ao2_iterator_init(snapshots, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
struct ast_bridge_snapshot *snapshot = stasis_message_data(msg);
i = ao2_iterator_init(bridges, 0);
while ((bridge = ao2_iterator_next(&i))) {
struct ast_bridge_snapshot *snapshot = ast_bridge_get_snapshot(bridge);
/* ast_bridge_snapshot_to_json will return NULL if snapshot is NULL */
struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
ao2_ref(bridge, -1);
ao2_cleanup(snapshot);
if (!json_bridge || ast_json_array_append(json, json_bridge)) {
ao2_iterator_destroy(&i);
ast_ari_response_alloc_failed(response);

View File

@ -1974,7 +1974,7 @@ enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
have_channel = 1;
} else if (ast_begins_with(uri, "bridge:")) {
type = STASIS_UMOS_BRIDGE;
snapshot = ast_bridge_snapshot_get_latest(uri + 7);
snapshot = ast_bridge_get_snapshot_by_uniqueid(uri + 7);
} else if (ast_begins_with(uri, "endpoint:")) {
type = STASIS_UMOS_ENDPOINT;
snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);

View File

@ -172,16 +172,9 @@ static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
}
forwards->forward_type = FORWARD_BRIDGE;
if (bridge) {
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
}
forwards->topic_cached_forward = stasis_forward_all(
bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
app->topic);
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
if (!forwards->topic_forward && bridge) {
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
@ -666,33 +659,23 @@ static void sub_bridge_update_handler(void *data,
{
struct ast_json *json = NULL;
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_bridge_snapshot *new_snapshot;
struct ast_bridge_snapshot *old_snapshot;
struct ast_bridge_snapshot_update *update;
const struct timeval *tv;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
update = stasis_message_data(message);
ast_assert(update->type == ast_bridge_snapshot_type());
tv = stasis_message_timestamp(message);
new_snapshot = stasis_message_data(update->new_snapshot);
old_snapshot = stasis_message_data(update->old_snapshot);
tv = update->new_snapshot ?
stasis_message_timestamp(update->new_snapshot) :
stasis_message_timestamp(message);
if (!new_snapshot) {
json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
} else if (!old_snapshot) {
json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
} else if (new_snapshot && old_snapshot
&& strcmp(new_snapshot->video_source_id, old_snapshot->video_source_id)) {
json = simple_bridge_event("BridgeVideoSourceChanged", new_snapshot, tv);
if (json && !ast_strlen_zero(old_snapshot->video_source_id)) {
if (!update->new_snapshot) {
json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
} else if (!update->old_snapshot) {
json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
} else if (update->new_snapshot && update->old_snapshot
&& strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
ast_json_object_set(json, "old_video_source_id",
ast_json_string_create(old_snapshot->video_source_id));
ast_json_string_create(update->old_snapshot->video_source_id));
}
}
@ -701,8 +684,8 @@ static void sub_bridge_update_handler(void *data,
ast_json_unref(json);
}
if (!new_snapshot && old_snapshot) {
unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
if (!update->new_snapshot && update->old_snapshot) {
unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
}
}
@ -961,7 +944,7 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return NULL;
}
res |= stasis_message_router_add_cache_update(app->router,
res |= stasis_message_router_add(app->router,
ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
res |= stasis_message_router_add(app->router,

View File

@ -351,7 +351,7 @@ static struct ast_str *__test_cel_generate_peer_str(struct ast_channel_snapshot
static struct ast_str *test_cel_generate_peer_str_snapshot(struct ast_channel_snapshot *chan, struct ast_bridge *bridge)
{
RAII_VAR(struct ast_bridge_snapshot *, snapshot,
ast_bridge_snapshot_get_latest(bridge->uniqueid),
ast_bridge_get_snapshot(bridge),
ao2_cleanup);
if (!snapshot) {