core_unreal / core_local: Add multistream and re-negotiation.

When requesting a Local channel the requested stream topology
or a converted stream topology will now be placed onto the
resulting channels.

Frames written in on streams will now also preserve the stream
identifier as they are queued on the opposite channel.

Finally when a stream topology change is requested it is
immediately accepted and reflected on both channels. Each
channel also receives a queued frame to indicate that the
topology has changed.

ASTERISK-28938

Change-Id: I4e9d94da5230d4bd046dc755651493fce1d87186
This commit is contained in:
Joshua C. Colp 2020-06-03 13:47:42 -03:00 committed by Friendly Automation
parent 49b204ed8a
commit 563f2f94d6
3 changed files with 251 additions and 7 deletions

View File

@ -40,6 +40,7 @@ extern "C" {
/* Forward declare some struct names */
struct ast_format_cap;
struct ast_stream_topology;
/* ------------------------------------------------------------------- */
@ -96,6 +97,7 @@ struct ast_unreal_pvt {
unsigned int flags; /*!< Private option flags */
/*! Base name of the unreal channels. exten@context or other name. */
char name[AST_MAX_EXTENSION + AST_MAX_CONTEXT + 2];
struct ast_stream_topology *reqtopology; /*!< Requested stream topology */
};
#define AST_UNREAL_IS_OUTBOUND(a, b) ((a) == (b)->chan ? 1 : 0)
@ -146,6 +148,9 @@ struct ast_frame *ast_unreal_read(struct ast_channel *ast);
/*! Unreal channel framework struct ast_channel_tech.write callback */
int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f);
/*! Unreal channel framework struct ast_channel_tech.write_stream callback */
int ast_unreal_write_stream(struct ast_channel *ast, int stream_num, struct ast_frame *f);
/*! Unreal channel framework struct ast_channel_tech.indicate callback */
int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data, size_t datalen);
@ -187,6 +192,20 @@ void ast_unreal_destructor(void *vdoomed);
*/
struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructor, struct ast_format_cap *cap);
/*!
* \brief Allocate the base unreal struct for a derivative.
* \since 16.12.0
* \since 17.6.0
*
* \param size Size of the unreal struct to allocate.
* \param destructor Destructor callback.
* \param cap Format capabilities to give the unreal private struct.
*
* \retval pvt on success.
* \retval NULL on error.
*/
struct ast_unreal_pvt *ast_unreal_alloc_stream_topology(size_t size, ao2_destructor_fn destructor, struct ast_stream_topology *topology);
/*!
* \brief Create the semi1 and semi2 unreal channels.
* \since 12.0.0

View File

@ -48,6 +48,8 @@
#include "asterisk/stasis_channels.h"
#include "asterisk/_private.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stream.h"
#include "asterisk/translate.h"
/*** DOCUMENTATION
<manager name="LocalOptimizeAway" language="en_US">
@ -136,6 +138,7 @@ static const char tdesc[] = "Local Proxy Channel Driver";
static struct ao2_container *locals;
static struct ast_channel *local_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
static struct ast_channel *local_request_with_stream_topology(const char *type, struct ast_stream_topology *topology, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
static int local_call(struct ast_channel *ast, const char *dest, int timeout);
static int local_hangup(struct ast_channel *ast);
static int local_devicestate(const char *data);
@ -171,14 +174,15 @@ static struct ast_channel_tech local_tech = {
.type = "Local",
.description = tdesc,
.requester = local_request,
.requester_with_stream_topology = local_request_with_stream_topology,
.send_digit_begin = ast_unreal_digit_begin,
.send_digit_end = ast_unreal_digit_end,
.call = local_call,
.hangup = local_hangup,
.answer = ast_unreal_answer,
.read = ast_unreal_read,
.read_stream = ast_unreal_read,
.write = ast_unreal_write,
.write_video = ast_unreal_write,
.write_stream = ast_unreal_write_stream,
.exception = ast_unreal_read,
.indicate = ast_unreal_indicate,
.fixup = ast_unreal_fixup,
@ -859,14 +863,14 @@ static void local_pvt_destructor(void *vdoomed)
}
/*! \brief Create a call structure */
static struct local_pvt *local_alloc(const char *data, struct ast_format_cap *cap)
static struct local_pvt *local_alloc(const char *data, struct ast_stream_topology *topology)
{
struct local_pvt *pvt;
char *parse;
char *context;
char *opts;
pvt = (struct local_pvt *) ast_unreal_alloc(sizeof(*pvt), local_pvt_destructor, cap);
pvt = (struct local_pvt *) ast_unreal_alloc_stream_topology(sizeof(*pvt), local_pvt_destructor, topology);
if (!pvt) {
return NULL;
}
@ -917,12 +921,95 @@ static struct local_pvt *local_alloc(const char *data, struct ast_format_cap *ca
/*! \brief Part of PBX interface */
static struct ast_channel *local_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
{
struct ast_stream_topology *topology;
struct ast_channel *chan;
topology = ast_stream_topology_create_from_format_cap(cap);
if (!topology) {
return NULL;
}
chan = local_request_with_stream_topology(type, topology, assignedids, requestor, data, cause);
ast_stream_topology_free(topology);
return chan;
}
/*! \brief Part of PBX interface */
static struct ast_channel *local_request_with_stream_topology(const char *type, struct ast_stream_topology *topology, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
{
struct ast_stream_topology *audio_filtered_topology;
int i;
struct local_pvt *p;
struct ast_channel *chan;
ast_callid callid;
/* Create a copy of the requested topology as we don't have ownership over
* the one that is passed in.
*/
audio_filtered_topology = ast_stream_topology_clone(topology);
if (!audio_filtered_topology) {
return NULL;
}
/* Some users of Local channels request every known format in the
* universe. The core itself automatically pruned this list down to a single
* "best" format for audio in non-multistream. We replicate the logic here to
* do the same thing.
*/
for (i = 0; i < ast_stream_topology_get_count(audio_filtered_topology); ++i) {
struct ast_stream *stream;
int res;
struct ast_format *tmp_fmt = NULL;
struct ast_format *best_audio_fmt = NULL;
struct ast_format_cap *caps;
stream = ast_stream_topology_get_stream(audio_filtered_topology, i);
if (ast_stream_get_type(stream) != AST_MEDIA_TYPE_AUDIO) {
continue;
}
/* Respect the immutable state of formats on the stream and create a new
* format capabilities to replace the existing one.
*/
caps = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
if (!caps) {
ao2_ref(audio_filtered_topology, -1);
return NULL;
}
/* The ast_translator_best_choice function treats both caps as const
* but does not declare it in the API.
*/
res = ast_translator_best_choice((struct ast_format_cap *)ast_stream_get_formats(stream), local_tech.capabilities,
&tmp_fmt, &best_audio_fmt);
if (res < 0) {
struct ast_str *tech_codecs = ast_str_alloca(AST_FORMAT_CAP_NAMES_LEN);
struct ast_str *request_codecs = ast_str_alloca(AST_FORMAT_CAP_NAMES_LEN);
ast_log(LOG_WARNING, "No translator path exists for channel type %s (native %s) to %s\n", type,
ast_format_cap_get_names(local_tech.capabilities, &tech_codecs),
ast_format_cap_get_names(ast_stream_get_formats(stream), &request_codecs));
/* If there are no formats then we abort */
ao2_ref(caps, -1);
ao2_ref(audio_filtered_topology, -1);
return NULL;
}
ast_format_cap_append(caps, best_audio_fmt, 0);
ast_stream_set_formats(stream, caps);
ao2_ref(caps, -1);
ao2_ref(tmp_fmt, -1);
ao2_ref(best_audio_fmt, -1);
}
/* Allocate a new private structure and then Asterisk channels */
p = local_alloc(data, cap);
p = local_alloc(data, audio_filtered_topology);
ao2_ref(audio_filtered_topology, -1);
if (!p) {
return NULL;
}
@ -937,6 +1024,7 @@ static struct ast_channel *local_request(const char *type, struct ast_format_cap
return chan;
}
/*! \brief CLI command "local show channels" */
static char *locals_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{

View File

@ -40,6 +40,7 @@
#include "asterisk/astobj2.h"
#include "asterisk/bridge.h"
#include "asterisk/core_unreal.h"
#include "asterisk/stream.h"
static unsigned int name_sequence = 0;
@ -315,6 +316,11 @@ struct ast_frame *ast_unreal_read(struct ast_channel *ast)
}
int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f)
{
return ast_unreal_write_stream(ast, -1, f);
}
int ast_unreal_write_stream(struct ast_channel *ast, int stream_num, struct ast_frame *f)
{
struct ast_unreal_pvt *p = ast_channel_tech_pvt(ast);
int res = -1;
@ -337,6 +343,9 @@ int ast_unreal_write(struct ast_channel *ast, struct ast_frame *f)
}
}
/* Update the frame to reflect the stream */
f->stream_num = stream_num;
/* Just queue for delivery to the other side */
ao2_ref(p, 1);
ao2_lock(p);
@ -530,6 +539,86 @@ static int unreal_colp_redirect_indicate(struct ast_unreal_pvt *p, struct ast_ch
return res;
}
/*!
* \internal
* \brief Handle stream topology change request.
* \since 16.12.0
* \since 17.6.0
*
* \param p Unreal private structure.
* \param ast Channel indicating the condition.
* \param topology The requested topology.
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int unreal_colp_stream_topology_request_change(struct ast_unreal_pvt *p, struct ast_channel *ast, const struct ast_stream_topology *topology)
{
struct ast_stream_topology *this_channel_topology;
struct ast_stream_topology *the_other_channel_topology;
int i;
struct ast_stream *stream;
struct ast_channel *my_chan;
struct ast_channel *my_owner;
struct ast_channel *this_channel;
struct ast_channel *the_other_channel;
int res = 0;
this_channel_topology = ast_stream_topology_clone(topology);
if (!this_channel_topology) {
return -1;
}
the_other_channel_topology = ast_stream_topology_clone(topology);
if (!the_other_channel_topology) {
ast_stream_topology_free(this_channel_topology);
return -1;
}
/* We swap the stream state on the other channel because it is as if the channel is
* connected to an external endpoint, so the perspective changes.
*/
for (i = 0; i < ast_stream_topology_get_count(the_other_channel_topology); ++i) {
stream = ast_stream_topology_get_stream(the_other_channel_topology, i);
if (ast_stream_get_state(stream) == AST_STREAM_STATE_RECVONLY) {
ast_stream_set_state(stream, AST_STREAM_STATE_SENDONLY);
} else if (ast_stream_get_state(stream) == AST_STREAM_STATE_SENDONLY) {
ast_stream_set_state(stream, AST_STREAM_STATE_RECVONLY);
}
}
ast_channel_unlock(ast);
ast_unreal_lock_all(p, &my_chan, &my_owner);
if (AST_UNREAL_IS_OUTBOUND(ast, p)) {
this_channel = p->chan;
the_other_channel = p->owner;
} else {
this_channel = p->owner;
the_other_channel = p->chan;
}
if (this_channel) {
ast_channel_set_stream_topology(this_channel, this_channel_topology);
ast_queue_control(this_channel, AST_CONTROL_STREAM_TOPOLOGY_CHANGED);
}
if (the_other_channel) {
ast_channel_set_stream_topology(the_other_channel, the_other_channel_topology);
ast_channel_stream_topology_changed_externally(the_other_channel);
}
if (my_chan) {
ast_channel_unlock(my_chan);
ast_channel_unref(my_chan);
}
if (my_owner) {
ast_channel_unlock(my_owner);
ast_channel_unref(my_owner);
}
ao2_unlock(p);
ast_channel_lock(ast);
return res;
}
int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data, size_t datalen)
{
struct ast_unreal_pvt *p = ast_channel_tech_pvt(ast);
@ -583,6 +672,11 @@ int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data
unreal_queue_indicate(p, ast, condition, data, datalen);
res = -1;
break;
case AST_CONTROL_STREAM_TOPOLOGY_REQUEST_CHANGE:
if (ast_channel_is_multistream(ast)) {
res = unreal_colp_stream_topology_request_change(p, ast, data);
}
break;
default:
res = unreal_queue_indicate(p, ast, condition, data, datalen);
break;
@ -916,9 +1010,28 @@ void ast_unreal_destructor(void *vdoomed)
ao2_cleanup(doomed->reqcap);
doomed->reqcap = NULL;
ast_stream_topology_free(doomed->reqtopology);
doomed->reqtopology = NULL;
}
struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructor, struct ast_format_cap *cap)
{
struct ast_stream_topology *topology;
struct ast_unreal_pvt *unreal;
topology = ast_stream_topology_create_from_format_cap(cap);
if (!topology) {
return NULL;
}
unreal = ast_unreal_alloc_stream_topology(size, destructor, topology);
ast_stream_topology_free(topology);
return unreal;
}
struct ast_unreal_pvt *ast_unreal_alloc_stream_topology(size_t size, ao2_destructor_fn destructor, struct ast_stream_topology *topology)
{
struct ast_unreal_pvt *unreal;
@ -935,12 +1048,17 @@ struct ast_unreal_pvt *ast_unreal_alloc(size_t size, ao2_destructor_fn destructo
return NULL;
}
unreal->reqcap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
unreal->reqtopology = ast_stream_topology_clone(topology);
if (!unreal->reqtopology) {
ao2_ref(unreal, -1);
return NULL;
}
unreal->reqcap = ast_format_cap_from_stream_topology(topology);
if (!unreal->reqcap) {
ao2_ref(unreal, -1);
return NULL;
}
ast_format_cap_append_from_cap(unreal->reqcap, cap, AST_MEDIA_TYPE_UNKNOWN);
memcpy(&unreal->jb_conf, &jb_conf, sizeof(unreal->jb_conf));
@ -958,6 +1076,7 @@ struct ast_channel *ast_unreal_new_channels(struct ast_unreal_pvt *p,
struct ast_assigned_ids id1 = {NULL, NULL};
struct ast_assigned_ids id2 = {NULL, NULL};
int generated_seqno = ast_atomic_fetchadd_int((int *) &name_sequence, +1);
struct ast_stream_topology *topology;
/* set unique ids for the two channels */
if (assignedids && !ast_strlen_zero(assignedids->uniqueid)) {
@ -975,6 +1094,14 @@ struct ast_channel *ast_unreal_new_channels(struct ast_unreal_pvt *p,
id2.uniqueid = uniqueid2;
}
/* We need to create a topology to place on the first channel, as we can't
* share a single one between both.
*/
topology = ast_stream_topology_clone(p->reqtopology);
if (!topology) {
return NULL;
}
/*
* Allocate two new Asterisk channels
*
@ -987,6 +1114,7 @@ struct ast_channel *ast_unreal_new_channels(struct ast_unreal_pvt *p,
"%s/%s-%08x;1", tech->type, p->name, (unsigned)generated_seqno);
if (!owner) {
ast_log(LOG_WARNING, "Unable to allocate owner channel structure\n");
ast_stream_topology_free(topology);
return NULL;
}
@ -1000,6 +1128,10 @@ struct ast_channel *ast_unreal_new_channels(struct ast_unreal_pvt *p,
ast_channel_nativeformats_set(owner, p->reqcap);
if (ast_channel_is_multistream(owner)) {
ast_channel_set_stream_topology(owner, topology);
}
/* Determine our read/write format and set it on each channel */
fmt = ast_format_cap_get_format(p->reqcap, 0);
if (!fmt) {
@ -1054,6 +1186,11 @@ struct ast_channel *ast_unreal_new_channels(struct ast_unreal_pvt *p,
ast_channel_nativeformats_set(chan, p->reqcap);
if (ast_channel_is_multistream(chan)) {
ast_channel_set_stream_topology(chan, p->reqtopology);
p->reqtopology = NULL;
}
/* Format was already determined when setting up owner */
ast_channel_set_writeformat(chan, fmt);
ast_channel_set_rawwriteformat(chan, fmt);