/*
 * asterisk/res/res_aeap/transport_websocket.c
 *
 * 255 lines
 * 6.5 KiB
 * C
 */

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2021, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/http_websocket.h"
#include "asterisk/utils.h"
#include "logger.h"
#include "transport.h"
#include "transport_websocket.h"
/*!
 * Log an error on behalf of a websocket transport.
 *
 * Expands to aeap_error(), forwarding \a obj, the fixed string "websocket" as
 * the second argument, and the printf-style \a fmt followed by any variadic
 * arguments. NOTE(review): "websocket" is presumably a name/category label
 * for the log line - confirm against logger.h.
 */
#define log_error(obj, fmt, ...) aeap_error(obj, "websocket", fmt, ##__VA_ARGS__)
/*!
 * Websocket flavored AEAP transport.
 *
 * The generic transport is embedded as the first member so that a pointer to
 * this structure can be cast to and from a struct aeap_transport pointer,
 * which is how the callbacks in this file recover their websocket state.
 */
struct aeap_transport_websocket {
/*! Derive from base transport (must be first attribute) */
struct aeap_transport base;
/*! The underlying websocket; set to NULL at init and again on disconnect */
struct ast_websocket *ws;
};
static int websocket_connect(struct aeap_transport *self, const char *url,
const char *protocol, int timeout)
{
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
enum ast_websocket_result ws_result;
struct ast_websocket_client_options ws_options = {
.uri = url,
.protocols = protocol,
.timeout = timeout,
.tls_cfg = NULL,
};
transport->ws = ast_websocket_client_create_with_options(&ws_options, &ws_result);
if (ws_result != WS_OK) {
log_error(self, "connect failure (%d)", (int)ws_result);
return -1;
}
return 0;
}
/*!
 * \brief Release the websocket attached to the transport, if there is one
 *
 * \param self The transport (must be a struct aeap_transport_websocket)
 *
 * \returns 0 always
 */
static int websocket_disconnect(struct aeap_transport *self)
{
	struct aeap_transport_websocket *ws_transport = (struct aeap_transport_websocket *)self;

	if (!ws_transport->ws) {
		/* Nothing attached, so nothing to release */
		return 0;
	}

	/* Drop our reference and forget the pointer so later calls see "not connected" */
	ast_websocket_unref(ws_transport->ws);
	ws_transport->ws = NULL;

	return 0;
}
/*!
 * \brief Destroy callback for the websocket transport (intentionally a no-op)
 *
 * The websocket itself is released by the disconnect callback, and the
 * base/dispatch interface calls disconnect before it calls destroy. There is
 * therefore no websocket specific state left to clean up here.
 *
 * \param self The transport being destroyed (unused)
 */
static void websocket_destroy(struct aeap_transport *self)
{
	/* Nothing to do - see the function description */
}
static intmax_t websocket_read(struct aeap_transport *self, void *buf, intmax_t size,
enum AST_AEAP_DATA_TYPE *rtype)
{
struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
char *payload;
uint64_t bytes_read = 0;
uint64_t total_bytes_read = 0;
enum ast_websocket_opcode opcode;
int fragmented = 0;
*rtype = AST_AEAP_DATA_TYPE_NONE;
if (ast_websocket_fd(transport->ws) < 0) {
log_error(self, "unavailable for reading");
/* Ensure this transport is in a disconnected state */
aeap_transport_disconnect(self);
return -1;
}
/*
* This function is called with the read_lock locked. However, the lock needs to be
* unlocked while waiting for input otherwise a deadlock can occur during disconnect
* (disconnect attempts to grab the lock but can't because read holds it here). So
* unlock it prior to waiting.
*/
ast_mutex_unlock(&transport->base.read_lock);
while (ast_websocket_wait_for_input(transport->ws, -1) <= 0) {
/* If this was poll getting interrupted just go back to waiting */
if (errno == EINTR || errno == EAGAIN) {
continue;
}
ast_mutex_lock(&transport->base.read_lock);
log_error(self, "poll failure: %s", strerror(errno));
/* Ensure this transport is in a disconnected state */
aeap_transport_disconnect(self);
return -1;
}
ast_mutex_lock(&transport->base.read_lock);
if (!transport->ws) {
/*
* It's possible the transport was told to disconnect while waiting for input.
* If so then the websocket will be NULL, so we don't want to continue.
*/
return 0;
}
do {
if (ast_websocket_read(transport->ws, &payload, &bytes_read, &opcode,
&fragmented) != 0) {
log_error(self, "read failure (%d): %s", opcode, strerror(errno));
return -1;
}
if (!bytes_read) {
continue;
}
if (total_bytes_read + bytes_read > size) {
log_error(self, "attempted to read too many bytes into (%jd) sized buffer", size);
return -1;
}
memcpy(buf + total_bytes_read, payload, bytes_read);
total_bytes_read += bytes_read;
} while (opcode == AST_WEBSOCKET_OPCODE_CONTINUATION);
switch (opcode) {
case AST_WEBSOCKET_OPCODE_CLOSE:
log_error(self, "closed");
return -1;
case AST_WEBSOCKET_OPCODE_BINARY:
*rtype = AST_AEAP_DATA_TYPE_BINARY;
break;
case AST_WEBSOCKET_OPCODE_TEXT:
*rtype = AST_AEAP_DATA_TYPE_STRING;
/* Append terminator, but check for overflow first */
if (total_bytes_read == size) {
log_error(self, "unable to write string terminator");
return -1;
}
*((char *)(buf + total_bytes_read)) = '\0';
break;
default:
/* Ignore all other message types */
return 0;
}
return total_bytes_read;
}
/*!
 * \brief Send a buffer over the transport's websocket
 *
 * \param self The transport (must be a struct aeap_transport_websocket)
 * \param buf The data to send
 * \param size Number of bytes in \a buf
 * \param wtype AST_AEAP_DATA_TYPE_BINARY sends a binary message,
 *        AST_AEAP_DATA_TYPE_STRING sends a text message; any other value
 *        sends nothing
 *
 * \returns \a size on success (including when \a wtype is not a sendable
 *          type), otherwise the negative value returned by the websocket write
 */
static intmax_t websocket_write(struct aeap_transport *self, const void *buf, intmax_t size,
	enum AST_AEAP_DATA_TYPE wtype)
{
	struct aeap_transport_websocket *ws_transport = (struct aeap_transport_websocket *)self;
	intmax_t rc;

	if (wtype == AST_AEAP_DATA_TYPE_BINARY) {
		rc = ast_websocket_write(ws_transport->ws, AST_WEBSOCKET_OPCODE_BINARY,
			(char *)buf, size);
	} else if (wtype == AST_AEAP_DATA_TYPE_STRING) {
		rc = ast_websocket_write(ws_transport->ws, AST_WEBSOCKET_OPCODE_TEXT,
			(char *)buf, size);
	} else {
		/* Not a sendable type: nothing goes out, and the call is not treated as a failure */
		return size;
	}

	if (rc >= 0) {
		return size;
	}

	log_error(self, "problem writing to websocket (closed)");

	/*
	 * The underlying socket is considered closed at this point, so bring the
	 * transport into a disconnected state to match.
	 */
	aeap_transport_disconnect(self);

	return rc;
}
static struct aeap_transport_vtable *transport_websocket_vtable(void)
{
static struct aeap_transport_vtable websocket_vtable = {
.connect = websocket_connect,
.disconnect = websocket_disconnect,
.destroy = websocket_destroy,
.read = websocket_read,
.write = websocket_write,
};
return &websocket_vtable;
}
/*!
 * \brief Prepare a freshly allocated websocket transport for use
 *
 * Marks the transport as having no websocket attached and installs the
 * websocket virtual table on its base transport.
 *
 * \param transport The transport to initialize
 *
 * \returns 0 on success, -1 on error (the current implementation cannot fail)
 */
static int transport_websocket_init(struct aeap_transport_websocket *transport)
{
	/* The base transport is the first member, so this is the same object the cast would yield */
	struct aeap_transport *base = &transport->base;

	base->vtable = transport_websocket_vtable();
	transport->ws = NULL;

	return 0;
}
/*!
 * \brief Allocate and initialize a websocket transport
 *
 * The returned object is zero-initialized, has no websocket attached, and has
 * the websocket virtual table installed.
 *
 * \returns A new transport on success, NULL if allocation or initialization
 *          fails. On initialization failure the allocation is freed here.
 */
struct aeap_transport_websocket *aeap_transport_websocket_create(void)
{
	struct aeap_transport_websocket *transport;

	transport = ast_calloc(1, sizeof(*transport));
	if (!transport) {
		/* Messages passed directly to ast_log() must carry their own newline */
		ast_log(LOG_ERROR, "AEAP websocket: unable to create transport websocket\n");
		return NULL;
	}

	if (transport_websocket_init(transport)) {
		ast_free(transport);
		return NULL;
	}

	return transport;
}