
Wepoll Source code analysis

For GSoC’s tokio project.

Fundamental data structures

queue

A simple doubly-linked queue that uses a dummy head node to simplify the code.

typedef struct queue_node {
  queue_node_t* prev;
  queue_node_t* next;
} queue_node_t;

typedef struct queue {
  queue_node_t head;
} queue_t;

Wepoll implements an API to add a new node to the front or back of an existing queue in O(1) time.

ts_tree

Wepoll implements a red-black tree and provides a thread-safe wrapper around it. This thread-safe red-black tree is used to manage the port_state associated with each IOCP port. A ts_tree_node is embedded in port_state and stored in a file-local ts_tree called epoll__handle_tree, which manages all IOCP ports and provides access to the associated port_state by looking up the HANDLE returned from CreateIoCompletionPort().

typedef struct ts_tree {
  tree_t tree;
  SRWLOCK lock;
} ts_tree_t;

typedef struct ts_tree_node {
  tree_node_t tree_node;
  reflock_t reflock; // Reference counted
} ts_tree_node_t;

The ts_tree is guarded by an SRWLOCK that controls reads and writes, while each ts_tree_node is managed by a reflock that controls its lifetime. Under normal operation, threads increase and decrease the reference count, which are wait-free operations. The reflock normally prevents a chunk of memory from being freed, but allows that memory to eventually be released in a coordinated fashion.

epoll_create()

epoll_create() and epoll_create1() ignore their arguments and call epoll__create(). In epoll__create, a new port is created by calling CreateIoCompletionPort(), initializing a port_state struct, and adding this new port_state to epoll__handle_tree.

port_state_t* port_new(HANDLE* iocp_out) {
  port_state_t* port_state;
  HANDLE iocp;

  port_state = port__alloc();
  iocp = port__create_iocp(); /* calls CreateIoCompletionPort() to do its work */
  memset(port_state, 0, sizeof *port_state);

  port_state->iocp = iocp;
  tree_init(&port_state->sock_tree);
  queue_init(&port_state->sock_update_queue);
  queue_init(&port_state->sock_deleted_queue);
  queue_init(&port_state->poll_group_queue);
  /* The embedded ts_tree_node struct. The owning port_state can be
   * recovered from a pointer to handle_tree_node: the offset of
   * handle_tree_node within port_state is fixed, so
   *   address_of_port_state = pointer_to_handle_tree_node - offset */
  ts_tree_node_init(&port_state->handle_tree_node);
  InitializeCriticalSection(&port_state->lock);

  *iocp_out = iocp;
  return port_state;
}

The port_new function takes care of port_state initialization.

epoll_ctl()

First, epoll_ctl() finds the port_state specified by the HANDLE and increases the reference count associated with it.

Second, it enters the critical section and completes the work according to op:

  • EPOLL_CTL_ADD
    port__ctl_add creates a new sock_state, obtains the ws_base_socket and a poll_group for it, then adds it to the port_state’s tree so it can be managed.

  • EPOLL_CTL_MOD
    port__ctl_mod gets the associated sock_state, sets the requested events on it, and adds it to sock_update_queue to wait for an update. It then calls port__update_events_if_polling.

  • EPOLL_CTL_DEL
    port__ctl_del looks up the sock_state in port_state by SOCKET and deletes it. If this socket’s polling request is still pending, it is cancelled:

/* CancelIoEx() may fail with ERROR_NOT_FOUND if the overlapped operation has
 * already completed. This is not a problem and we proceed normally. */
if (!HasOverlappedIoCompleted(&sock_state->overlapped) &&
    !CancelIoEx(afd_helper_handle, &sock_state->overlapped) &&
    GetLastError() != ERROR_NOT_FOUND)
  return_map_error(-1);

The sock_state is then removed from the port_state’s update queue and from the tree that manages it. If the poll request still needs to complete, the sock_state object can’t be free()d yet, so it is added to the port_state’s deleted-socket queue.

epoll_wait()

First, epoll_wait() finds the port_state specified by the HANDLE and increases the reference count associated with it.

Second, it chooses the appropriate timeout and a location for storing the IOCP events (on the stack or on the heap). Then it begins the loop, dequeuing completion packets until either at least one interesting event has been discovered or the timeout is reached.

In detail, it first updates every sock_state in the port_state’s update_queue in the proper way, then waits via GetQueuedCompletionStatusEx for pending I/O operations associated with the completion port to complete. The returned IOCP events are stored in the location decided earlier; each event is then processed by calling sock_feed_event:

int sock_feed_event(port_state_t* port_state,
                    OVERLAPPED* overlapped,
                    struct epoll_event* ev) {
  sock_state_t* sock_state =
      container_of(overlapped, sock_state_t, overlapped);
  AFD_POLL_INFO* poll_info = &sock_state->poll_info;
  uint32_t epoll_events = 0;

  sock_state->poll_status = SOCK__POLL_IDLE;
  sock_state->pending_events = 0;

  if (sock_state->delete_pending) {
    /* Socket has been deleted earlier and can now be freed. */
    return sock__delete(port_state, sock_state, false);

  } else if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) {
    /* The poll request was cancelled by CancelIoEx. */

  } else if (!NT_SUCCESS(overlapped->Internal)) {
    /* The overlapped request itself failed in an unexpected way. */
    epoll_events = EPOLLERR;

  } else if (poll_info->NumberOfHandles < 1) {
    /* This poll operation succeeded but didn't report any socket events. */

  } else if (poll_info->Handles[0].Events & AFD_POLL_LOCAL_CLOSE) {
    /* The poll operation reported that the socket was closed. */
    return sock__delete(port_state, sock_state, false);

  } else {
    /* Events related to our socket were reported. */
    epoll_events =
        sock__afd_events_to_epoll_events(poll_info->Handles[0].Events);
  }

  /* Requeue the socket so a new poll request will be submitted. */
  port_request_socket_update(port_state, sock_state);

  /* Filter out events that the user didn't ask for. */
  epoll_events &= sock_state->user_events;

  /* Return if there are no epoll events to report. */
  if (epoll_events == 0)
    return 0;

  /* If the socket has the EPOLLONESHOT flag set, unmonitor all events,
   * even EPOLLERR and EPOLLHUP. But always keep looking for closed sockets. */
  if (sock_state->user_events & EPOLLONESHOT)
    sock_state->user_events = 0;

  ev->data = sock_state->user_data;
  ev->events = epoll_events;
  return 1;
}

If EPOLLONESHOT is set, all event flags are cleared, so the socket stops reporting events until it is re-armed with EPOLL_CTL_MOD.

Finally, if polling is still in progress, it updates the events of the sock_state entries remaining in the port_state’s update_queue.

epoll_close()

First, epoll_close() finds the port_state specified by the HANDLE and increases the reference count associated with it, then closes the IOCP port it owns.

Then it force-deletes every sock_state in both sock_tree and sock_deleted_queue, and releases every poll_group along with the AFD helper handle each one holds.