Update ogs_pollset library

- Fix the ogs_pollset_remove() bug in select(WIN32), kqueue(MacOSX)
- Modify to enable ogs_pollset_remove() in pollset callback handler
This commit is contained in:
Sukchan Lee 2020-11-09 22:57:14 -05:00
parent 830587a250
commit 23e8e6577c
5 changed files with 259 additions and 5 deletions

View File

@ -137,6 +137,7 @@ static int kqueue_add(ogs_poll_t *poll)
return kqueue_set(poll, filter, EV_ADD|EV_ENABLE);
}
#if 0 /* ogs_pollset_remove() is not working, SHOULD remove the below code */
static int kqueue_remove(ogs_poll_t *poll)
{
ogs_pollset_t *pollset = NULL;
@ -165,6 +166,22 @@ static int kqueue_remove(ogs_poll_t *poll)
return OGS_OK;
}
#else /* New approach : ogs_pollset_remove() is properly working. */
static int kqueue_remove(ogs_poll_t *poll)
{
int filter = 0;
if (poll->when & OGS_POLLIN) {
filter = EVFILT_READ;
}
if (poll->when & OGS_POLLOUT) {
filter = EVFILT_WRITE;
}
return kqueue_set(poll, filter, EV_DELETE);
}
#endif
static int kqueue_process(ogs_pollset_t *pollset, ogs_time_t timeout)
{
@ -187,6 +204,9 @@ static int kqueue_process(ogs_pollset_t *pollset, ogs_time_t timeout)
n = kevent(context->kqueue,
context->change_list, context->nchanges,
context->event_list, context->nevents, tp);
context->nchanges = 0;
if (n < 0) {
ogs_log_message(OGS_LOG_ERROR, ogs_socket_errno, "kqueue failed");
return OGS_ERROR;

View File

@ -27,6 +27,8 @@ extern const ogs_pollset_actions_t ogs_kqueue_actions;
extern const ogs_pollset_actions_t ogs_epoll_actions;
extern const ogs_pollset_actions_t ogs_select_actions;
static void *self_handler_data = NULL;
ogs_pollset_actions_t ogs_pollset_actions;
bool ogs_pollset_actions_initialized = false;
@ -87,7 +89,11 @@ ogs_poll_t *ogs_pollset_add(ogs_pollset_t *pollset, short when,
poll->when = when;
poll->fd = fd;
poll->handler = handler;
poll->data = data;
if (data == &self_handler_data)
poll->data = poll;
else
poll->data = data;
poll->pollset = pollset;
@ -117,3 +123,8 @@ void ogs_pollset_remove(ogs_poll_t *poll)
ogs_pool_free(&pollset->pool, poll);
}
void *ogs_pollset_self_handler_data(void)
{
return &self_handler_data;
}

View File

@ -40,6 +40,8 @@ ogs_poll_t *ogs_pollset_add(ogs_pollset_t *pollset, short when,
ogs_socket_t fd, ogs_poll_handler_f handler, void *data);
void ogs_pollset_remove(ogs_poll_t *poll);
void *ogs_pollset_self_handler_data(void);
typedef struct ogs_pollset_actions_s {
void (*init)(ogs_pollset_t *pollset);
void (*cleanup)(ogs_pollset_t *pollset);

View File

@ -139,7 +139,7 @@ static int select_remove(ogs_poll_t *poll)
static int select_process(ogs_pollset_t *pollset, ogs_time_t timeout)
{
struct select_context_s *context = NULL;
ogs_poll_t *poll = NULL;
ogs_poll_t *poll = NULL, *next_poll = NULL;
int rc;
struct timeval tv, *tp;
@ -181,13 +181,15 @@ static int select_process(ogs_pollset_t *pollset, ogs_time_t timeout)
return OGS_TIMEUP;
}
ogs_list_for_each(&context->list, poll) {
ogs_list_for_each_safe(&context->list, next_poll, poll) {
short when = 0;
if (FD_ISSET(poll->fd, &context->work_read_fd_set)) {
if ((poll->when & OGS_POLLIN) &&
FD_ISSET(poll->fd, &context->work_read_fd_set)) {
when |= OGS_POLLIN;
}
if (FD_ISSET(poll->fd, &context->work_write_fd_set)) {
if ((poll->when & OGS_POLLOUT) &&
FD_ISSET(poll->fd, &context->work_write_fd_set)) {
when |= OGS_POLLOUT;
}

View File

@ -435,6 +435,224 @@ static void test7_func(abts_case *tc, void *data)
*/
}
static ogs_socknode_t *test8_server;
static ogs_socknode_t *test8_client1, *test8_client2, *test8_client3;
static ogs_sock_t *test8_accept1, *test8_accept2, *test8_accept3;
static int test8_okay = 1;
static void test8_handler(short when, ogs_socket_t fd, void *data)
{
test8_okay++;
}
static void test8_handler_with_remove(short when, ogs_socket_t fd, void *data)
{
ogs_poll_t *poll = data;
ogs_assert(when == OGS_POLLOUT);
ogs_assert(poll);
ogs_pollset_remove(poll);
test8_okay++;
}
static void test8_func(abts_case *tc, void *data)
{
int rv;
ogs_poll_t *write1, *write2, *write3;
ogs_sockaddr_t *addr;
ogs_pollset_t *pollset = ogs_pollset_create(512);
ABTS_PTR_NOTNULL(tc, pollset);
rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
test8_server = ogs_socknode_new(addr);
ABTS_PTR_NOTNULL(tc, test8_server);
ogs_tcp_server(test8_server);
ABTS_PTR_NOTNULL(tc, test8_server->sock);
rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
test8_client1 = ogs_socknode_new(addr);
ABTS_PTR_NOTNULL(tc, test8_client1);
ogs_tcp_client(test8_client1);
ABTS_PTR_NOTNULL(tc, test8_client1->sock);
test8_accept1 = ogs_sock_accept(test8_server->sock);
ABTS_PTR_NOTNULL(tc, test8_accept1);
rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
test8_client2 = ogs_socknode_new(addr);
ABTS_PTR_NOTNULL(tc, test8_client2);
ogs_tcp_client(test8_client2);
ABTS_PTR_NOTNULL(tc, test8_client2->sock);
test8_accept2 = ogs_sock_accept(test8_server->sock);
ABTS_PTR_NOTNULL(tc, test8_accept2);
rv = ogs_getaddrinfo(&addr, AF_INET, "127.0.0.1", PORT, AI_PASSIVE);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
test8_client3 = ogs_socknode_new(addr);
ABTS_PTR_NOTNULL(tc, test8_client3);
ogs_tcp_client(test8_client3);
ABTS_PTR_NOTNULL(tc, test8_client3->sock);
test8_accept3 = ogs_sock_accept(test8_server->sock);
ABTS_PTR_NOTNULL(tc, test8_accept3);
write1 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept1->fd, test8_handler, NULL);
ABTS_PTR_NOTNULL(tc, write1);
write2 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept2->fd, test8_handler, NULL);
ABTS_PTR_NOTNULL(tc, write2);
write3 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept3->fd, test8_handler, NULL);
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 4, test8_okay);
ogs_pollset_remove(write1);
ogs_pollset_remove(write2);
ogs_pollset_remove(write3);
write2 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept2->fd, test8_handler, NULL);
ABTS_PTR_NOTNULL(tc, write2);
write3 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept3->fd, test8_handler, NULL);
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 6, test8_okay);
ogs_pollset_remove(write2);
ogs_pollset_remove(write3);
write3 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept3->fd, test8_handler, NULL);
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 7, test8_okay);
ogs_pollset_remove(write3);
write1 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept1->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write1);
write2 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept2->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write2);
write3 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept3->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 10, test8_okay);
write1 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept1->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write1);
write2 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept2->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write2);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 12, test8_okay);
write1 = ogs_pollset_add(pollset, OGS_POLLOUT,
test8_accept1->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write1);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 13, test8_okay);
write1 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT,
test8_accept1->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write1);
write2 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT,
test8_accept2->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write2);
write3 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT,
test8_accept3->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 16, test8_okay);
write2 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT,
test8_accept2->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write2);
write3 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT,
test8_accept3->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 18, test8_okay);
write3 = ogs_pollset_add(pollset, OGS_POLLIN|OGS_POLLOUT,
test8_accept3->fd, test8_handler_with_remove,
ogs_pollset_self_handler_data());
ABTS_PTR_NOTNULL(tc, write3);
rv = ogs_pollset_poll(pollset, OGS_INFINITE_TIME);
ABTS_INT_EQUAL(tc, OGS_OK, rv);
ABTS_INT_EQUAL(tc, 19, test8_okay);
ogs_socknode_free(test8_client1);
ogs_sock_destroy(test8_accept1);
ogs_socknode_free(test8_client2);
ogs_sock_destroy(test8_accept2);
ogs_socknode_free(test8_client3);
ogs_sock_destroy(test8_accept3);
ogs_socknode_free(test8_server);
ogs_pollset_destroy(pollset);
}
abts_suite *test_poll(abts_suite *suite)
{
suite = ADD_SUITE(suite)
@ -449,6 +667,7 @@ abts_suite *test_poll(abts_suite *suite)
abts_run_test(suite, test5_func, NULL);
abts_run_test(suite, test6_func, NULL);
abts_run_test(suite, test7_func, NULL);
abts_run_test(suite, test8_func, NULL);
return suite;
}