Skip to content

Commit ca56215

Browse files
committed
Refactor io handling
1 parent acebc32 commit ca56215

File tree

13 files changed

+513
-571
lines changed

13 files changed

+513
-571
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ CFLAGS = -B/usr/bin/ -Wall -Wextra --std=gnu11 -D_GNU_SOURCE
44
LFLAGS = -lpthread
55
SRC_FILES = proxy.c http.c log.c util.c tunnel_conn.c \
66
states/accepted.c states/connecting.c states/tunneling.c \
7-
lib/asyncaddrinfo/asyncaddrinfo.c
7+
lib/asyncaddrinfo/asyncaddrinfo.c \
8+
poll.c proxy_server.c
89
OUT_DIR = out
910
BIN = proxy
1011

http.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
#ifndef HTTPS_PROXY_HTTP_H
22
#define HTTPS_PROXY_HTTP_H
33

4-
#include "tunnel_conn.h"
5-
64
int parse_http_connect_message(char* message, char** host_parsed, char** port_parsed, char** http_version_parsed);
75

86
#endif // HTTPS_PROXY_HTTP_H

poll.c

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#include "poll.h"
2+
#include <errno.h>
3+
#include <malloc.h>
4+
#include <sys/epoll.h>
5+
#include <unistd.h>
6+
7+
#define EPOLL_MAX_EVENTS 64
8+
9+
struct poll {
10+
int epoll_fd;
11+
};
12+
13+
struct poll* poll_create() {
14+
int epoll_fd = epoll_create1(0);
15+
if (epoll_fd < 0) {
16+
return NULL;
17+
}
18+
19+
struct poll* p = malloc(sizeof(struct poll));
20+
p->epoll_fd = epoll_fd;
21+
return p;
22+
}
23+
24+
void poll_destroy(struct poll* p) {
25+
close(p->epoll_fd);
26+
free(p);
27+
}
28+
29+
struct poll_task {
30+
void* data;
31+
bool one_shot;
32+
poll_callback callback;
33+
};
34+
35+
int poll_submit_event(
36+
struct poll* p,
37+
int fd,
38+
void* data,
39+
uint32_t base_events,
40+
bool one_shot,
41+
bool edge_triggered,
42+
poll_callback callback) {
43+
/*
44+
* FIXME:
45+
* since we only free the task when it's completed, submitting another task before the first one is completed
46+
* on the same fd results in `task` being leaked.
47+
*/
48+
struct poll_task* task = malloc(sizeof(struct poll_task));
49+
task->data = data;
50+
task->one_shot = one_shot;
51+
task->callback = callback;
52+
53+
struct epoll_event event;
54+
event.data.ptr = task;
55+
event.events = base_events;
56+
if (one_shot) {
57+
event.events |= EPOLLONESHOT;
58+
}
59+
if (edge_triggered) {
60+
event.events |= EPOLLET;
61+
}
62+
63+
// try `mod` first, then `add` if `mod` fails
64+
if (epoll_ctl(p->epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) {
65+
if (errno != ENOENT) {
66+
free(task);
67+
return -1;
68+
}
69+
70+
if (epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0) {
71+
free(task);
72+
return -1;
73+
}
74+
}
75+
76+
return 0;
77+
}
78+
79+
int poll_wait_for_readability(
80+
struct poll* p,
81+
int fd,
82+
void* data,
83+
bool one_shot,
84+
bool edge_triggered,
85+
poll_callback callback) {
86+
return poll_submit_event(p, fd, data, EPOLLIN, one_shot, edge_triggered, callback);
87+
}
88+
89+
int poll_wait_for_writability(
90+
struct poll* p,
91+
int fd,
92+
void* data,
93+
bool one_shot,
94+
bool edge_triggered,
95+
poll_callback callback) {
96+
return poll_submit_event(p, fd, data, EPOLLOUT, one_shot, edge_triggered, callback);
97+
}
98+
99+
int poll_run(struct poll* p) {
100+
struct epoll_event events[EPOLL_MAX_EVENTS];
101+
while (1) {
102+
int num_events = epoll_wait(p->epoll_fd, events, EPOLL_MAX_EVENTS, -1);
103+
if (num_events < 0) {
104+
return num_events;
105+
}
106+
107+
for (int i = 0; i < num_events; i++) {
108+
// TODO: need to check for EPOLLERR?
109+
struct poll_task* task = events[i].data.ptr;
110+
task->callback(p, task->data);
111+
// If it's one-shot, this task will not be used again.
112+
// Otherwise, subsequent notifications will return the same task pointer.
113+
if (task->one_shot) {
114+
free(task);
115+
}
116+
}
117+
}
118+
}

poll.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#ifndef HTTPS_PROXY_POLL_H
2+
#define HTTPS_PROXY_POLL_H
3+
4+
#include <stdbool.h>
5+
6+
struct poll;
7+
8+
struct poll* poll_create();
9+
void poll_destroy(struct poll* p);
10+
int poll_run(struct poll* p);
11+
12+
typedef void (*poll_callback)(struct poll* p, void* data);
13+
14+
int poll_wait_for_readability(
15+
struct poll* p,
16+
int fd,
17+
void* data,
18+
bool one_shot,
19+
bool edge_triggered,
20+
poll_callback callback);
21+
22+
int poll_wait_for_writability(
23+
struct poll* p,
24+
int fd,
25+
void* data,
26+
bool one_shot,
27+
bool edge_triggered,
28+
poll_callback callback);
29+
30+
#endif // HTTPS_PROXY_POLL_H

proxy.c

Lines changed: 33 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
#include <unistd.h>
1010
#include "lib/asyncaddrinfo/asyncaddrinfo.h"
1111
#include "log.h"
12+
#include "poll.h"
13+
#include "proxy_server.h"
1214
#include "states/epoll_cb.h"
1315
#include "util.h"
1416

1517
#define CONNECT_BACKLOG 512
16-
#define EPOLL_MAX_EVENTS 64
1718
#define DEFAULT_THREAD_COUNT 8
1819
#define MAX_BLACKLIST_LEN 100
1920

@@ -39,75 +40,37 @@ int create_bind_listen(unsigned short port) {
3940
return listening_socket;
4041
}
4142

42-
struct event_loop_args {
43+
struct connection_thread_args {
4344
unsigned short thread_id;
44-
bool telemetry_enabled;
45-
int listening_socket;
46-
char** blacklist;
47-
int blacklist_len;
45+
struct proxy_server* config;
4846
};
4947

50-
void handle_connections_in_event_loop(struct event_loop_args* args) {
51-
int epoll_fd = epoll_create1(0);
52-
if (epoll_fd < 0) {
53-
die(hsprintf("failed to create epoll instance: %s", errno2s(errno)));
48+
void handle_connections(struct proxy_server* server) {
49+
struct poll* p = poll_create();
50+
if (p == NULL) {
51+
die(hsprintf("failed to create poll instance: %s", errno2s(errno)));
5452
}
5553

56-
struct epoll_event event, events[EPOLL_MAX_EVENTS];
57-
58-
// Configure `event.data.ptr` to be NULL when there are events on listening socket
59-
event.data.ptr = NULL;
6054
// Since we will call `accept4` until there are no more incoming connections,
61-
// and edge-triggered is more efficient than level-triggered,
62-
// we can register edge-triggered notification for read events on the listening socket
63-
event.events = EPOLLIN | EPOLLET;
64-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->listening_socket, &event) < 0) {
65-
die(hsprintf("failed to add listening socket %d into epoll: %s", args->listening_socket, errno2s(errno)));
55+
// we can register edge-triggered notification for read events on the listening socket.
56+
// Edge-triggered is more efficient than level-triggered.
57+
if (poll_wait_for_readability(
58+
p, server->listening_socket, server, false, true, (poll_callback)accept_incoming_connections) < 0) {
59+
die(hsprintf("failed to register readability notification for listening socket: %s", errno2s(errno)));
6660
}
6761

68-
// event loop
69-
while (1) {
70-
int num_events = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, -1);
71-
DEBUG_LOG("epoll_wait returned %d", num_events);
72-
if (num_events < 0) {
73-
die(hsprintf("epoll_wait error: %s", errno2s(errno)));
74-
}
75-
76-
for (int i = 0; i < num_events; i++) {
77-
if (events[i].data.ptr == NULL) {
78-
// events on listening socket
79-
if (events[i].events & EPOLLERR) {
80-
DEBUG_LOG("epoll reported error on listening socket");
81-
}
82-
if (!(events[i].events & EPOLLIN)) {
83-
DEBUG_LOG("listening socket is not readable but epoll woke us up anyway");
84-
continue;
85-
}
86-
accept_incoming_connections(
87-
epoll_fd, args->listening_socket, args->telemetry_enabled, args->blacklist, args->blacklist_len);
88-
} else {
89-
// events on existing connection
90-
struct epoll_cb* cb = events[i].data.ptr;
91-
if (cb->type == cb_type_accepted) {
92-
handle_accepted_cb(epoll_fd, (struct epoll_accepted_cb*)cb, events[i].events);
93-
} else if (cb->type == cb_type_connecting) {
94-
handle_connecting_cb(epoll_fd, (struct epoll_connecting_cb*)cb, events[i].events);
95-
} else if (cb->type == cb_type_tunneling) {
96-
handle_tunneling_cb(epoll_fd, (struct epoll_tunneling_cb*)cb, events[i].events);
97-
}
98-
}
99-
}
62+
// start the event loop and run until termination
63+
if (poll_run(p) < 0) {
64+
die(hsprintf("poll_run returned error: %s", errno2s(errno)));
10065
}
10166

102-
if (close(epoll_fd) < 0) {
103-
die(hsprintf("failed to close epoll instance: %s", errno2s(errno)));
104-
}
67+
poll_destroy(p);
10568
}
10669

107-
void* handle_connections_in_event_loop_pthread_wrapper(void* raw_args) {
108-
struct event_loop_args* args = raw_args;
70+
void* handle_connections_pthread_wrapper(void* raw_args) {
71+
struct connection_thread_args* args = raw_args;
10972
thread_id__ = args->thread_id; // for logging purpose
110-
handle_connections_in_event_loop(args);
73+
handle_connections(args->config);
11174
return NULL;
11275
}
11376

@@ -200,33 +163,37 @@ int main(int argc, char** argv) {
200163
printf("- number of connection threads: %hu\n", connection_threads);
201164
printf("- number of async addrinfo (DNS) threads: %hu\n", asyncaddrinfo_threads);
202165

203-
int listening_socket = create_bind_listen(listening_port);
204-
205166
// start the addr info lookup threads
206167
asyncaddrinfo_init(asyncaddrinfo_threads);
207168

208169
// start the connection threads
209-
struct event_loop_args args_list[connection_threads];
170+
// TODO: should this struct be server or config?
171+
int listening_socket = create_bind_listen(listening_port);
172+
struct proxy_server server = {
173+
.listening_socket = listening_socket,
174+
.telemetry_enabled = telemetry_enabled,
175+
.blacklist = blacklist,
176+
.blacklist_len = blacklist_len,
177+
};
178+
179+
struct connection_thread_args args_list[connection_threads];
210180
for (int i = 0; i < connection_threads; i++) {
211-
args_list[i].listening_socket = listening_socket;
212-
args_list[i].telemetry_enabled = telemetry_enabled;
213181
args_list[i].thread_id = i;
214-
args_list[i].blacklist = blacklist;
215-
args_list[i].blacklist_len = blacklist_len;
182+
args_list[i].config = &server;
216183
}
217184

218185
pthread_t workers[connection_threads - 1];
219186
for (int i = 0; i < connection_threads - 1; i++) {
220187
// child threads will have id from 1 onwards
221188
// the main thread will be thread 0
222-
if (0 != pthread_create(&workers[i], NULL, handle_connections_in_event_loop_pthread_wrapper, &args_list[i + 1])) {
189+
if (0 != pthread_create(&workers[i], NULL, handle_connections_pthread_wrapper, &args_list[i + 1])) {
223190
die(hsprintf("error creating thread %d: %s", i + 1, errno2s(errno)));
224191
}
225192
}
226193

227194
printf("Accepting requests\n");
228195
// run another event loop on the main thread
229-
handle_connections_in_event_loop_pthread_wrapper(&args_list[0]);
196+
handle_connections_pthread_wrapper(&args_list[0]);
230197

231198
// We will never reach here, the cleanup code below is just for completeness' sake
232199

proxy_server.c

Whitespace-only changes.

proxy_server.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#ifndef HTTPS_PROXY_PROXY_SERVER_H
2+
#define HTTPS_PROXY_PROXY_SERVER_H
3+
4+
#include <stdbool.h>
5+
6+
struct proxy_server {
7+
int listening_socket;
8+
bool telemetry_enabled;
9+
char** blacklist;
10+
int blacklist_len;
11+
};
12+
13+
#endif // HTTPS_PROXY_PROXY_SERVER_H

0 commit comments

Comments
 (0)