diff --git a/lib/core/ogs-kqueue.c b/lib/core/ogs-kqueue.c index 2a9ee33ca..4660f1ec0 100644 --- a/lib/core/ogs-kqueue.c +++ b/lib/core/ogs-kqueue.c @@ -137,6 +137,7 @@ static int kqueue_add(ogs_poll_t *poll) return kqueue_set(poll, filter, EV_ADD|EV_ENABLE); } +#if 0 /* ogs_pollset_remove() is not working, SHOULD remove the below code */ static int kqueue_remove(ogs_poll_t *poll) { ogs_pollset_t *pollset = NULL; @@ -165,6 +166,22 @@ static int kqueue_remove(ogs_poll_t *poll) return OGS_OK; } +#else /* New approach : ogs_pollset_remove() is properly working. */ + +static int kqueue_remove(ogs_poll_t *poll) +{ + int filter = 0; + + if (poll->when & OGS_POLLIN) { + filter = EVFILT_READ; + } + if (poll->when & OGS_POLLOUT) { + filter = EVFILT_WRITE; + } + + return kqueue_set(poll, filter, EV_DELETE); +} +#endif static int kqueue_process(ogs_pollset_t *pollset, ogs_time_t timeout) { @@ -187,6 +204,9 @@ static int kqueue_process(ogs_pollset_t *pollset, ogs_time_t timeout) n = kevent(context->kqueue, context->change_list, context->nchanges, context->event_list, context->nevents, tp); + + context->nchanges = 0; + if (n < 0) { ogs_log_message(OGS_LOG_ERROR, ogs_socket_errno, "kqueue failed"); return OGS_ERROR; diff --git a/lib/core/ogs-poll.c b/lib/core/ogs-poll.c index d907abf00..3b39344c7 100644 --- a/lib/core/ogs-poll.c +++ b/lib/core/ogs-poll.c @@ -27,6 +27,8 @@ extern const ogs_pollset_actions_t ogs_kqueue_actions; extern const ogs_pollset_actions_t ogs_epoll_actions; extern const ogs_pollset_actions_t ogs_select_actions; +static void *self_handler_data = NULL; + ogs_pollset_actions_t ogs_pollset_actions; bool ogs_pollset_actions_initialized = false; @@ -87,7 +89,11 @@ ogs_poll_t *ogs_pollset_add(ogs_pollset_t *pollset, short when, poll->when = when; poll->fd = fd; poll->handler = handler; - poll->data = data; + + if (data == &self_handler_data) + poll->data = poll; + else + poll->data = data; poll->pollset = pollset; @@ -117,3 +123,8 @@ void ogs_pollset_remove(ogs_poll_t *poll) ogs_pool_free(&pollset->pool, poll); } + +void *ogs_pollset_self_handler_data(void) +{ + return &self_handler_data; +} diff --git a/lib/core/ogs-poll.h b/lib/core/ogs-poll.h index 8fee72134..e4261e628 100644 --- a/lib/core/ogs-poll.h +++ b/lib/core/ogs-poll.h @@ -40,6 +40,8 @@ ogs_poll_t *ogs_pollset_add(ogs_pollset_t *pollset, short when, ogs_socket_t fd, ogs_poll_handler_f handler, void *data); void ogs_pollset_remove(ogs_poll_t *poll); +void *ogs_pollset_self_handler_data(void); + typedef struct ogs_pollset_actions_s { void (*init)(ogs_pollset_t *pollset); void (*cleanup)(ogs_pollset_t *pollset); diff --git a/lib/core/ogs-select.c b/lib/core/ogs-select.c index cf5aa334e..4ae46be39 100644 --- a/lib/core/ogs-select.c +++ b/lib/core/ogs-select.c @@ -139,7 +139,7 @@ static int select_remove(ogs_poll_t *poll) static int select_process(ogs_pollset_t *pollset, ogs_time_t timeout) { struct select_context_s *context = NULL; - ogs_poll_t *poll = NULL; + ogs_poll_t *poll = NULL, *next_poll = NULL; int rc; struct timeval tv, *tp; @@ -181,13 +181,15 @@ static int select_process(ogs_pollset_t *pollset, ogs_time_t timeout) return OGS_TIMEUP; } - ogs_list_for_each(&context->list, poll) { + ogs_list_for_each_safe(&context->list, next_poll, poll) { short when = 0; - if (FD_ISSET(poll->fd, &context->work_read_fd_set)) { + if ((poll->when & OGS_POLLIN) && + FD_ISSET(poll->fd, &context->work_read_fd_set)) { when |= OGS_POLLIN; } - if (FD_ISSET(poll->fd, &context->work_write_fd_set)) { + if ((poll->when & OGS_POLLOUT) && + FD_ISSET(poll->fd, &context->work_write_fd_set)) { when |= OGS_POLLOUT; } diff --git a/tests/core/poll-test.c b/tests/core/poll-test.c index 676d3255a..97be85ab6 100644 --- a/tests/core/poll-test.c +++ b/tests/core/poll-test.c @@ -435,6 +435,224 @@ static void test7_func(abts_case *tc, void *data) */ } +static ogs_socknode_t *test8_server; +static ogs_socknode_t *test8_client1, *test8_client2, *test8_client3; +static ogs_sock_t *test8_accept1, *test8_accept2, *test8_accept3; +static int test8_okay = 1; + +static void test8_handler(short when, ogs_socket_t fd, void *data) +{ + test8_okay++; +} + +static void test8_handler_with_remove(short when, ogs_socket_t fd, void *data) +{ + ogs_poll_t *poll = data; + + ogs_assert(when == OGS_POLLOUT); + ogs_assert(poll); + ogs_pollset_remove(poll); + + test8_okay++; +} + +static void test8_func(abts_case *tc, void *data) +{ + int rv; + ogs_poll_t *write1, *write2, *write3; + ogs_sockaddr_t *addr; + ogs_pollset_t *pollset = ogs_pollset_create(512); + ABTS_PTR_NOTNULL(tc, pollset); + + rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + test8_server = ogs_socknode_new(addr); + ABTS_PTR_NOTNULL(tc, test8_server); + ogs_tcp_server(test8_server); + ABTS_PTR_NOTNULL(tc, test8_server->sock); + + rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + test8_client1 = ogs_socknode_new(addr); + ABTS_PTR_NOTNULL(tc, test8_client1); + ogs_tcp_client(test8_client1); + ABTS_PTR_NOTNULL(tc, test8_client1->sock); + + test8_accept1 = ogs_sock_accept(test8_server->sock); + ABTS_PTR_NOTNULL(tc, test8_accept1); + + rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + test8_client2 = ogs_socknode_new(addr); + ABTS_PTR_NOTNULL(tc, test8_client2); + ogs_tcp_client(test8_client2); + ABTS_PTR_NOTNULL(tc, test8_client2->sock); + + test8_accept2 = ogs_sock_accept(test8_server->sock); + ABTS_PTR_NOTNULL(tc, test8_accept2); + + rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + test8_client3 = ogs_socknode_new(addr); + ABTS_PTR_NOTNULL(tc, test8_client3); + ogs_tcp_client(test8_client3); + ABTS_PTR_NOTNULL(tc, test8_client3->sock); + + test8_accept3 = ogs_sock_accept(test8_server->sock); + ABTS_PTR_NOTNULL(tc, test8_accept3); + + write1 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept1->fd, test8_handler, NULL); + ABTS_PTR_NOTNULL(tc, write1); + + write2 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept2->fd, test8_handler, NULL); + ABTS_PTR_NOTNULL(tc, write2); + + write3 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept3->fd, test8_handler, NULL); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 4, test8_okay); + + ogs_pollset_remove(write1); + ogs_pollset_remove(write2); + ogs_pollset_remove(write3); + + write2 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept2->fd, test8_handler, NULL); + ABTS_PTR_NOTNULL(tc, write2); + + write3 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept3->fd, test8_handler, NULL); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 6, test8_okay); + + ogs_pollset_remove(write2); + ogs_pollset_remove(write3); + + write3 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept3->fd, test8_handler, NULL); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 7, test8_okay); + + ogs_pollset_remove(write3); + + write1 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept1->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write1); + + write2 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept2->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write2); + + write3 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept3->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 10, test8_okay); + + write1 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept1->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write1); + + write2 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept2->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write2); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 12, test8_okay); + + write1 = ogs_pollset_add(pollset, OGS_POLLOUT, + test8_accept1->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write1); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 13, test8_okay); + + write1 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT, + test8_accept1->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write1); + + write2 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT, + test8_accept2->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write2); + + write3 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT, + test8_accept3->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 16, test8_okay); + + write2 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT, + test8_accept2->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write2); + + write3 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT, + test8_accept3->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 18, test8_okay); + + write3 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT, + test8_accept3->fd, test8_handler_with_remove, + ogs_pollset_self_handler_data()); + ABTS_PTR_NOTNULL(tc, write3); + + rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME); + ABTS_INT_EQUAL(tc, OGS_OK, rv); + + ABTS_INT_EQUAL(tc, 19, test8_okay); + + ogs_socknode_free(test8_client1); + ogs_sock_destroy(test8_accept1); + + ogs_socknode_free(test8_client2); + ogs_sock_destroy(test8_accept2); + + ogs_socknode_free(test8_client3); + ogs_sock_destroy(test8_accept3); + + ogs_socknode_free(test8_server); + + ogs_pollset_destroy(pollset); +} + abts_suite *test_poll(abts_suite *suite) { suite = ADD_SUITE(suite) @@ -449,6 +667,7 @@ abts_suite *test_poll(abts_suite *suite) abts_run_test(suite, test5_func, NULL); abts_run_test(suite, test6_func, NULL); abts_run_test(suite, test7_func, NULL); + abts_run_test(suite, test8_func, NULL); return suite; }