Ticket #622: initial integration of ioqueue_epoll patch, updated the configure script

git-svn-id: https://svn.pjsip.org/repos/pjproject/trunk@2295 74dad513-b988-da41-8d7b-12977e46ad98
This commit is contained in:
Benny Prijono 2008-09-18 21:22:16 +00:00
parent 6ba5f59841
commit 99eec385bc
3 changed files with 5460 additions and 5298 deletions

10526
aconfigure

File diff suppressed because it is too large. Load Diff

View File

@ -322,13 +322,33 @@ AC_COMPILE_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>],
AC_MSG_RESULT(yes)],
[AC_MSG_RESULT(no)])
dnl ######################
dnl # ioqueue selection
dnl #
AC_SUBST(ac_os_objs)
AC_MSG_CHECKING([ioqueue backend])
AC_ARG_ENABLE(epoll,
AC_HELP_STRING([--enable-epoll],
[Use /dev/epoll ioqueue on Linux (beta)]),
[
ac_os_objs=ioqueue_epoll.o
AC_MSG_RESULT([/dev/epoll])
],
[
ac_os_objs=ioqueue_select.o
AC_MSG_RESULT([select()])
])
dnl ######################
dnl # OS specific files
dnl #
case $target in
*mingw* | *cygw* | *win32* | *w32* )
ac_os_objs="file_access_win32.o file_io_win32.o os_core_win32.o os_error_win32.o os_time_win32.o os_timestamp_win32.o guid_win32.o ioqueue_select.o"
ac_os_objs="$ac_os_objs file_access_win32.o file_io_win32.o os_core_win32.o os_error_win32.o os_time_win32.o os_timestamp_win32.o guid_win32.o"
;;
*)
ac_os_objs="file_access_unistd.o file_io_ansi.o os_core_unix.o os_error_unix.o os_time_unix.o os_timestamp_posix.o ioqueue_select.o"
ac_os_objs="$ac_os_objs file_access_unistd.o file_io_ansi.o os_core_unix.o os_error_unix.o os_time_unix.o os_timestamp_posix.o"
# UUID
if test "$ac_has_uuid_lib" = "1" -a "$ac_has_uuid_h" = "1"; then
ac_os_objs="$ac_os_objs guid_uuid.o"

View File

@ -175,6 +175,13 @@ struct pj_ioqueue_t
int epfd;
struct epoll_event *events;
struct queue *queue;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_mutex_t *ref_cnt_mutex;
pj_ioqueue_key_t active_list;
pj_ioqueue_key_t closing_list;
pj_ioqueue_key_t free_list;
#endif
};
/* Include implementation for common abstraction after we declare
@ -182,6 +189,11 @@ struct pj_ioqueue_t
*/
#include "ioqueue_common_abs.c"
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys to be put to free list again */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif
/*
* pj_ioqueue_name()
*/
@ -206,6 +218,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_ioqueue_t *ioqueue;
pj_status_t rc;
pj_lock_t *lock;
int i;
/* Check that arguments are valid. */
PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
@ -223,6 +236,46 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
ioqueue->count = 0;
pj_list_init(&ioqueue->hlist);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* When safe unregistration is used (the default), we pre-create
* all keys and put them in the free list.
*/
/* Mutex to protect key's reference counter
* We don't want to use key's mutex or ioqueue's mutex because
* that would create deadlock situation in some cases.
*/
rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
if (rc != PJ_SUCCESS)
return rc;
/* Init key list */
pj_list_init(&ioqueue->free_list);
pj_list_init(&ioqueue->closing_list);
/* Pre-create all keys according to max_fd */
for ( i=0; i<max_fd; ++i) {
pj_ioqueue_key_t *key;
key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
key->ref_count = 0;
rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
if (rc != PJ_SUCCESS) {
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_mutex_destroy(key->mutex);
key = key->next;
}
pj_mutex_destroy(ioqueue->ref_cnt_mutex);
return rc;
}
pj_list_push_back(&ioqueue->free_list, key);
}
#endif
rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
if (rc != PJ_SUCCESS)
return rc;
@ -256,12 +309,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
pj_ioqueue_key_t *key;
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);
/* Hold the ioqueue lock while closing the epoll descriptor so no
 * poller can race with the teardown below.
 */
pj_lock_acquire(ioqueue->lock);
os_close(ioqueue->epfd);
ioqueue->epfd = 0;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Destroy reference counters */
/* With safe unregistration every pre-created key owns a mutex; walk
 * all three lists (active, closing, free) and destroy each one, then
 * the shared reference-counter mutex created in pj_ioqueue_create().
 */
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_mutex_destroy(key->mutex);
key = key->next;
}
pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif
/* ioqueue_destroy() is the common-abstraction teardown included from
 * ioqueue_common_abs.c; it releases the remaining shared state.
 * NOTE(review): the acquired ioqueue->lock appears to be released by
 * ioqueue_destroy() rather than here — confirm against the common
 * abstraction code, which is outside this view.
 */
return ioqueue_destroy(ioqueue);
}
@ -303,8 +381,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
goto on_return;
}
/* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
* the key from the free list. Otherwise allocate a new one.
*/
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing_keys first to let them come back to free_list */
scan_closing_keys(ioqueue);
pj_assert(!pj_list_empty(&ioqueue->free_list));
if (pj_list_empty(&ioqueue->free_list)) {
rc = PJ_ETOOMANY;
goto on_return;
}
key = ioqueue->free_list.next;
pj_list_erase(key);
#else
/* Create key. */
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif
rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
if (rc != PJ_SUCCESS) {
key = NULL;
@ -312,12 +409,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* Create key's mutex */
rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
/* rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
if (rc != PJ_SUCCESS) {
key = NULL;
goto on_return;
}
*/
/* os_epoll_ctl. */
ev.events = EPOLLIN | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
@ -345,6 +442,41 @@ on_return:
return rc;
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Bump the key's reference count under the dedicated ref-count mutex
 * (a separate mutex is used to avoid deadlocks with the key/ioqueue
 * locks).
 */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue = key->ioqueue;

    pj_mutex_lock(ioqueue->ref_cnt_mutex);
    key->ref_count++;
    pj_mutex_unlock(ioqueue->ref_cnt_mutex);
}
/* Drop one reference from the key. When the count reaches zero the key
 * (which must already be marked closing) is stamped with the time after
 * which it may be reused, and moved onto the ioqueue's closing list.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue = key->ioqueue;

    /* Lock order: ioqueue lock first, then the ref-count mutex. */
    pj_lock_acquire(ioqueue->lock);
    pj_mutex_lock(ioqueue->ref_cnt_mutex);

    if (--key->ref_count == 0) {
        pj_assert(key->closing == 1);

        /* Record the earliest time this key may return to the free list. */
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&ioqueue->closing_list, key);
    }

    pj_mutex_unlock(ioqueue->ref_cnt_mutex);
    pj_lock_release(ioqueue->lock);
}
#endif
/*
* pj_ioqueue_unregister()
*
@ -363,7 +495,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
pj_assert(ioqueue->count > 0);
--ioqueue->count;
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
pj_list_erase(key);
#endif
ev.events = 0;
ev.epoll_data = (epoll_data_type)key;
@ -374,11 +508,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
return rc;
}
pj_lock_release(ioqueue->lock);
/* Destroy the key. */
pj_sock_close(key->fd);
pj_lock_release(ioqueue->lock);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Mark key is closing. */
key->closing = 1;
/* Decrement counter. */
decrement_counter(key);
/* Done. */
pj_mutex_unlock(key->mutex);
#else
pj_mutex_destroy(key->mutex);
#endif
return PJ_SUCCESS;
}
@ -420,6 +567,29 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
}
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Walk the closing list and move every key whose free_time has elapsed
 * back onto the free list so it can be handed out again.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *key, *next;

    pj_gettimeofday(&now);

    for (key = ioqueue->closing_list.next; key != &ioqueue->closing_list;
         key = next)
    {
        /* Save the successor first: a recycled key is re-linked below. */
        next = key->next;

        pj_assert(key->closing != 0);

        if (PJ_TIME_VAL_GTE(now, key->free_time)) {
            pj_list_erase(key);
            pj_list_push_back(&ioqueue->free_list, key);
        }
    }
}
#endif
/*
* pj_ioqueue_poll()
*
@ -441,6 +611,16 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
if (count == 0) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check the closing keys only when there's no activity and when there are
* pending closing keys.
*/
if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
pj_lock_acquire(ioqueue->lock);
scan_closing_keys(ioqueue);
pj_lock_release(ioqueue->lock);
}
#endif
TRACE_((THIS_FILE, "os_epoll_wait timed out"));
return count;
}
@ -467,6 +647,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
*/
if ((events[i].events & EPOLLIN) &&
(key_has_pending_read(h) || key_has_pending_accept(h))) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
queue[processed].key = h;
queue[processed].event_type = READABLE_EVENT;
++processed;
@ -476,6 +660,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
* Check for writeability.
*/
if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
queue[processed].key = h;
queue[processed].event_type = WRITEABLE_EVENT;
++processed;
@ -486,6 +674,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
* Check for completion of connect() operation.
*/
if ((events[i].events & EPOLLOUT) && (h->connecting)) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
queue[processed].key = h;
queue[processed].event_type = WRITEABLE_EVENT;
++processed;
@ -496,6 +688,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
* Check for error condition.
*/
if (events[i].events & EPOLLERR && (h->connecting)) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
queue[processed].key = h;
queue[processed].event_type = EXCEPTION_EVENT;
++processed;
@ -519,6 +715,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_assert(!"Invalid event!");
break;
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(queue[i].key);
#endif
}
/* Special case: