From 8456b909045a71e5b1c5e12455c5153d8250d71a Mon Sep 17 00:00:00 2001 From: Andreas Eversberg Date: Tue, 7 May 2024 13:42:25 +0200 Subject: [PATCH] PJSIP: Add functions to change TCP transport on the fly Related: SY#6888 Change-Id: I41dbbd89187fcfc26198ad9dbe19b89b99ee9c32 --- pjlib/include/pj/activesock.h | 1 + pjlib/src/pj/activesock.c | 5 + pjsip/include/pjsip/sip_transport.h | 6 ++ pjsip/src/pjsip/sip_transport_tcp.c | 147 ++++++++++++++++++++++++++++ 4 files changed, 159 insertions(+) diff --git a/pjlib/include/pj/activesock.h b/pjlib/include/pj/activesock.h index ff9f6b608..f95eee231 100644 --- a/pjlib/include/pj/activesock.h +++ b/pjlib/include/pj/activesock.h @@ -585,6 +585,7 @@ PJ_DECL(pj_status_t) pj_activesock_start_connect(pj_activesock_t *asock, const pj_sockaddr_t *remaddr, int addr_len); +PJ_DEF(pj_ioqueue_t) *pj_activesock_get_ioqueue(pj_activesock_t *asock); #endif /* PJ_HAS_TCP */ diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c index 12a38c586..192fb7791 100644 --- a/pjlib/src/pj/activesock.c +++ b/pjlib/src/pj/activesock.c @@ -980,5 +980,10 @@ static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, } } + +PJ_DEF(pj_ioqueue_t) *pj_activesock_get_ioqueue(pj_activesock_t *asock) +{ + return asock->ioqueue; +} #endif /* PJ_HAS_TCP */ diff --git a/pjsip/include/pjsip/sip_transport.h b/pjsip/include/pjsip/sip_transport.h index 0127433d4..57cd00cd1 100644 --- a/pjsip/include/pjsip/sip_transport.h +++ b/pjsip/include/pjsip/sip_transport.h @@ -935,6 +935,12 @@ struct pjsip_transport */ pj_status_t (*destroy)(pjsip_transport *transport); + pj_status_t (*create_new_sock)(struct pjsip_transport *base, + pj_sockaddr *local_addr); + pj_status_t (*connect_new_sock)(struct pjsip_transport *base, + pj_sockaddr *local_addr, + pj_sockaddr *remote_addr); + /* * Application may extend this structure.. */ diff --git a/pjsip/src/pjsip/sip_transport_tcp.c b/pjsip/src/pjsip/sip_transport_tcp.c index 5dd1d24f7..724f8b0d5 100644 --- a/pjsip/src/pjsip/sip_transport_tcp.c +++ b/pjsip/src/pjsip/sip_transport_tcp.c @@ -105,6 +105,7 @@ struct tcp_transport pj_sock_t sock; pj_activesock_t *asock; pj_bool_t has_pending_connect; + pj_sock_t new_sock; /* Keep-alive timer. */ pj_timer_entry ka_timer; @@ -607,6 +608,13 @@ static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e); /* Clean up TCP resources */ static void tcp_on_destroy(void *arg); +/* Change transport addresses */ +static pj_status_t tcp_create_new_sock(struct pjsip_transport *base, + pj_sockaddr *local_addr); +static pj_status_t tcp_connect_new_sock(struct pjsip_transport *base, + pj_sockaddr *local_addr, + pj_sockaddr *remote_addr); + /* * Common function to create TCP transport, called when pending accept() and * pending connect() complete. @@ -642,6 +650,7 @@ static pj_status_t tcp_create( struct tcp_listener *listener, tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport); tcp->is_server = is_server; tcp->sock = sock; + tcp->new_sock = PJ_INVALID_SOCKET; /*tcp->listener = listener;*/ pj_list_init(&tcp->delayed_list); tcp->base.pool = pool; @@ -685,6 +694,9 @@ static pj_status_t tcp_create( struct tcp_listener *listener, tcp->base.destroy = &tcp_destroy_transport; tcp->base.factory = &listener->factory; tcp->base.initial_timeout = listener->initial_timeout; + tcp->base.create_new_sock = tcp_create_new_sock; + tcp->base.connect_new_sock = tcp_connect_new_sock; + /* Create group lock */ status = pj_grp_lock_create_w_handler(pool, NULL, tcp, &tcp_on_destroy, @@ -859,6 +871,10 @@ static pj_status_t tcp_destroy(pjsip_transport *transport, tcp->sock = PJ_INVALID_SOCKET; } + if (tcp->new_sock != PJ_INVALID_SOCKET) + pj_sock_close(tcp->new_sock); + + if (tcp->grp_lock) { pj_grp_lock_t *grp_lock = tcp->grp_lock; tcp->grp_lock = NULL; @@ -1766,5 +1782,136 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_restart(pjsip_tpfactory *factory, return status; } +static pj_status_t tcp_create_new_sock(struct pjsip_transport *base, + pj_sockaddr *local_addr) +{ + struct tcp_transport *tcp = (struct tcp_transport*) base; + struct tcp_listener *listener = (struct tcp_listener *)base->factory; + int af = pjsip_transport_type_get_af(listener->factory.type); + pj_sock_t new_sock; + pj_status_t status; + pj_sockaddr addr; + + /* Close old socket */ + if (tcp->new_sock != PJ_INVALID_SOCKET) { + pj_sock_close(tcp->new_sock); + tcp->new_sock = PJ_INVALID_SOCKET; + } + + /* Create new socket */ + status = pj_sock_socket(af, + pj_SOCK_STREAM() | pj_SOCK_CLOEXEC(), + 0, &new_sock); + if (status != PJ_SUCCESS) + return status; + + if (!local_addr) { + local_addr = &addr; + pj_bzero(&addr, sizeof(*local_addr)); + pj_sockaddr_cp(&addr, &listener->bound_addr); + pj_sockaddr_set_port(&addr, 0); + } + + /* Apply QoS, if specified */ + status = pj_sock_apply_qos2(new_sock, listener->qos_type, + &listener->qos_params, + 2, listener->factory.obj_name, + "outgoing SIP TCP socket"); + + /* Apply socket options, if specified */ + if (listener->sockopt_params.cnt) { + status = pj_sock_setsockopt_params(new_sock, &listener->sockopt_params); + if (status != PJ_SUCCESS) { + PJ_PERROR(4, (listener->factory.obj_name, status, + "Warning: error applying socket options")); + } + } + + if (1) { + int enabled = 1; + status = pj_sock_setsockopt(new_sock, pj_SOL_SOCKET(), pj_SO_REUSEADDR(), + &enabled, sizeof(enabled)); + if (status != PJ_SUCCESS) { + PJ_PERROR(4, (listener->factory.obj_name, status, + "Warning: error applying SO_REUSEADDR")); + } + } + + /* Bind new socket */ + status = pj_sock_bind(new_sock, local_addr, + pj_sockaddr_get_len(local_addr)); + if (status != PJ_SUCCESS) { + pj_sock_close(new_sock); + return status; + } + + /* Store and return */ + tcp->new_sock = new_sock; + return PJ_SUCCESS; +} + +/* Change transport addresses */ +static pj_status_t tcp_connect_new_sock(struct pjsip_transport *base, + pj_sockaddr *local_addr, + pj_sockaddr *remote_addr) +{ + struct tcp_transport *tcp = (struct tcp_transport*) base; + pj_activesock_t *asock = tcp->asock; + pj_activesock_t *new_asock; + pj_activesock_cfg asock_cfg; + pj_activesock_cb tcp_callback; + pj_status_t status; + int addr_len; + + PJ_ASSERT_RETURN(tcp->new_sock != PJ_INVALID_SOCKET, PJ_EINVAL); + + /* Get the local addess */ + addr_len = sizeof(*local_addr); + status = pj_sock_getsockname(tcp->new_sock, &local_addr, &addr_len); + if (status != PJ_SUCCESS) { + pj_activesock_close(new_asock); + return status; + } + + /* Create active socket */ + pj_activesock_cfg_default(&asock_cfg); + asock_cfg.async_cnt = 1; + asock_cfg.grp_lock = tcp->grp_lock; + + pj_bzero(&tcp_callback, sizeof(tcp_callback)); + tcp_callback.on_data_read = &on_data_read; + tcp_callback.on_data_sent = &on_data_sent; + tcp_callback.on_connect_complete = &on_connect_complete; + + status = pj_activesock_create(tcp->base.pool, tcp->new_sock, pj_SOCK_STREAM(), &asock_cfg, + pj_activesock_get_ioqueue(asock), &tcp_callback, tcp, &new_asock); + if (status != PJ_SUCCESS) { + pj_sock_close(tcp->new_sock); + tcp->new_sock = PJ_INVALID_SOCKET; + return status; + } + + /* Start asynchronous connect() operation */ + status = pj_activesock_start_connect(new_asock, tcp->base.pool, remote_addr, pj_sockaddr_get_len(remote_addr)); + if (status != PJ_SUCCESS && status != PJ_EPENDING) { + pj_activesock_close(new_asock); + return status; + } + + /* Close old socket and assign the new one. */ + pj_activesock_close(tcp->asock); + tcp->sock = tcp->new_sock; + tcp->new_sock = PJ_INVALID_SOCKET; + tcp->asock = new_asock; + + /* Check if socket is already connected. */ + tcp->has_pending_connect = PJ_TRUE; + if (status == PJ_SUCCESS) { + on_connect_complete(tcp->asock, PJ_SUCCESS); + } + + return PJ_SUCCESS; +} + #endif /* PJ_HAS_TCP */