Merge "rtp: Add support for transport-cc in receiver direction." into 16

This commit is contained in:
Friendly Automation 2019-05-03 10:08:16 -05:00 committed by Gerrit Code Review
commit 7bddfdbfa6
4 changed files with 458 additions and 7 deletions

View File

@ -306,6 +306,8 @@ struct ast_rtp_payload_type {
#define AST_RTP_RTCP_FMT_FIR 4
/*! REMB Information (From draft-alvestrand-rmcat-remb-03) */
#define AST_RTP_RTCP_FMT_REMB 15
/*! Transport-wide congestion control feedback (From draft-holmer-rmcat-transport-wide-cc-extensions-01) */
#define AST_RTP_RTCP_FMT_TRANSPORT_WIDE_CC 15
/*!
* \since 12
@ -541,6 +543,8 @@ enum ast_rtp_extension {
AST_RTP_EXTENSION_UNSUPPORTED = 0,
/*! abs-send-time from https://tools.ietf.org/html/draft-alvestrand-rmcat-remb-03 */
AST_RTP_EXTENSION_ABS_SEND_TIME,
/*! transport-cc from https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 */
AST_RTP_EXTENSION_TRANSPORT_WIDE_CC,
/*! The maximum number of known RTP extensions */
AST_RTP_EXTENSION_MAX,
};

View File

@ -235,6 +235,7 @@ struct ast_rtp_instance {
static const char * const rtp_extension_uris[AST_RTP_EXTENSION_MAX] = {
[AST_RTP_EXTENSION_UNSUPPORTED] = "",
[AST_RTP_EXTENSION_ABS_SEND_TIME] = "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
[AST_RTP_EXTENSION_TRANSPORT_WIDE_CC] = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
};
/*! List of RTP engines that are currently registered */

View File

@ -274,6 +274,7 @@ static int create_rtp(struct ast_sip_session *session, struct ast_sip_session_me
ast_rtp_instance_set_prop(session_media->rtp, AST_RTP_PROPERTY_REMB, session->endpoint->media.webrtc);
if (session->endpoint->media.webrtc) {
enable_rtp_extension(session, session_media, AST_RTP_EXTENSION_ABS_SEND_TIME, AST_RTP_EXTENSION_DIRECTION_SENDRECV, sdp);
enable_rtp_extension(session, session_media, AST_RTP_EXTENSION_TRANSPORT_WIDE_CC, AST_RTP_EXTENSION_DIRECTION_SENDRECV, sdp);
}
if (session->endpoint->media.tos_video || session->endpoint->media.cos_video) {
ast_rtp_instance_set_qos(session_media->rtp, session->endpoint->media.tos_video,
@ -1184,7 +1185,18 @@ static void add_rtcp_fb_to_stream(struct ast_sip_session *session,
pj_str_t stmp;
pjmedia_sdp_attr *attr;
if (!session->endpoint->media.webrtc || session_media->type != AST_MEDIA_TYPE_VIDEO) {
if (!session->endpoint->media.webrtc) {
return;
}
/* transport-cc is supposed to be for the entire transport, and any media sources so
* while the header does not appear in audio streams and isn't negotiated there, we still
* place this attribute in as Chrome does.
*/
attr = pjmedia_sdp_attr_create(pool, "rtcp-fb", pj_cstr(&stmp, "* transport-cc"));
pjmedia_sdp_attr_add(&media->attr_count, media->attr, attr);
if (session_media->type != AST_MEDIA_TYPE_VIDEO) {
return;
}

View File

@ -312,6 +312,32 @@ struct rtp_ssrc_mapping {
struct ast_rtp_instance *instance;
};
/*! \brief Packet statistics (used for transport-cc) */
struct rtp_transport_wide_cc_packet_statistics {
/*! The transport specific sequence number */
unsigned int seqno;
/*! The time at which the packet was received */
struct timeval received;
/*! The delta between this packet and the previous */
int delta;
};
/*! \brief Statistics information (used for transport-cc) */
struct rtp_transport_wide_cc_statistics {
/*! A vector of packet statistics */
AST_VECTOR(, struct rtp_transport_wide_cc_packet_statistics) packet_statistics; /*!< Packet statistics, used for transport-cc */
/*! The last sequence number received */
unsigned int last_seqno;
/*! The last extended sequence number */
unsigned int last_extended_seqno;
/*! How many feedback packets have gone out */
unsigned int feedback_count;
/*! How many cycles have occurred for the sequence numbers */
unsigned int cycles;
/*! Scheduler id for periodic feedback transmission */
int schedid;
};
/*! \brief RTP session description */
struct ast_rtp {
int s;
@ -387,6 +413,8 @@ struct ast_rtp {
struct ast_data_buffer *send_buffer; /*!< Buffer for storing sent packets for retransmission */
struct ast_data_buffer *recv_buffer; /*!< Buffer for storing received packets for retransmission */
struct rtp_transport_wide_cc_statistics transport_wide_cc; /*!< Transport-cc statistics information */
#ifdef HAVE_PJPROJECT
ast_cond_t cond; /*!< ICE/TURN condition for signaling */
@ -3676,6 +3704,11 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
return -1;
}
if (AST_VECTOR_INIT(&rtp->transport_wide_cc.packet_statistics, 0)) {
return -1;
}
rtp->transport_wide_cc.schedid = -1;
rtp->f.subclass.format = ao2_bump(ast_format_none);
rtp->lastrxformat = ao2_bump(ast_format_none);
rtp->lasttxformat = ao2_bump(ast_format_none);
@ -3752,6 +3785,8 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
ast_data_buffer_free(rtp->recv_buffer);
}
AST_VECTOR_FREE(&rtp->transport_wide_cc.packet_statistics);
ao2_cleanup(rtp->lasttxformat);
ao2_cleanup(rtp->lastrxformat);
ao2_cleanup(rtp->f.subclass.format);
@ -6308,6 +6343,377 @@ static void rtp_instance_unlock(struct ast_rtp_instance *instance)
}
}
static int rtp_transport_wide_cc_packet_statistics_cmp(struct rtp_transport_wide_cc_packet_statistics a,
struct rtp_transport_wide_cc_packet_statistics b)
{
return a.seqno - b.seqno;
}
static void rtp_transport_wide_cc_feedback_status_vector_append(unsigned char *rtcpheader, int *packet_len, int *status_vector_chunk_bits,
uint16_t *status_vector_chunk, int status)
{
/* Appending this status will use up 2 bits */
*status_vector_chunk_bits -= 2;
/* We calculate which bits we want to update the status of. Since a status vector
* is 16 bits we take away 2 (for the header), and then we take away any that have
* already been used.
*/
*status_vector_chunk |= (status << (16 - 2 - (14 - *status_vector_chunk_bits)));
/* If there are still bits available we can return early */
if (*status_vector_chunk_bits) {
return;
}
/* Otherwise we have to place this chunk into the packet */
put_unaligned_uint16(rtcpheader + *packet_len, htons(*status_vector_chunk));
*status_vector_chunk_bits = 14;
/* The first bit being 1 indicates that this is a status vector chunk and the second
* bit being 1 indicates that we are using 2 bits to represent each status for a
* packet.
*/
*status_vector_chunk = (1 << 15) | (1 << 14);
*packet_len += 2;
}
static void rtp_transport_wide_cc_feedback_status_append(unsigned char *rtcpheader, int *packet_len, int *status_vector_chunk_bits,
uint16_t *status_vector_chunk, int *run_length_chunk_count, int *run_length_chunk_status, int status)
{
if (*run_length_chunk_status != status) {
while (*run_length_chunk_count > 0 && *run_length_chunk_count < 8) {
/* Realistically it only makes sense to use a run length chunk if there were 8 or more
* consecutive packets of the same type, otherwise we could end up making the packet larger
* if we have lots of small blocks of the same type. To help with this we backfill the status
* vector (since it always represents 7 packets). Best case we end up with only that single
* status vector and the rest are run length chunks.
*/
rtp_transport_wide_cc_feedback_status_vector_append(rtcpheader, packet_len, status_vector_chunk_bits,
status_vector_chunk, *run_length_chunk_status);
*run_length_chunk_count -= 1;
}
if (*run_length_chunk_count) {
/* There is a run length chunk which needs to be written out */
put_unaligned_uint16(rtcpheader + *packet_len, htons((0 << 15) | (*run_length_chunk_status << 13) | *run_length_chunk_count));
*packet_len += 2;
}
/* In all cases the run length chunk has to be reset */
*run_length_chunk_count = 0;
*run_length_chunk_status = -1;
if (*status_vector_chunk_bits == 14) {
/* We aren't in the middle of a status vector so we can try for a run length chunk */
*run_length_chunk_status = status;
*run_length_chunk_count = 1;
} else {
/* We're doing a status vector so populate it accordingly */
rtp_transport_wide_cc_feedback_status_vector_append(rtcpheader, packet_len, status_vector_chunk_bits,
status_vector_chunk, status);
}
} else {
/* This is easy, the run length chunk count can just get bumped up */
*run_length_chunk_count += 1;
}
}
static int rtp_transport_wide_cc_feedback_produce(const void *data)
{
struct ast_rtp_instance *instance = (struct ast_rtp_instance *) data;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
unsigned char *rtcpheader;
char bdata[1024];
struct rtp_transport_wide_cc_packet_statistics *first_packet;
struct rtp_transport_wide_cc_packet_statistics *previous_packet;
int i;
int status_vector_chunk_bits = 14;
uint16_t status_vector_chunk = (1 << 15) | (1 << 14);
int run_length_chunk_count = 0;
int run_length_chunk_status = -1;
int packet_len = 20;
int delta_len = 0;
int packet_count = 0;
unsigned int received_msw;
unsigned int received_lsw;
struct ast_sockaddr remote_address = { { 0, } };
int res;
int ice;
unsigned int large_delta_count = 0;
unsigned int small_delta_count = 0;
unsigned int lost_count = 0;
if (!rtp || !rtp->rtcp || rtp->transport_wide_cc.schedid == -1) {
ao2_ref(instance, -1);
return 0;
}
ao2_lock(instance);
rtcpheader = (unsigned char *)bdata;
/* The first packet in the vector acts as our base sequence number and reference time */
first_packet = AST_VECTOR_GET_ADDR(&rtp->transport_wide_cc.packet_statistics, 0);
previous_packet = first_packet;
/* We go through each packet that we have statistics for, adding it either to a status
* vector chunk or a run length chunk. The code tries to be as efficient as possible to
* reduce packet size and will favor run length chunks when it makes sense.
*/
for (i = 0; i < AST_VECTOR_SIZE(&rtp->transport_wide_cc.packet_statistics); ++i) {
struct rtp_transport_wide_cc_packet_statistics *statistics;
int lost = 0;
int res = 0;
statistics = AST_VECTOR_GET_ADDR(&rtp->transport_wide_cc.packet_statistics, i);
packet_count++;
if (first_packet != statistics) {
/* The vector stores statistics in a sorted fashion based on the sequence
* number. This ensures we can detect any packets that have been lost/not
* received by comparing the sequence numbers.
*/
lost = statistics->seqno - (previous_packet->seqno + 1);
lost_count += lost;
}
while (lost) {
/* We append a not received status until all the lost packets have been accounted for */
rtp_transport_wide_cc_feedback_status_append(rtcpheader, &packet_len, &status_vector_chunk_bits,
&status_vector_chunk, &run_length_chunk_count, &run_length_chunk_status, 0);
packet_count++;
/* If there is no more room left for storing packets stop now, we leave 20
* extra bits at the end just in case.
*/
if ((sizeof(bdata) - (packet_len + delta_len + 20)) < 0) {
res = -1;
break;
}
lost--;
}
/* If the lost packet appending bailed out because we have no more space, then exit here too */
if (res) {
break;
}
/* Per the spec the delta is in increments of 250 */
statistics->delta = ast_tvdiff_us(statistics->received, previous_packet->received) / 250;
/* Based on the delta determine the status of this packet */
if (statistics->delta < 0 || statistics->delta > 127) {
/* Large or negative delta */
rtp_transport_wide_cc_feedback_status_append(rtcpheader, &packet_len, &status_vector_chunk_bits,
&status_vector_chunk, &run_length_chunk_count, &run_length_chunk_status, 2);
delta_len += 2;
large_delta_count++;
} else {
/* Small delta */
rtp_transport_wide_cc_feedback_status_append(rtcpheader, &packet_len, &status_vector_chunk_bits,
&status_vector_chunk, &run_length_chunk_count, &run_length_chunk_status, 1);
delta_len += 1;
small_delta_count++;
}
previous_packet = statistics;
/* If there is no more room left in the packet stop handling of any subsequent packets */
if ((sizeof(bdata) - (packet_len + delta_len + 20)) < 0) {
break;
}
}
if (status_vector_chunk_bits != 14) {
/* If the status vector chunk has packets in it then place it in the RTCP packet */
put_unaligned_uint16(rtcpheader + packet_len, htons(status_vector_chunk));
packet_len += 2;
} else if (run_length_chunk_count) {
/* If there is a run length chunk in progress then place it in the RTCP packet */
put_unaligned_uint16(rtcpheader + packet_len, htons((0 << 15) | (run_length_chunk_status << 13) | run_length_chunk_count));
packet_len += 2;
}
/* We iterate again to build delta chunks */
for (i = 0; i < AST_VECTOR_SIZE(&rtp->transport_wide_cc.packet_statistics); ++i) {
struct rtp_transport_wide_cc_packet_statistics *statistics;
statistics = AST_VECTOR_GET_ADDR(&rtp->transport_wide_cc.packet_statistics, i);
if (statistics->delta < 0 || statistics->delta > 127) {
/* We need 2 bytes to store this delta */
put_unaligned_uint16(rtcpheader + packet_len, htons(statistics->delta));
packet_len += 2;
} else {
/* We can store this delta in 1 byte */
rtcpheader[packet_len] = statistics->delta;
packet_len += 1;
}
/* If this is the last packet handled by the run length chunk or status vector chunk code
* then we can go no further.
*/
if (statistics == previous_packet) {
break;
}
}
/* Zero pad the end of the packet */
while (packet_len % 4) {
rtcpheader[packet_len++] = 0;
}
/* Add the general RTCP header information */
put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (AST_RTP_RTCP_FMT_TRANSPORT_WIDE_CC << 24)
| (AST_RTP_RTCP_RTPFB << 16) | ((packet_len / 4) - 1)));
put_unaligned_uint32(rtcpheader + 4, htonl(rtp->ssrc));
put_unaligned_uint32(rtcpheader + 8, htonl(rtp->themssrc));
/* Add the transport-cc specific header information */
put_unaligned_uint32(rtcpheader + 12, htonl((first_packet->seqno << 16) | packet_count));
timeval2ntp(first_packet->received, &received_msw, &received_lsw);
put_unaligned_time24(rtcpheader + 16, received_msw, received_lsw);
rtcpheader[19] = rtp->transport_wide_cc.feedback_count;
/* The packet is now fully constructed so send it out */
ast_sockaddr_copy(&remote_address, &rtp->rtcp->them);
ast_debug(2, "Sending transport-cc feedback packet of size '%d' on '%s' with packet count of %d (small = %d, large = %d, lost = %d)\n",
packet_len, ast_rtp_instance_get_channel_id(instance), packet_count, small_delta_count, large_delta_count, lost_count);
res = rtcp_sendto(instance, (unsigned int *)rtcpheader, packet_len, 0, &remote_address, &ice);
if (res < 0) {
ast_log(LOG_ERROR, "RTCP transport-cc feedback error to %s due to %s\n",
ast_sockaddr_stringify(&remote_address), strerror(errno));
}
AST_VECTOR_RESET(&rtp->transport_wide_cc.packet_statistics, AST_VECTOR_ELEM_CLEANUP_NOOP);
rtp->transport_wide_cc.feedback_count++;
ao2_unlock(instance);
return 1000;
}
static void rtp_instance_parse_transport_wide_cc(struct ast_rtp_instance *instance, struct ast_rtp *rtp,
unsigned char *data, int len)
{
uint16_t *seqno = (uint16_t *)data;
struct rtp_transport_wide_cc_packet_statistics statistics;
struct ast_rtp_instance *transport = rtp->bundled ? rtp->bundled : instance;
struct ast_rtp *transport_rtp = ast_rtp_instance_get_data(transport);
/* If the sequence number has cycled over then record it as such */
if (((int)transport_rtp->transport_wide_cc.last_seqno - (int)ntohs(*seqno)) > 100) {
transport_rtp->transport_wide_cc.cycles += RTP_SEQ_MOD;
}
/* Populate the statistics information for this packet */
statistics.seqno = transport_rtp->transport_wide_cc.cycles + ntohs(*seqno);
statistics.received = ast_tvnow();
/* We allow at a maximum 1000 packet statistics in play at a time, if we hit the
* limit we give up and start fresh.
*/
if (AST_VECTOR_SIZE(&transport_rtp->transport_wide_cc.packet_statistics) > 1000) {
AST_VECTOR_RESET(&rtp->transport_wide_cc.packet_statistics, AST_VECTOR_ELEM_CLEANUP_NOOP);
}
if (!AST_VECTOR_SIZE(&transport_rtp->transport_wide_cc.packet_statistics) ||
statistics.seqno > transport_rtp->transport_wide_cc.last_extended_seqno) {
/* This is the expected path */
if (AST_VECTOR_APPEND(&transport_rtp->transport_wide_cc.packet_statistics, statistics)) {
return;
}
transport_rtp->transport_wide_cc.last_extended_seqno = statistics.seqno;
transport_rtp->transport_wide_cc.last_seqno = ntohs(*seqno);
} else {
/* This packet was out of order, so reorder it within the vector accordingly */
if (AST_VECTOR_ADD_SORTED(&transport_rtp->transport_wide_cc.packet_statistics, statistics,
rtp_transport_wide_cc_packet_statistics_cmp)) {
return;
}
}
/* If we have not yet scheduled the periodic sending of feedback for this transport then do so */
if (transport_rtp->transport_wide_cc.schedid < 0 && transport_rtp->rtcp) {
ast_debug(1, "Starting RTCP transport-cc feedback transmission on RTP instance '%p'\n", transport);
ao2_ref(transport, +1);
ast_log(LOG_NOTICE, "Starting feedback\n");
transport_rtp->transport_wide_cc.schedid = ast_sched_add(rtp->sched, 1000,
rtp_transport_wide_cc_feedback_produce, transport);
if (transport_rtp->transport_wide_cc.schedid < 0) {
ao2_ref(transport, -1);
ast_log(LOG_WARNING, "Scheduling RTCP transport-cc feedback transmission failed on RTP instance '%p'\n",
transport);
}
}
}
static void rtp_instance_parse_extmap_extensions(struct ast_rtp_instance *instance, struct ast_rtp *rtp,
unsigned char *extension, int len)
{
int transport_wide_cc_id = ast_rtp_instance_extmap_get_id(instance, AST_RTP_EXTENSION_TRANSPORT_WIDE_CC);
int pos = 0;
/* We currently only care about the transport-cc extension, so if that's not negotiated then do nothing */
if (transport_wide_cc_id == -1) {
return;
}
/* Only while we do not exceed available extension data do we continue */
while (pos < len) {
int id = extension[pos] >> 4;
int extension_len = (extension[pos] & 0xF) + 1;
/* We've handled the first byte as it contains the extension id and length, so always
* skip ahead now
*/
pos += 1;
if (id == 0) {
/* From the RFC:
* In both forms, padding bytes have the value of 0 (zero). They may be
* placed between extension elements, if desired for alignment, or after
* the last extension element, if needed for padding. A padding byte
* does not supply the ID of an element, nor the length field. When a
* padding byte is found, it is ignored and the parser moves on to
* interpreting the next byte.
*/
continue;
} else if (id == 15) {
/* From the RFC:
* The local identifier value 15 is reserved for future extension and
* MUST NOT be used as an identifier. If the ID value 15 is
* encountered, its length field should be ignored, processing of the
* entire extension should terminate at that point, and only the
* extension elements present prior to the element with ID 15
* considered.
*/
break;
} else if ((pos + extension_len) > len) {
/* The extension is corrupted and is stating that it contains more data than is
* available in the extensions data.
*/
break;
}
/* If this is transport-cc then we need to parse it further */
if (id == transport_wide_cc_id) {
rtp_instance_parse_transport_wide_cc(instance, rtp, extension + pos, extension_len);
}
/* Skip ahead to the next extension */
pos += extension_len;
}
}
static struct ast_frame *ast_rtp_interpret(struct ast_rtp_instance *instance, struct ast_srtp *srtp,
const struct ast_sockaddr *remote_address, unsigned char *read_area, int length, int prev_seqno)
{
@ -6353,18 +6759,24 @@ static struct ast_frame *ast_rtp_interpret(struct ast_rtp_instance *instance, st
/* Look for any RTP extensions, currently we do not support any */
if (ext) {
hdrlen += (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2;
hdrlen += 4;
if (DEBUG_ATLEAST(1)) {
unsigned int profile;
profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16;
int extensions_size = (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2;
unsigned int profile;
profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16;
if (profile == 0xbede) {
/* We skip over the first 4 bytes as they are just for the one byte extension header */
rtp_instance_parse_extmap_extensions(instance, rtp, read_area + hdrlen + 4, extensions_size);
} else if (DEBUG_ATLEAST(1)) {
if (profile == 0x505a) {
ast_log(LOG_DEBUG, "Found Zfone extension in RTP stream - zrtp - not supported.\n");
} else if (profile != 0xbede) {
} else {
/* SDP negotiated RTP extensions can not currently be output in logging */
ast_log(LOG_DEBUG, "Found unknown RTP Extensions %x\n", profile);
}
}
hdrlen += extensions_size;
hdrlen += 4;
}
/* Make sure after we potentially mucked with the header length that it is once again valid */
@ -7316,6 +7728,18 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro
ao2_lock(instance);
rtp->rtcp->schedid = -1;
}
if (rtp->transport_wide_cc.schedid > -1) {
ao2_unlock(instance);
if (!ast_sched_del(rtp->sched, rtp->transport_wide_cc.schedid)) {
ao2_ref(instance, -1);
} else {
ast_debug(1, "Failed to tear down RTCP transport-cc feedback on RTP instance '%p'\n", instance);
ao2_lock(instance);
return;
}
ao2_lock(instance);
rtp->transport_wide_cc.schedid = -1;
}
if (rtp->rtcp->s > -1 && rtp->rtcp->s != rtp->s) {
close(rtp->rtcp->s);
}
@ -7623,6 +8047,15 @@ static void ast_rtp_stop(struct ast_rtp_instance *instance)
rtp->rtcp->schedid = -1;
}
if (rtp->transport_wide_cc.schedid > -1) {
ao2_unlock(instance);
if (!ast_sched_del(rtp->sched, rtp->transport_wide_cc.schedid)) {
ao2_ref(instance, -1);
}
ao2_lock(instance);
rtp->transport_wide_cc.schedid = -1;
}
if (rtp->red) {
ao2_unlock(instance);
AST_SCHED_DEL(rtp->sched, rtp->red->schedid);
@ -7763,6 +8196,7 @@ static int ast_rtp_extension_enable(struct ast_rtp_instance *instance, enum ast_
{
switch (extension) {
case AST_RTP_EXTENSION_ABS_SEND_TIME:
case AST_RTP_EXTENSION_TRANSPORT_WIDE_CC:
return 1;
default:
return 0;