1016 lines
30 KiB
C
1016 lines
30 KiB
C
/* $Id$ */
|
|
/*
|
|
* Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software
|
|
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
*/
|
|
|
|
/*
|
|
* ioqueue_common_abs.c
|
|
*
|
|
* This contains common functionalities to emulate proactor pattern with
|
|
* various event dispatching mechanisms (e.g. select, epoll).
|
|
*
|
|
* This file will be included by the appropriate ioqueue implementation.
|
|
* This file is NOT supposed to be compiled as stand-alone source.
|
|
*/
|
|
|
|
static void ioqueue_init( pj_ioqueue_t *ioqueue )
|
|
{
|
|
ioqueue->lock = NULL;
|
|
ioqueue->auto_delete_lock = 0;
|
|
}
|
|
|
|
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
|
|
{
|
|
if (ioqueue->auto_delete_lock && ioqueue->lock ) {
|
|
pj_lock_release(ioqueue->lock);
|
|
return pj_lock_destroy(ioqueue->lock);
|
|
} else
|
|
return PJ_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
* pj_ioqueue_set_lock()
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
|
|
pj_lock_t *lock,
|
|
pj_bool_t auto_delete )
|
|
{
|
|
PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
|
|
|
|
if (ioqueue->auto_delete_lock && ioqueue->lock) {
|
|
pj_lock_destroy(ioqueue->lock);
|
|
}
|
|
|
|
ioqueue->lock = lock;
|
|
ioqueue->auto_delete_lock = auto_delete;
|
|
|
|
return PJ_SUCCESS;
|
|
}
|
|
|
|
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
|
|
pj_ioqueue_t *ioqueue,
|
|
pj_ioqueue_key_t *key,
|
|
pj_sock_t sock,
|
|
void *user_data,
|
|
const pj_ioqueue_callback *cb)
|
|
{
|
|
pj_status_t rc;
|
|
int optlen;
|
|
|
|
key->ioqueue = ioqueue;
|
|
key->fd = sock;
|
|
key->user_data = user_data;
|
|
pj_list_init(&key->read_list);
|
|
pj_list_init(&key->write_list);
|
|
#if PJ_HAS_TCP
|
|
pj_list_init(&key->accept_list);
|
|
key->connecting = 0;
|
|
#endif
|
|
|
|
/* Save callback. */
|
|
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
|
|
|
|
#if PJ_IOQUEUE_HAS_SAFE_UNREG
|
|
/* Set initial reference count to 1 */
|
|
pj_assert(key->ref_count == 0);
|
|
++key->ref_count;
|
|
|
|
key->closing = 0;
|
|
#endif
|
|
|
|
/* Get socket type. When socket type is datagram, some optimization
|
|
* will be performed during send to allow parallel send operations.
|
|
*/
|
|
optlen = sizeof(key->fd_type);
|
|
rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
|
|
&key->fd_type, &optlen);
|
|
if (rc != PJ_SUCCESS)
|
|
key->fd_type = PJ_SOCK_STREAM;
|
|
|
|
/* Create mutex for the key. */
|
|
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
|
|
rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
|
|
#endif
|
|
|
|
return rc;
|
|
}
|
|
|
|
/*
|
|
* pj_ioqueue_get_user_data()
|
|
*
|
|
* Obtain value associated with a key.
|
|
*/
|
|
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
|
|
{
|
|
PJ_ASSERT_RETURN(key != NULL, NULL);
|
|
return key->user_data;
|
|
}
|
|
|
|
/*
|
|
* pj_ioqueue_set_user_data()
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
|
|
void *user_data,
|
|
void **old_data)
|
|
{
|
|
PJ_ASSERT_RETURN(key, PJ_EINVAL);
|
|
|
|
if (old_data)
|
|
*old_data = key->user_data;
|
|
key->user_data = user_data;
|
|
|
|
return PJ_SUCCESS;
|
|
}
|
|
|
|
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
|
|
{
|
|
return !pj_list_empty(&key->write_list);
|
|
}
|
|
|
|
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
|
|
{
|
|
return !pj_list_empty(&key->read_list);
|
|
}
|
|
|
|
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
|
|
{
|
|
#if PJ_HAS_TCP
|
|
return !pj_list_empty(&key->accept_list);
|
|
#else
|
|
return 0;
|
|
#endif
|
|
}
|
|
|
|
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
|
|
{
|
|
return key->connecting;
|
|
}
|
|
|
|
|
|
#if PJ_IOQUEUE_HAS_SAFE_UNREG
|
|
# define IS_CLOSING(key) (key->closing)
|
|
#else
|
|
# define IS_CLOSING(key) (0)
|
|
#endif
|
|
|
|
|
|
/*
|
|
* ioqueue_dispatch_event()
|
|
*
|
|
* Report occurence of an event in the key to be processed by the
|
|
* framework.
|
|
*/
|
|
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
|
|
{
|
|
/* Lock the key. */
|
|
pj_mutex_lock(h->mutex);
|
|
|
|
if (h->closing) {
|
|
pj_mutex_unlock(h->mutex);
|
|
return;
|
|
}
|
|
|
|
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
|
|
if (h->connecting) {
|
|
/* Completion of connect() operation */
|
|
pj_ssize_t bytes_transfered;
|
|
|
|
/* Clear operation. */
|
|
h->connecting = 0;
|
|
|
|
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
|
|
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
|
|
|
|
|
|
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
|
|
/* from connect(2):
|
|
* On Linux, use getsockopt to read the SO_ERROR option at
|
|
* level SOL_SOCKET to determine whether connect() completed
|
|
* successfully (if SO_ERROR is zero).
|
|
*/
|
|
{
|
|
int value;
|
|
socklen_t vallen = sizeof(value);
|
|
int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
|
|
&value, &vallen);
|
|
if (gs_rc != 0) {
|
|
/* Argh!! What to do now???
|
|
* Just indicate that the socket is connected. The
|
|
* application will get error as soon as it tries to use
|
|
* the socket to send/receive.
|
|
*/
|
|
bytes_transfered = 0;
|
|
} else {
|
|
bytes_transfered = value;
|
|
}
|
|
}
|
|
#elif defined(PJ_WIN32) && PJ_WIN32!=0
|
|
bytes_transfered = 0; /* success */
|
|
#else
|
|
/* Excellent information in D.J. Bernstein page:
|
|
* http://cr.yp.to/docs/connect.html
|
|
*
|
|
* Seems like the most portable way of detecting connect()
|
|
* failure is to call getpeername(). If socket is connected,
|
|
* getpeername() will return 0. If the socket is not connected,
|
|
* it will return ENOTCONN, and read(fd, &ch, 1) will produce
|
|
* the right errno through error slippage. This is a combination
|
|
* of suggestions from Douglas C. Schmidt and Ken Keys.
|
|
*/
|
|
{
|
|
int gp_rc;
|
|
struct sockaddr_in addr;
|
|
socklen_t addrlen = sizeof(addr);
|
|
|
|
gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
|
|
bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
|
|
}
|
|
#endif
|
|
|
|
/* Unlock; from this point we don't need to hold key's mutex. */
|
|
pj_mutex_unlock(h->mutex);
|
|
|
|
/* Call callback. */
|
|
if (h->cb.on_connect_complete && !IS_CLOSING(h))
|
|
(*h->cb.on_connect_complete)(h, bytes_transfered);
|
|
|
|
/* Done. */
|
|
|
|
} else
|
|
#endif /* PJ_HAS_TCP */
|
|
if (key_has_pending_write(h)) {
|
|
/* Socket is writable. */
|
|
struct write_operation *write_op;
|
|
pj_ssize_t sent;
|
|
pj_status_t send_rc;
|
|
|
|
/* Get the first in the queue. */
|
|
write_op = h->write_list.next;
|
|
|
|
/* For datagrams, we can remove the write_op from the list
|
|
* so that send() can work in parallel.
|
|
*/
|
|
if (h->fd_type == PJ_SOCK_DGRAM) {
|
|
pj_list_erase(write_op);
|
|
|
|
if (pj_list_empty(&h->write_list))
|
|
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
|
|
|
|
}
|
|
|
|
/* Send the data.
|
|
* Unfortunately we must do this while holding key's mutex, thus
|
|
* preventing parallel write on a single key.. :-((
|
|
*/
|
|
sent = write_op->size - write_op->written;
|
|
if (write_op->op == PJ_IOQUEUE_OP_SEND) {
|
|
write_op->op = 0;
|
|
send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
|
|
&sent, write_op->flags);
|
|
} else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
|
|
write_op->op = 0;
|
|
send_rc = pj_sock_sendto(h->fd,
|
|
write_op->buf+write_op->written,
|
|
&sent, write_op->flags,
|
|
&write_op->rmt_addr,
|
|
write_op->rmt_addrlen);
|
|
} else {
|
|
pj_assert(!"Invalid operation type!");
|
|
write_op->op = 0;
|
|
send_rc = PJ_EBUG;
|
|
}
|
|
|
|
if (send_rc == PJ_SUCCESS) {
|
|
write_op->written += sent;
|
|
} else {
|
|
pj_assert(send_rc > 0);
|
|
write_op->written = -send_rc;
|
|
}
|
|
|
|
/* Are we finished with this buffer? */
|
|
if (send_rc!=PJ_SUCCESS ||
|
|
write_op->written == (pj_ssize_t)write_op->size ||
|
|
h->fd_type == PJ_SOCK_DGRAM)
|
|
{
|
|
if (h->fd_type != PJ_SOCK_DGRAM) {
|
|
/* Write completion of the whole stream. */
|
|
pj_list_erase(write_op);
|
|
write_op->op = 0;
|
|
|
|
/* Clear operation if there's no more data to send. */
|
|
if (pj_list_empty(&h->write_list))
|
|
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
|
|
|
|
}
|
|
|
|
/* No need to hold mutex anymore */
|
|
pj_mutex_unlock(h->mutex);
|
|
|
|
/* Call callback. */
|
|
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
|
|
(*h->cb.on_write_complete)(h,
|
|
(pj_ioqueue_op_key_t*)write_op,
|
|
write_op->written);
|
|
}
|
|
|
|
} else {
|
|
pj_mutex_unlock(h->mutex);
|
|
}
|
|
|
|
/* Done. */
|
|
} else {
|
|
/*
|
|
* This is normal; execution may fall here when multiple threads
|
|
* are signalled for the same event, but only one thread eventually
|
|
* able to process the event.
|
|
*/
|
|
pj_mutex_unlock(h->mutex);
|
|
}
|
|
}
|
|
|
|
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
|
|
{
|
|
pj_status_t rc;
|
|
|
|
/* Lock the key. */
|
|
pj_mutex_lock(h->mutex);
|
|
|
|
if (h->closing) {
|
|
pj_mutex_unlock(h->mutex);
|
|
return;
|
|
}
|
|
|
|
# if PJ_HAS_TCP
|
|
if (!pj_list_empty(&h->accept_list)) {
|
|
|
|
struct accept_operation *accept_op;
|
|
|
|
/* Get one accept operation from the list. */
|
|
accept_op = h->accept_list.next;
|
|
pj_list_erase(accept_op);
|
|
accept_op->op = 0;
|
|
|
|
/* Clear bit in fdset if there is no more pending accept */
|
|
if (pj_list_empty(&h->accept_list))
|
|
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
|
|
|
|
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
|
|
accept_op->rmt_addr, accept_op->addrlen);
|
|
if (rc==PJ_SUCCESS && accept_op->local_addr) {
|
|
rc = pj_sock_getsockname(*accept_op->accept_fd,
|
|
accept_op->local_addr,
|
|
accept_op->addrlen);
|
|
}
|
|
|
|
/* Unlock; from this point we don't need to hold key's mutex. */
|
|
pj_mutex_unlock(h->mutex);
|
|
|
|
/* Call callback. */
|
|
if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
|
|
(*h->cb.on_accept_complete)(h,
|
|
(pj_ioqueue_op_key_t*)accept_op,
|
|
*accept_op->accept_fd, rc);
|
|
}
|
|
|
|
}
|
|
else
|
|
# endif
|
|
if (key_has_pending_read(h)) {
|
|
struct read_operation *read_op;
|
|
pj_ssize_t bytes_read;
|
|
|
|
/* Get one pending read operation from the list. */
|
|
read_op = h->read_list.next;
|
|
pj_list_erase(read_op);
|
|
|
|
/* Clear fdset if there is no pending read. */
|
|
if (pj_list_empty(&h->read_list))
|
|
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
|
|
|
|
bytes_read = read_op->size;
|
|
|
|
if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
|
|
read_op->op = 0;
|
|
rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
|
|
read_op->rmt_addr,
|
|
read_op->rmt_addrlen);
|
|
} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
|
|
read_op->op = 0;
|
|
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
|
|
} else {
|
|
pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
|
|
read_op->op = 0;
|
|
/*
|
|
* User has specified pj_ioqueue_read().
|
|
* On Win32, we should do ReadFile(). But because we got
|
|
* here because of select() anyway, user must have put a
|
|
* socket descriptor on h->fd, which in this case we can
|
|
* just call pj_sock_recv() instead of ReadFile().
|
|
* On Unix, user may put a file in h->fd, so we'll have
|
|
* to call read() here.
|
|
* This may not compile on systems which doesn't have
|
|
* read(). That's why we only specify PJ_LINUX here so
|
|
* that error is easier to catch.
|
|
*/
|
|
# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
|
|
defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
|
|
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
|
|
//rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
|
|
// &bytes_read, NULL);
|
|
# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
|
|
bytes_read = read(h->fd, read_op->buf, bytes_read);
|
|
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
|
|
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
|
|
bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
|
|
rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
|
|
# else
|
|
# error "Implement read() for this platform!"
|
|
# endif
|
|
}
|
|
|
|
if (rc != PJ_SUCCESS) {
|
|
# if defined(PJ_WIN32) && PJ_WIN32 != 0
|
|
/* On Win32, for UDP, WSAECONNRESET on the receive side
|
|
* indicates that previous sending has triggered ICMP Port
|
|
* Unreachable message.
|
|
* But we wouldn't know at this point which one of previous
|
|
* key that has triggered the error, since UDP socket can
|
|
* be shared!
|
|
* So we'll just ignore it!
|
|
*/
|
|
|
|
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
|
|
//PJ_LOG(4,(THIS_FILE,
|
|
// "Ignored ICMP port unreach. on key=%p", h));
|
|
}
|
|
# endif
|
|
|
|
/* In any case we would report this to caller. */
|
|
bytes_read = -rc;
|
|
}
|
|
|
|
/* Unlock; from this point we don't need to hold key's mutex. */
|
|
pj_mutex_unlock(h->mutex);
|
|
|
|
/* Call callback. */
|
|
if (h->cb.on_read_complete && !IS_CLOSING(h)) {
|
|
(*h->cb.on_read_complete)(h,
|
|
(pj_ioqueue_op_key_t*)read_op,
|
|
bytes_read);
|
|
}
|
|
|
|
} else {
|
|
/*
|
|
* This is normal; execution may fall here when multiple threads
|
|
* are signalled for the same event, but only one thread eventually
|
|
* able to process the event.
|
|
*/
|
|
pj_mutex_unlock(h->mutex);
|
|
}
|
|
}
|
|
|
|
|
|
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
|
|
pj_ioqueue_key_t *h )
|
|
{
|
|
pj_mutex_lock(h->mutex);
|
|
|
|
if (!h->connecting) {
|
|
/* It is possible that more than one thread was woken up, thus
|
|
* the remaining thread will see h->connecting as zero because
|
|
* it has been processed by other thread.
|
|
*/
|
|
pj_mutex_unlock(h->mutex);
|
|
return;
|
|
}
|
|
|
|
if (h->closing) {
|
|
pj_mutex_unlock(h->mutex);
|
|
return;
|
|
}
|
|
|
|
/* Clear operation. */
|
|
h->connecting = 0;
|
|
|
|
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
|
|
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
|
|
|
|
pj_mutex_unlock(h->mutex);
|
|
|
|
/* Call callback. */
|
|
if (h->cb.on_connect_complete && !IS_CLOSING(h))
|
|
(*h->cb.on_connect_complete)(h, -1);
|
|
}
|
|
|
|
/*
|
|
* pj_ioqueue_recv()
|
|
*
|
|
* Start asynchronous recv() from the socket.
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
void *buffer,
|
|
pj_ssize_t *length,
|
|
unsigned flags )
|
|
{
|
|
struct read_operation *read_op;
|
|
|
|
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
|
|
PJ_CHECK_STACK();
|
|
|
|
read_op = (struct read_operation*)op_key;
|
|
read_op->op = 0;
|
|
|
|
/* Check if key is closing. */
|
|
if (key->closing)
|
|
return PJ_ECANCELLED;
|
|
|
|
/* Try to see if there's data immediately available.
|
|
*/
|
|
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
|
|
pj_status_t status;
|
|
pj_ssize_t size;
|
|
|
|
size = *length;
|
|
status = pj_sock_recv(key->fd, buffer, &size, flags);
|
|
if (status == PJ_SUCCESS) {
|
|
/* Yes! Data is available! */
|
|
*length = size;
|
|
return PJ_SUCCESS;
|
|
} else {
|
|
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
|
|
* the error to caller.
|
|
*/
|
|
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
|
|
return status;
|
|
}
|
|
}
|
|
|
|
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
|
|
|
|
/*
|
|
* No data is immediately available.
|
|
* Must schedule asynchronous operation to the ioqueue.
|
|
*/
|
|
read_op->op = PJ_IOQUEUE_OP_RECV;
|
|
read_op->buf = buffer;
|
|
read_op->size = *length;
|
|
read_op->flags = flags;
|
|
|
|
pj_mutex_lock(key->mutex);
|
|
pj_list_insert_before(&key->read_list, read_op);
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
return PJ_EPENDING;
|
|
}
|
|
|
|
/*
|
|
* pj_ioqueue_recvfrom()
|
|
*
|
|
* Start asynchronous recvfrom() from the socket.
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
void *buffer,
|
|
pj_ssize_t *length,
|
|
unsigned flags,
|
|
pj_sockaddr_t *addr,
|
|
int *addrlen)
|
|
{
|
|
struct read_operation *read_op;
|
|
|
|
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
|
|
PJ_CHECK_STACK();
|
|
|
|
/* Check if key is closing. */
|
|
if (key->closing)
|
|
return PJ_ECANCELLED;
|
|
|
|
read_op = (struct read_operation*)op_key;
|
|
read_op->op = 0;
|
|
|
|
/* Try to see if there's data immediately available.
|
|
*/
|
|
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
|
|
pj_status_t status;
|
|
pj_ssize_t size;
|
|
|
|
size = *length;
|
|
status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
|
|
addr, addrlen);
|
|
if (status == PJ_SUCCESS) {
|
|
/* Yes! Data is available! */
|
|
*length = size;
|
|
return PJ_SUCCESS;
|
|
} else {
|
|
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
|
|
* the error to caller.
|
|
*/
|
|
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
|
|
return status;
|
|
}
|
|
}
|
|
|
|
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
|
|
|
|
/*
|
|
* No data is immediately available.
|
|
* Must schedule asynchronous operation to the ioqueue.
|
|
*/
|
|
read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
|
|
read_op->buf = buffer;
|
|
read_op->size = *length;
|
|
read_op->flags = flags;
|
|
read_op->rmt_addr = addr;
|
|
read_op->rmt_addrlen = addrlen;
|
|
|
|
pj_mutex_lock(key->mutex);
|
|
pj_list_insert_before(&key->read_list, read_op);
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
return PJ_EPENDING;
|
|
}
|
|
|
|
/*
|
|
* pj_ioqueue_send()
|
|
*
|
|
* Start asynchronous send() to the descriptor.
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
const void *data,
|
|
pj_ssize_t *length,
|
|
unsigned flags)
|
|
{
|
|
struct write_operation *write_op;
|
|
pj_status_t status;
|
|
pj_ssize_t sent;
|
|
|
|
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
|
|
PJ_CHECK_STACK();
|
|
|
|
/* Check if key is closing. */
|
|
if (key->closing)
|
|
return PJ_ECANCELLED;
|
|
|
|
write_op = (struct write_operation*)op_key;
|
|
write_op->op = 0;
|
|
|
|
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
|
|
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
|
|
|
|
/* Fast track:
|
|
* Try to send data immediately, only if there's no pending write!
|
|
* Note:
|
|
* We are speculating that the list is empty here without properly
|
|
* acquiring ioqueue's mutex first. This is intentional, to maximize
|
|
* performance via parallelism.
|
|
*
|
|
* This should be safe, because:
|
|
* - by convention, we require caller to make sure that the
|
|
* key is not unregistered while other threads are invoking
|
|
* an operation on the same key.
|
|
* - pj_list_empty() is safe to be invoked by multiple threads,
|
|
* even when other threads are modifying the list.
|
|
*/
|
|
if (pj_list_empty(&key->write_list)) {
|
|
/*
|
|
* See if data can be sent immediately.
|
|
*/
|
|
sent = *length;
|
|
status = pj_sock_send(key->fd, data, &sent, flags);
|
|
if (status == PJ_SUCCESS) {
|
|
/* Success! */
|
|
*length = sent;
|
|
return PJ_SUCCESS;
|
|
} else {
|
|
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
|
|
* the error to caller.
|
|
*/
|
|
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
|
|
return status;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Schedule asynchronous send.
|
|
*/
|
|
write_op->op = PJ_IOQUEUE_OP_SEND;
|
|
write_op->buf = (void*)data;
|
|
write_op->size = *length;
|
|
write_op->written = 0;
|
|
write_op->flags = flags;
|
|
|
|
pj_mutex_lock(key->mutex);
|
|
pj_list_insert_before(&key->write_list, write_op);
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
return PJ_EPENDING;
|
|
}
|
|
|
|
|
|
/*
|
|
* pj_ioqueue_sendto()
|
|
*
|
|
* Start asynchronous write() to the descriptor.
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
const void *data,
|
|
pj_ssize_t *length,
|
|
pj_uint32_t flags,
|
|
const pj_sockaddr_t *addr,
|
|
int addrlen)
|
|
{
|
|
struct write_operation *write_op;
|
|
pj_status_t status;
|
|
pj_ssize_t sent;
|
|
|
|
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
|
|
PJ_CHECK_STACK();
|
|
|
|
/* Check if key is closing. */
|
|
if (key->closing)
|
|
return PJ_ECANCELLED;
|
|
|
|
write_op = (struct write_operation*)op_key;
|
|
write_op->op = 0;
|
|
|
|
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
|
|
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
|
|
|
|
/* Fast track:
|
|
* Try to send data immediately, only if there's no pending write!
|
|
* Note:
|
|
* We are speculating that the list is empty here without properly
|
|
* acquiring ioqueue's mutex first. This is intentional, to maximize
|
|
* performance via parallelism.
|
|
*
|
|
* This should be safe, because:
|
|
* - by convention, we require caller to make sure that the
|
|
* key is not unregistered while other threads are invoking
|
|
* an operation on the same key.
|
|
* - pj_list_empty() is safe to be invoked by multiple threads,
|
|
* even when other threads are modifying the list.
|
|
*/
|
|
if (pj_list_empty(&key->write_list)) {
|
|
/*
|
|
* See if data can be sent immediately.
|
|
*/
|
|
sent = *length;
|
|
status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
|
|
if (status == PJ_SUCCESS) {
|
|
/* Success! */
|
|
*length = sent;
|
|
return PJ_SUCCESS;
|
|
} else {
|
|
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
|
|
* the error to caller.
|
|
*/
|
|
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
|
|
return status;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Check that address storage can hold the address parameter.
|
|
*/
|
|
PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
|
|
|
|
/*
|
|
* Schedule asynchronous send.
|
|
*/
|
|
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
|
|
write_op->buf = (void*)data;
|
|
write_op->size = *length;
|
|
write_op->written = 0;
|
|
write_op->flags = flags;
|
|
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
|
|
write_op->rmt_addrlen = addrlen;
|
|
|
|
pj_mutex_lock(key->mutex);
|
|
pj_list_insert_before(&key->write_list, write_op);
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
return PJ_EPENDING;
|
|
}
|
|
|
|
#if PJ_HAS_TCP
|
|
/*
|
|
* Initiate overlapped accept() operation.
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_sock_t *new_sock,
|
|
pj_sockaddr_t *local,
|
|
pj_sockaddr_t *remote,
|
|
int *addrlen)
|
|
{
|
|
struct accept_operation *accept_op;
|
|
pj_status_t status;
|
|
|
|
/* check parameters. All must be specified! */
|
|
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
|
|
|
|
/* Check if key is closing. */
|
|
if (key->closing)
|
|
return PJ_ECANCELLED;
|
|
|
|
accept_op = (struct accept_operation*)op_key;
|
|
accept_op->op = 0;
|
|
|
|
/* Fast track:
|
|
* See if there's new connection available immediately.
|
|
*/
|
|
if (pj_list_empty(&key->accept_list)) {
|
|
status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
|
|
if (status == PJ_SUCCESS) {
|
|
/* Yes! New connection is available! */
|
|
if (local && addrlen) {
|
|
status = pj_sock_getsockname(*new_sock, local, addrlen);
|
|
if (status != PJ_SUCCESS) {
|
|
pj_sock_close(*new_sock);
|
|
*new_sock = PJ_INVALID_SOCKET;
|
|
return status;
|
|
}
|
|
}
|
|
return PJ_SUCCESS;
|
|
} else {
|
|
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
|
|
* the error to caller.
|
|
*/
|
|
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
|
|
return status;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* No connection is available immediately.
|
|
* Schedule accept() operation to be completed when there is incoming
|
|
* connection available.
|
|
*/
|
|
accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
|
|
accept_op->accept_fd = new_sock;
|
|
accept_op->rmt_addr = remote;
|
|
accept_op->addrlen= addrlen;
|
|
accept_op->local_addr = local;
|
|
|
|
pj_mutex_lock(key->mutex);
|
|
pj_list_insert_before(&key->accept_list, accept_op);
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
return PJ_EPENDING;
|
|
}
|
|
|
|
/*
|
|
* Initiate overlapped connect() operation (well, it's non-blocking actually,
|
|
* since there's no overlapped version of connect()).
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
|
|
const pj_sockaddr_t *addr,
|
|
int addrlen )
|
|
{
|
|
pj_status_t status;
|
|
|
|
/* check parameters. All must be specified! */
|
|
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
|
|
|
|
/* Check if key is closing. */
|
|
if (key->closing)
|
|
return PJ_ECANCELLED;
|
|
|
|
/* Check if socket has not been marked for connecting */
|
|
if (key->connecting != 0)
|
|
return PJ_EPENDING;
|
|
|
|
status = pj_sock_connect(key->fd, addr, addrlen);
|
|
if (status == PJ_SUCCESS) {
|
|
/* Connected! */
|
|
return PJ_SUCCESS;
|
|
} else {
|
|
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
|
|
/* Pending! */
|
|
pj_mutex_lock(key->mutex);
|
|
key->connecting = PJ_TRUE;
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
|
|
ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
|
|
pj_mutex_unlock(key->mutex);
|
|
return PJ_EPENDING;
|
|
} else {
|
|
/* Error! */
|
|
return status;
|
|
}
|
|
}
|
|
}
|
|
#endif /* PJ_HAS_TCP */
|
|
|
|
|
|
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
|
|
pj_size_t size )
|
|
{
|
|
pj_memset(op_key, 0, size);
|
|
}
|
|
|
|
|
|
/*
|
|
* pj_ioqueue_is_pending()
|
|
*/
|
|
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key )
|
|
{
|
|
struct generic_operation *op_rec;
|
|
|
|
PJ_UNUSED_ARG(key);
|
|
|
|
op_rec = (struct generic_operation*)op_key;
|
|
return op_rec->op != 0;
|
|
}
|
|
|
|
|
|
/*
|
|
* pj_ioqueue_post_completion()
|
|
*/
|
|
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
|
|
pj_ioqueue_op_key_t *op_key,
|
|
pj_ssize_t bytes_status )
|
|
{
|
|
struct generic_operation *op_rec;
|
|
|
|
/*
|
|
* Find the operation key in all pending operation list to
|
|
* really make sure that it's still there; then call the callback.
|
|
*/
|
|
pj_mutex_lock(key->mutex);
|
|
|
|
/* Find the operation in the pending read list. */
|
|
op_rec = (struct generic_operation*)key->read_list.next;
|
|
while (op_rec != (void*)&key->read_list) {
|
|
if (op_rec == (void*)op_key) {
|
|
pj_list_erase(op_rec);
|
|
op_rec->op = 0;
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
(*key->cb.on_read_complete)(key, op_key, bytes_status);
|
|
return PJ_SUCCESS;
|
|
}
|
|
op_rec = op_rec->next;
|
|
}
|
|
|
|
/* Find the operation in the pending write list. */
|
|
op_rec = (struct generic_operation*)key->write_list.next;
|
|
while (op_rec != (void*)&key->write_list) {
|
|
if (op_rec == (void*)op_key) {
|
|
pj_list_erase(op_rec);
|
|
op_rec->op = 0;
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
(*key->cb.on_write_complete)(key, op_key, bytes_status);
|
|
return PJ_SUCCESS;
|
|
}
|
|
op_rec = op_rec->next;
|
|
}
|
|
|
|
/* Find the operation in the pending accept list. */
|
|
op_rec = (struct generic_operation*)key->accept_list.next;
|
|
while (op_rec != (void*)&key->accept_list) {
|
|
if (op_rec == (void*)op_key) {
|
|
pj_list_erase(op_rec);
|
|
op_rec->op = 0;
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
(*key->cb.on_accept_complete)(key, op_key,
|
|
PJ_INVALID_SOCKET,
|
|
bytes_status);
|
|
return PJ_SUCCESS;
|
|
}
|
|
op_rec = op_rec->next;
|
|
}
|
|
|
|
pj_mutex_unlock(key->mutex);
|
|
|
|
return PJ_EINVALIDOP;
|
|
}
|
|
|