990 lines
32 KiB
C
990 lines
32 KiB
C
/*
|
|
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
|
|
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software
|
|
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
*/
|
|
#include <pj/activesock.h>
|
|
#include <pj/compat/socket.h>
|
|
#include <pj/assert.h>
|
|
#include <pj/errno.h>
|
|
#include <pj/log.h>
|
|
#include <pj/pool.h>
|
|
#include <pj/sock.h>
|
|
#include <pj/string.h>
|
|
|
|
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
#   include <CFNetwork/CFNetwork.h>

/* File-wide switch that is tested before a background read stream is
 * created for a socket. It is modified only through
 * pj_activesock_enable_iphone_os_bg().
 */
static pj_bool_t ios_bg_support = PJ_TRUE;
#endif
|
|
|
|
/* Default for pj_activesock_t.max_loop: once the read-completion handler has
 * gone through this many iterations, it forces the next read to be
 * asynchronous (PJ_IOQUEUE_ALWAYS_ASYNC).
 */
#define PJ_ACTIVESOCK_MAX_LOOP 50
|
|
|
|
|
|
/* Which kind of read operation has been started on an active socket. */
enum read_type
{
    TYPE_NONE,          /* No read has been started yet.                */
    TYPE_RECV,          /* Set by pj_activesock_start_read2().          */
    TYPE_RECV_FROM      /* Set by pj_activesock_start_recvfrom2().      */
};
|
|
|
|
/* Bit flags kept in pj_activesock_t.shutdown. */
enum shutdown_dir
{
    SHUT_NONE   = 0,    /* Nothing has been shut down.                    */
    SHUT_RX     = 1,    /* Read completions are ignored.                  */
    SHUT_TX     = 2     /* Sends are refused, write completions ignored.  */
};
|
|
|
|
struct read_op
|
|
{
|
|
pj_ioqueue_op_key_t op_key;
|
|
pj_uint8_t *pkt;
|
|
unsigned max_size;
|
|
pj_size_t size;
|
|
pj_sockaddr src_addr;
|
|
int src_addr_len;
|
|
};
|
|
|
|
struct accept_op
|
|
{
|
|
pj_ioqueue_op_key_t op_key;
|
|
pj_sock_t new_sock;
|
|
pj_sockaddr rem_addr;
|
|
int rem_addr_len;
|
|
};
|
|
|
|
struct send_data
|
|
{
|
|
pj_uint8_t *data;
|
|
pj_ssize_t len;
|
|
pj_ssize_t sent;
|
|
unsigned flags;
|
|
};
|
|
|
|
struct pj_activesock_t
|
|
{
|
|
pj_ioqueue_key_t *key;
|
|
pj_bool_t stream_oriented;
|
|
pj_bool_t whole_data;
|
|
pj_ioqueue_t *ioqueue;
|
|
void *user_data;
|
|
unsigned async_count;
|
|
unsigned shutdown;
|
|
unsigned max_loop;
|
|
pj_activesock_cb cb;
|
|
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
|
|
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
|
|
int bg_setting;
|
|
pj_sock_t sock;
|
|
CFReadStreamRef readStream;
|
|
#endif
|
|
|
|
unsigned err_counter;
|
|
pj_status_t last_err;
|
|
|
|
struct send_data send_data;
|
|
|
|
struct read_op *read_op;
|
|
pj_uint32_t read_flags;
|
|
enum read_type read_type;
|
|
|
|
struct accept_op *accept_op;
|
|
};
|
|
|
|
|
|
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_ssize_t bytes_read);
|
|
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_ssize_t bytes_sent);
|
|
#if PJ_HAS_TCP
|
|
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_sock_t sock,
|
|
pj_status_t status);
|
|
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
|
|
pj_status_t status);
|
|
#endif
|
|
|
|
/*
 * Fill an active socket configuration with default values.
 *
 * The structure is zeroed first, then the non-zero defaults are applied:
 * one asynchronous operation, whole-data sending and close-on-exec enabled.
 * A negative concurrency makes pj_activesock_create() leave the ioqueue
 * key's concurrency setting untouched (unless whole_data forces it off).
 */
PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
{
    pj_bzero(cfg, sizeof(*cfg));

    cfg->sock_cloexec = PJ_TRUE;
    cfg->whole_data   = PJ_TRUE;
    cfg->concurrency  = -1;
    cfg->async_cnt    = 1;
}
|
|
|
|
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
|
|
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
|
|
/* Close and release the background read stream of the socket, if there is
 * one, and clear the stored reference. Does nothing when no stream exists.
 */
static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
{
    CFReadStreamRef stream = asock->readStream;

    if (!stream)
        return;

    CFReadStreamClose(stream);
    CFRelease(stream);
    asock->readStream = NULL;
}
|
|
|
|
/* (Re)create the background read stream for a stream oriented socket.
 *
 * The body is compiled only when the minimum required iOS version is below
 * 9.0. It does nothing unless background support is enabled file-wide,
 * enabled for this socket, and the socket is stream oriented. Any existing
 * stream is destroyed first. If the new stream cannot be created, tagged
 * with the VoIP service type, or opened, a warning is logged and the stream
 * is destroyed again.
 */
static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
{
#if (defined(__IPHONE_OS_VERSION_MIN_REQUIRED) && \
     __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)

    pj_bool_t configured;

    if (!ios_bg_support || !asock->bg_setting || !asock->stream_oriented)
        return;

    activesock_destroy_iphone_os_stream(asock);

    CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
                                 &asock->readStream, NULL);

    /* Evaluated left to right; later steps are skipped once one fails. */
    configured = asock->readStream &&
                 CFReadStreamSetProperty(asock->readStream,
                                         kCFStreamNetworkServiceType,
                                         kCFStreamNetworkServiceTypeVoIP)
                 == TRUE &&
                 CFReadStreamOpen(asock->readStream) == TRUE;

    if (!configured) {
        PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
                      "usage. Usage of THIS particular TCP transport in "
                      "background mode will not be supported."));

        activesock_destroy_iphone_os_stream(asock);
    }

#endif
}
|
|
|
|
|
|
/*
 * Set the per-socket background setting. A non-zero value (re)creates the
 * background read stream; zero destroys it.
 */
PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
                                            int val)
{
    asock->bg_setting = val;

    if (!val) {
        activesock_destroy_iphone_os_stream(asock);
        return;
    }

    activesock_create_iphone_os_stream(asock);
}
|
|
|
|
/*
 * Set the file-wide background support flag. It is consulted the next time
 * a background read stream is about to be created; existing streams are not
 * touched here.
 */
PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
{
    ios_bg_support = val;
}
|
|
#endif
|
|
|
|
/*
 * Create an active socket on top of an existing socket handle.
 *
 * pool       Pool used to allocate the object and passed to the ioqueue
 *            registration.
 * sock       Socket handle to wrap.
 * sock_type  Socket type; only the low four bits are examined and they must
 *            equal pj_SOCK_STREAM() or pj_SOCK_DGRAM().
 * opt        Optional settings. When NULL: one async operation, whole_data
 *            enabled, no group lock, concurrency left untouched.
 * ioqueue    ioqueue on which the socket is registered.
 * cb         Callbacks; the structure is copied.
 * user_data  Opaque pointer stored in the object.
 * p_asock    Receives the new object on success.
 *
 * Returns PJ_SUCCESS, PJ_EINVAL for bad arguments, or the status returned by
 * pj_ioqueue_register_sock2().
 */
PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
                                          pj_sock_t sock,
                                          int sock_type,
                                          const pj_activesock_cfg *opt,
                                          pj_ioqueue_t *ioqueue,
                                          const pj_activesock_cb *cb,
                                          void *user_data,
                                          pj_activesock_t **p_asock)
{
    pj_ioqueue_callback ioq_cb;
    pj_activesock_t *asock;
    pj_status_t status;
    int base_type = sock_type & 0xF;

    PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
    PJ_ASSERT_RETURN(sock>=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
    PJ_ASSERT_RETURN(base_type==pj_SOCK_STREAM() ||
                     base_type==pj_SOCK_DGRAM(), PJ_EINVAL);
    PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);

    asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
    asock->ioqueue = ioqueue;
    asock->stream_oriented = (base_type == pj_SOCK_STREAM());
    if (opt) {
        asock->async_count = opt->async_cnt;
        asock->whole_data = opt->whole_data;
    } else {
        asock->async_count = 1;
        asock->whole_data = 1;
    }
    asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
    asock->user_data = user_data;
    pj_memcpy(&asock->cb, cb, sizeof(*cb));

    /* Route all ioqueue completions to the handlers in this file. */
    pj_bzero(&ioq_cb, sizeof(ioq_cb));
    ioq_cb.on_read_complete = &ioqueue_on_read_complete;
    ioq_cb.on_write_complete = &ioqueue_on_write_complete;
#if PJ_HAS_TCP
    ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
    ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
#endif

    status = pj_ioqueue_register_sock2(pool, ioqueue, sock,
                                       (opt? opt->grp_lock : NULL),
                                       asock, &ioq_cb, &asock->key);
    if (status != PJ_SUCCESS) {
        pj_activesock_close(asock);
        return status;
    }

    if (asock->whole_data) {
        /* Concurrency has to be off, otherwise there is a race condition. */
        pj_ioqueue_set_concurrency(asock->key, 0);
    } else if (opt && opt->concurrency >= 0) {
        pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
    }

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
    asock->sock = sock;
    asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
#endif

    *p_asock = asock;
    return PJ_SUCCESS;
}
|
|
|
|
|
|
/*
 * Create a datagram socket, bind it and wrap it in an active socket.
 *
 * addr        Address to bind to. When NULL, an address initialised with
 *             pj_sockaddr_init(pj_AF_INET(), ..., NULL, 0) is used.
 * opt         Optional settings; sock_cloexec adds pj_SOCK_CLOEXEC() to the
 *             socket type. Also forwarded to pj_activesock_create().
 * bound_addr  Optional; receives the result of pj_sock_getsockname().
 *
 * The remaining arguments are forwarded to pj_activesock_create().
 *
 * Returns PJ_SUCCESS or the status of the first step that failed. The raw
 * socket is closed when bind or active socket creation fails; the active
 * socket is closed when the bound address cannot be queried.
 */
PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
                                              const pj_sockaddr *addr,
                                              const pj_activesock_cfg *opt,
                                              pj_ioqueue_t *ioqueue,
                                              const pj_activesock_cb *cb,
                                              void *user_data,
                                              pj_activesock_t **p_asock,
                                              pj_sockaddr *bound_addr)
{
    int sock_type = pj_SOCK_DGRAM();
    pj_sockaddr any_addr;
    pj_status_t status;
    pj_sock_t fd;
    int name_len;

    if (opt && opt->sock_cloexec)
        sock_type |= pj_SOCK_CLOEXEC();

    if (!addr) {
        pj_sockaddr_init(pj_AF_INET(), &any_addr, NULL, 0);
        addr = &any_addr;
    }

    status = pj_sock_socket(addr->addr.sa_family, sock_type, 0, &fd);
    if (status != PJ_SUCCESS)
        return status;

    status = pj_sock_bind(fd, addr, pj_sockaddr_get_len(addr));
    if (status != PJ_SUCCESS) {
        pj_sock_close(fd);
        return status;
    }

    status = pj_activesock_create(pool, fd, sock_type, opt,
                                  ioqueue, cb, user_data, p_asock);
    if (status != PJ_SUCCESS) {
        pj_sock_close(fd);
        return status;
    }

    if (!bound_addr)
        return PJ_SUCCESS;

    name_len = sizeof(*bound_addr);
    status = pj_sock_getsockname(fd, bound_addr, &name_len);
    if (status != PJ_SUCCESS) {
        pj_activesock_close(*p_asock);
        return status;
    }

    return PJ_SUCCESS;
}
|
|
|
|
/*
 * Close the active socket.
 *
 * Both directions are marked as shut down, then the ioqueue key is detached
 * from the object while holding the key lock so that only one caller ends
 * up unregistering it. When the key is unregistered, the background read
 * stream (iOS builds) is destroyed as well.
 *
 * Returns PJ_EINVAL when asock is NULL, PJ_SUCCESS otherwise.
 */
PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
{
    pj_bool_t must_unregister = PJ_FALSE;
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
    asock->shutdown = SHUT_RX | SHUT_TX;

    /* Guard against unregistering the same key twice. */
    key = asock->key;
    if (key != NULL) {
        pj_ioqueue_lock_key(key);
        if (asock->key != NULL)
            must_unregister = PJ_TRUE;
        asock->key = NULL;
        pj_ioqueue_unlock_key(key);
    }

    if (!must_unregister)
        return PJ_SUCCESS;

    pj_ioqueue_unregister(key);

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
    activesock_destroy_iphone_os_stream(asock);
#endif

    return PJ_SUCCESS;
}
|
|
|
|
|
|
/*
 * Store an opaque application pointer in the active socket.
 *
 * Returns PJ_EINVAL when asock is NULL, PJ_SUCCESS otherwise.
 */
PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
                                                 void *user_data)
{
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);

    asock->user_data = user_data;

    return PJ_SUCCESS;
}
|
|
|
|
|
|
/*
 * Return the opaque application pointer stored in the active socket,
 * or NULL when asock is NULL.
 */
PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
{
    PJ_ASSERT_RETURN(asock, NULL);

    return asock->user_data;
}
|
|
|
|
|
|
/*
 * Start reading with buffers allocated from the given pool.
 *
 * One buffer of buff_size bytes is allocated per asynchronous operation
 * (async_count of them), then the work is delegated to
 * pj_activesock_start_read2().
 *
 * Returns PJ_EINVAL for a NULL asock/pool or zero buff_size, otherwise the
 * result of pj_activesock_start_read2().
 */
PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
                                             pj_pool_t *pool,
                                             unsigned buff_size,
                                             pj_uint32_t flags)
{
    void **buffers;
    unsigned idx;

    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);

    buffers = (void**) pj_pool_calloc(pool, asock->async_count,
                                      sizeof(void*));

    idx = 0;
    while (idx < asock->async_count) {
        buffers[idx] = pj_pool_alloc(pool, buff_size);
        ++idx;
    }

    return pj_activesock_start_read2(asock, pool, buff_size, buffers, flags);
}
|
|
|
|
|
|
/*
 * Start reading using caller supplied buffers.
 *
 * readbuf must hold async_count buffers of at least buff_size bytes each.
 * One read_op is allocated per buffer and an asynchronous recv is submitted
 * for each of them (PJ_IOQUEUE_ALWAYS_ASYNC is OR-ed into flags).
 *
 * Returns PJ_SUCCESS when every recv is pending, PJ_EINVAL for bad
 * arguments, PJ_EINVALIDOP when a read has already been started, PJ_EBUG if
 * a recv unexpectedly completes immediately, or the failing recv status.
 */
PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
                                               pj_pool_t *pool,
                                               unsigned buff_size,
                                               void *readbuf[],
                                               pj_uint32_t flags)
{
    struct read_op *op;
    pj_status_t status;
    unsigned idx;

    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
    PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);

    asock->read_op = (struct read_op*)
                     pj_pool_calloc(pool, asock->async_count,
                                    sizeof(struct read_op));
    asock->read_type = TYPE_RECV;
    asock->read_flags = flags;

    op = asock->read_op;
    for (idx = 0; idx < asock->async_count; ++idx, ++op) {
        pj_ssize_t want;

        op->pkt = (pj_uint8_t*)readbuf[idx];
        op->max_size = buff_size;
        want = op->max_size;

        status = pj_ioqueue_recv(asock->key, &op->op_key, op->pkt, &want,
                                 PJ_IOQUEUE_ALWAYS_ASYNC | flags);

        /* An always-async recv must never report immediate completion. */
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);

        if (status != PJ_EPENDING)
            return status;
    }

    return PJ_SUCCESS;
}
|
|
|
|
|
|
/*
 * Start recvfrom-style reading with buffers allocated from the given pool.
 *
 * One buffer of buff_size bytes is allocated per asynchronous operation
 * (async_count of them), then the work is delegated to
 * pj_activesock_start_recvfrom2().
 *
 * Returns PJ_EINVAL for a NULL asock/pool or zero buff_size, otherwise the
 * result of pj_activesock_start_recvfrom2().
 */
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
                                                 pj_pool_t *pool,
                                                 unsigned buff_size,
                                                 pj_uint32_t flags)
{
    void **buffers;
    unsigned idx;

    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);

    buffers = (void**) pj_pool_calloc(pool, asock->async_count,
                                      sizeof(void*));

    idx = 0;
    while (idx < asock->async_count) {
        buffers[idx] = pj_pool_alloc(pool, buff_size);
        ++idx;
    }

    return pj_activesock_start_recvfrom2(asock, pool, buff_size,
                                         buffers, flags);
}
|
|
|
|
|
|
/*
 * Start recvfrom-style reading using caller supplied buffers.
 *
 * readbuf must hold async_count buffers of at least buff_size bytes each.
 * One read_op is allocated per buffer and an asynchronous recvfrom is
 * submitted for each of them (PJ_IOQUEUE_ALWAYS_ASYNC is OR-ed into flags).
 *
 * Returns PJ_SUCCESS when every recvfrom is pending, PJ_EINVAL for bad
 * arguments, PJ_EINVALIDOP when a read has already been started, PJ_EBUG if
 * a recvfrom unexpectedly completes immediately, or the failing status.
 */
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
                                                   pj_pool_t *pool,
                                                   unsigned buff_size,
                                                   void *readbuf[],
                                                   pj_uint32_t flags)
{
    unsigned i;
    pj_status_t status;

    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
    /* Same guard as pj_activesock_start_read2(): never overwrite an
     * already allocated read_op array.
     */
    PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);

    asock->read_op = (struct read_op*)
                     pj_pool_calloc(pool, asock->async_count,
                                    sizeof(struct read_op));
    asock->read_type = TYPE_RECV_FROM;
    asock->read_flags = flags;

    for (i=0; i<asock->async_count; ++i) {
        struct read_op *r = &asock->read_op[i];
        pj_ssize_t size_to_read;

        r->pkt = (pj_uint8_t*) readbuf[i];
        size_to_read = r->max_size = buff_size;
        r->src_addr_len = sizeof(r->src_addr);

        status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
                                     &size_to_read,
                                     PJ_IOQUEUE_ALWAYS_ASYNC | flags,
                                     &r->src_addr, &r->src_addr_len);

        /* An always-async recvfrom must never complete immediately. */
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);

        if (status != PJ_EPENDING)
            return status;
    }

    return PJ_SUCCESS;
}
|
|
|
|
|
|
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_ssize_t bytes_read)
|
|
{
|
|
pj_activesock_t *asock;
|
|
struct read_op *r = (struct read_op*)op_key;
|
|
unsigned loop = 0;
|
|
pj_status_t status;
|
|
|
|
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
|
|
|
|
/* Ignore if we've been shutdown */
|
|
if (asock->shutdown & SHUT_RX)
|
|
return;
|
|
|
|
do {
|
|
unsigned flags;
|
|
|
|
if (bytes_read > 0) {
|
|
/*
|
|
* We've got new data.
|
|
*/
|
|
pj_size_t remainder;
|
|
pj_bool_t ret;
|
|
|
|
/* Append this new data to existing data. If socket is stream
|
|
* oriented, user might have left some data in the buffer.
|
|
* Otherwise if socket is datagram there will be nothing in
|
|
* existing packet hence the packet will contain only the new
|
|
* packet.
|
|
*/
|
|
r->size += bytes_read;
|
|
|
|
/* Set default remainder to zero */
|
|
remainder = 0;
|
|
|
|
/* And return value to TRUE */
|
|
ret = PJ_TRUE;
|
|
|
|
/* Notify callback */
|
|
if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
|
|
ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
|
|
PJ_SUCCESS, &remainder);
|
|
PJ_ASSERT_ON_FAIL(
|
|
!asock->stream_oriented || remainder <= r->size, {
|
|
PJ_LOG(2, ("",
|
|
"App bug! Invalid remainder length from "
|
|
"activesock on_data_read()."));
|
|
remainder = 0;
|
|
});
|
|
} else if (asock->read_type == TYPE_RECV_FROM &&
|
|
asock->cb.on_data_recvfrom)
|
|
{
|
|
ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
|
|
&r->src_addr,
|
|
r->src_addr_len,
|
|
PJ_SUCCESS);
|
|
}
|
|
|
|
/* If callback returns false, we have been destroyed! */
|
|
if (!ret)
|
|
return;
|
|
|
|
/* Only stream oriented socket may leave data in the packet */
|
|
if (asock->stream_oriented) {
|
|
r->size = remainder;
|
|
} else {
|
|
r->size = 0;
|
|
}
|
|
|
|
} else if (bytes_read <= 0 &&
|
|
-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
|
|
-bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
|
|
(asock->stream_oriented ||
|
|
-bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
|
|
{
|
|
pj_size_t remainder;
|
|
pj_bool_t ret;
|
|
|
|
if (bytes_read == 0) {
|
|
/* For stream/connection oriented socket, this means the
|
|
* connection has been closed. For datagram sockets, it means
|
|
* we've received datagram with zero length.
|
|
*/
|
|
if (asock->stream_oriented)
|
|
status = PJ_EEOF;
|
|
else
|
|
status = PJ_SUCCESS;
|
|
} else {
|
|
/* This means we've got an error. If this is stream/connection
|
|
* oriented, it means connection has been closed. For datagram
|
|
* sockets, it means we've got some error (e.g. EWOULDBLOCK).
|
|
*/
|
|
status = (pj_status_t)-bytes_read;
|
|
}
|
|
|
|
/* Set default remainder to zero */
|
|
remainder = 0;
|
|
|
|
/* And return value to TRUE */
|
|
ret = PJ_TRUE;
|
|
|
|
/* Notify callback */
|
|
if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
|
|
/* For connection oriented socket, we still need to report
|
|
* the remainder data (if any) to the user to let user do
|
|
* processing with the remainder data before it closes the
|
|
* connection.
|
|
* If there is no remainder data, set the packet to NULL.
|
|
*/
|
|
|
|
/* Shouldn't set the packet to NULL, as there may be active
|
|
* socket user, such as SSL socket, that needs to have access
|
|
* to the read buffer packet.
|
|
*/
|
|
//ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
|
|
// r->size, status, &remainder);
|
|
ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
|
|
status, &remainder);
|
|
PJ_ASSERT_ON_FAIL(
|
|
!asock->stream_oriented || remainder <= r->size, {
|
|
PJ_LOG(2, ("",
|
|
"App bug! Invalid remainder length from "
|
|
"activesock on_data_read()."));
|
|
remainder = 0;
|
|
});
|
|
|
|
} else if (asock->read_type == TYPE_RECV_FROM &&
|
|
asock->cb.on_data_recvfrom)
|
|
{
|
|
/* This would always be datagram oriented hence there's
|
|
* nothing in the packet. We can't be sure if there will be
|
|
* anything useful in the source_addr, so just put NULL
|
|
* there too.
|
|
*/
|
|
/* In some scenarios, status may be PJ_SUCCESS. The upper
|
|
* layer application may not expect the callback to be called
|
|
* with successful status and NULL data, so lets not call the
|
|
* callback if the status is PJ_SUCCESS.
|
|
*/
|
|
if (status != PJ_SUCCESS ) {
|
|
ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
|
|
NULL, 0, status);
|
|
}
|
|
}
|
|
|
|
/* If callback returns false, we have been destroyed! */
|
|
if (!ret)
|
|
return;
|
|
|
|
/* Also stop further read if we've been shutdown */
|
|
if (asock->shutdown & SHUT_RX)
|
|
return;
|
|
|
|
/* Only stream oriented socket may leave data in the packet */
|
|
if (asock->stream_oriented) {
|
|
r->size = remainder;
|
|
} else {
|
|
r->size = 0;
|
|
}
|
|
}
|
|
|
|
/* Read next data. We limit ourselves to processing max_loop immediate
|
|
* data, so when the loop counter has exceeded this value, force the
|
|
* read()/recvfrom() to return pending operation to allow the program
|
|
* to do other jobs.
|
|
*/
|
|
bytes_read = r->max_size - r->size;
|
|
flags = asock->read_flags;
|
|
if (++loop >= asock->max_loop)
|
|
flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
|
|
|
|
if (asock->read_type == TYPE_RECV) {
|
|
status = pj_ioqueue_recv(key, op_key, r->pkt + r->size,
|
|
&bytes_read, flags);
|
|
} else {
|
|
r->src_addr_len = sizeof(r->src_addr);
|
|
status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
|
|
&bytes_read, flags,
|
|
&r->src_addr, &r->src_addr_len);
|
|
}
|
|
|
|
if (status == PJ_SUCCESS) {
|
|
/* Immediate data */
|
|
;
|
|
} else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
|
|
/* Error */
|
|
bytes_read = -status;
|
|
} else {
|
|
break;
|
|
}
|
|
} while (1);
|
|
|
|
}
|
|
|
|
|
|
static pj_status_t send_remaining(pj_activesock_t *asock,
|
|
pj_ioqueue_op_key_t *send_key)
|
|
{
|
|
struct send_data *sd = (struct send_data*)send_key->activesock_data;
|
|
pj_status_t status;
|
|
|
|
do {
|
|
pj_ssize_t size;
|
|
|
|
size = sd->len - sd->sent;
|
|
status = pj_ioqueue_send(asock->key, send_key,
|
|
sd->data+sd->sent, &size, sd->flags);
|
|
if (status != PJ_SUCCESS) {
|
|
/* Pending or error */
|
|
break;
|
|
}
|
|
|
|
sd->sent += size;
|
|
if (sd->sent == sd->len) {
|
|
/* The whole data has been sent. */
|
|
return PJ_SUCCESS;
|
|
}
|
|
|
|
} while (sd->sent < sd->len);
|
|
|
|
return status;
|
|
}
|
|
|
|
|
|
/*
 * Send data on the active socket.
 *
 * send_key  Operation key; its activesock_data member is reset here.
 * data      Buffer to send.
 * size      In: number of bytes to send. Out: as updated by the ioqueue; on
 *           a successful whole_data send it equals the input value.
 * flags     Passed to pj_ioqueue_send().
 *
 * Without whole_data this is a plain pj_ioqueue_send(). With whole_data, a
 * partial immediate send is followed by further sends of the remainder,
 * tracked in the socket's send_data record.
 *
 * Returns PJ_EINVAL for NULL arguments, PJ_EINVALIDOP when the send
 * direction is shut down, otherwise the status of the send (PJ_SUCCESS,
 * pending, or an error).
 */
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
                                        pj_ioqueue_op_key_t *send_key,
                                        const void *data,
                                        pj_ssize_t *size,
                                        unsigned flags)
{
    pj_ssize_t requested;
    pj_status_t status;

    PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);

    if (asock->shutdown & SHUT_TX)
        return PJ_EINVALIDOP;

    send_key->activesock_data = NULL;

    if (!asock->whole_data)
        return pj_ioqueue_send(asock->key, send_key, data, size, flags);

    requested = *size;

    status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
    if (status != PJ_SUCCESS) {
        /* Pending or error */
        return status;
    }

    if (*size == requested) {
        /* Everything went out in one go. */
        return PJ_SUCCESS;
    }

    /* Only part of the buffer was sent: record the progress ... */
    asock->send_data.data = (pj_uint8_t*)data;
    asock->send_data.len = requested;
    asock->send_data.sent = *size;
    asock->send_data.flags = flags;
    send_key->activesock_data = &asock->send_data;

    /* ... and keep going with the rest. */
    status = send_remaining(asock, send_key);
    if (status == PJ_SUCCESS)
        *size = requested;

    return status;
}
|
|
|
|
|
|
/*
 * Send data to a specific address.
 *
 * Returns PJ_EINVAL when any pointer argument is NULL or addr_len is zero,
 * PJ_EINVALIDOP when the send direction is shut down, otherwise the result
 * of pj_ioqueue_sendto().
 */
PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
                                          pj_ioqueue_op_key_t *send_key,
                                          const void *data,
                                          pj_ssize_t *size,
                                          unsigned flags,
                                          const pj_sockaddr_t *addr,
                                          int addr_len)
{
    PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len,
                     PJ_EINVAL);

    if ((asock->shutdown & SHUT_TX) != 0)
        return PJ_EINVALIDOP;

    return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
                             addr, addr_len);
}
|
|
|
|
|
|
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_ssize_t bytes_sent)
|
|
{
|
|
pj_activesock_t *asock;
|
|
|
|
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
|
|
|
|
/* Ignore if we've been shutdown. This may cause data to be partially
|
|
* sent even when 'wholedata' was requested if the OS only sent partial
|
|
* buffer.
|
|
*/
|
|
if (asock->shutdown & SHUT_TX)
|
|
return;
|
|
|
|
if (bytes_sent > 0 && op_key->activesock_data) {
|
|
/* whole_data is requested. Make sure we send all the data */
|
|
struct send_data *sd = (struct send_data*)op_key->activesock_data;
|
|
|
|
sd->sent += bytes_sent;
|
|
if (sd->sent == sd->len) {
|
|
/* all has been sent */
|
|
bytes_sent = sd->sent;
|
|
op_key->activesock_data = NULL;
|
|
} else {
|
|
/* send remaining data */
|
|
pj_status_t status;
|
|
|
|
status = send_remaining(asock, op_key);
|
|
if (status == PJ_EPENDING)
|
|
return;
|
|
else if (status == PJ_SUCCESS)
|
|
bytes_sent = sd->sent;
|
|
else
|
|
bytes_sent = -status;
|
|
|
|
op_key->activesock_data = NULL;
|
|
}
|
|
}
|
|
|
|
if (asock->cb.on_data_sent) {
|
|
pj_bool_t ret;
|
|
|
|
ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
|
|
|
|
/* If callback returns false, we have been destroyed! */
|
|
if (!ret)
|
|
return;
|
|
}
|
|
}
|
|
|
|
#if PJ_HAS_TCP
|
|
/*
 * Start accepting incoming connections.
 *
 * pool is used to allocate one accept_op per asynchronous operation
 * (async_count of them); an accept is then submitted for each. A connection
 * that is accepted immediately during start-up is closed silently and the
 * accept is resubmitted.
 *
 * Returns PJ_SUCCESS when every accept is pending, PJ_EINVAL when asock or
 * pool is NULL, PJ_EINVALIDOP when accept was already started or the socket
 * is shut down, or the failing pj_ioqueue_accept() status.
 */
PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
                                               pj_pool_t *pool)
{
    unsigned i;

    /* pool is handed to pj_pool_calloc() below, so validate it as well. */
    PJ_ASSERT_RETURN(asock && pool, PJ_EINVAL);
    PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);

    /* Ignore if we've been shutdown */
    if (asock->shutdown)
        return PJ_EINVALIDOP;

    asock->accept_op = (struct accept_op*)
                       pj_pool_calloc(pool, asock->async_count,
                                      sizeof(struct accept_op));
    for (i=0; i<asock->async_count; ++i) {
        struct accept_op *a = &asock->accept_op[i];
        pj_status_t status;

        do {
            a->new_sock = PJ_INVALID_SOCKET;
            a->rem_addr_len = sizeof(a->rem_addr);

            status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
                                       NULL, &a->rem_addr, &a->rem_addr_len);
            if (status == PJ_SUCCESS) {
                /* We've got immediate connection. Not sure if it's a good
                 * idea to call the callback now (probably application will
                 * not be prepared to process it), so lets just silently
                 * close the socket.
                 */
                pj_sock_close(a->new_sock);
            }
        } while (status == PJ_SUCCESS);

        if (status != PJ_EPENDING) {
            return status;
        }
    }

    return PJ_SUCCESS;
}
|
|
|
|
|
|
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_sock_t new_sock,
|
|
pj_status_t status)
|
|
{
|
|
pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
|
|
struct accept_op *accept_op = (struct accept_op*) op_key;
|
|
|
|
PJ_UNUSED_ARG(new_sock);
|
|
|
|
/* Ignore if we've been shutdown */
|
|
if (asock->shutdown)
|
|
return;
|
|
|
|
do {
|
|
if (status == asock->last_err && status != PJ_SUCCESS) {
|
|
asock->err_counter++;
|
|
if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
|
|
PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
|
|
" operation, stopping further ioqueue accepts.",
|
|
asock->err_counter, asock->last_err));
|
|
|
|
if ((status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) &&
|
|
(asock->cb.on_accept_complete2))
|
|
{
|
|
(*asock->cb.on_accept_complete2)(asock,
|
|
accept_op->new_sock,
|
|
&accept_op->rem_addr,
|
|
accept_op->rem_addr_len,
|
|
PJ_ESOCKETSTOP);
|
|
}
|
|
return;
|
|
}
|
|
} else {
|
|
asock->err_counter = 0;
|
|
asock->last_err = status;
|
|
}
|
|
|
|
if (status==PJ_SUCCESS && (asock->cb.on_accept_complete2 ||
|
|
asock->cb.on_accept_complete)) {
|
|
pj_bool_t ret;
|
|
|
|
/* Notify callback */
|
|
if (asock->cb.on_accept_complete2) {
|
|
ret = (*asock->cb.on_accept_complete2)(asock,
|
|
accept_op->new_sock,
|
|
&accept_op->rem_addr,
|
|
accept_op->rem_addr_len,
|
|
status);
|
|
} else {
|
|
ret = (*asock->cb.on_accept_complete)(asock,
|
|
accept_op->new_sock,
|
|
&accept_op->rem_addr,
|
|
accept_op->rem_addr_len);
|
|
}
|
|
|
|
/* If callback returns false, we have been destroyed! */
|
|
if (!ret)
|
|
return;
|
|
|
|
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
|
|
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
|
|
activesock_create_iphone_os_stream(asock);
|
|
#endif
|
|
} else if (status==PJ_SUCCESS) {
|
|
/* Application doesn't handle the new socket, we need to
|
|
* close it to avoid resource leak.
|
|
*/
|
|
pj_sock_close(accept_op->new_sock);
|
|
}
|
|
|
|
/* Don't start another accept() if we've been shutdown */
|
|
if (asock->shutdown)
|
|
return;
|
|
|
|
/* Prepare next accept() */
|
|
accept_op->new_sock = PJ_INVALID_SOCKET;
|
|
accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
|
|
|
|
status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
|
|
NULL, &accept_op->rem_addr,
|
|
&accept_op->rem_addr_len);
|
|
|
|
} while (status != PJ_EPENDING && status != PJ_ECANCELLED);
|
|
}
|
|
|
|
|
|
/*
 * Start connecting the socket to a remote address.
 *
 * pool is unused. remaddr and addr_len are handed to pj_ioqueue_connect().
 *
 * Returns PJ_EINVAL when asock is NULL, PJ_EINVALIDOP when the socket has
 * been shut down, otherwise the result of pj_ioqueue_connect().
 */
PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
                                                 pj_pool_t *pool,
                                                 const pj_sockaddr_t *remaddr,
                                                 int addr_len)
{
    PJ_UNUSED_ARG(pool);

    /* Validate asock before dereferencing it, as the other entry points
     * in this file do.
     */
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);

    if (asock->shutdown)
        return PJ_EINVALIDOP;

    return pj_ioqueue_connect(asock->key, remaddr, addr_len);
}
|
|
|
|
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key,
|
|
pj_status_t status)
|
|
{
|
|
pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
|
|
|
|
/* Ignore if we've been shutdown */
|
|
if (asock->shutdown)
|
|
return;
|
|
|
|
if (asock->cb.on_connect_complete) {
|
|
pj_bool_t ret;
|
|
|
|
ret = (*asock->cb.on_connect_complete)(asock, status);
|
|
|
|
if (!ret) {
|
|
/* We've been destroyed */
|
|
return;
|
|
}
|
|
|
|
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
|
|
PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
|
|
activesock_create_iphone_os_stream(asock);
|
|
#endif
|
|
|
|
}
|
|
}
|
|
#endif /* PJ_HAS_TCP */
|
|
|