diff --git a/neco.c b/neco.c index b8917e9..b234e38 100644 --- a/neco.c +++ b/neco.c @@ -17,7 +17,8 @@ NECO_MAXCAP // Max stack_group capacity NECO_GAPSIZE // Size of gap (guard) pages NECO_SIGSTKSZ // Size of signal stack on main thread NECO_BURST // Number of read attempts before waiting, def: disabled -NECO_MAXWORKER // Max number of worker threads, def: 2 +NECO_MAXWORKERS // Max number of worker threads, def: 64 +NECO_MAXIOWORKERS // Max number of io threads, def: 2 // Additional options that activate features @@ -27,7 +28,6 @@ NECO_NOSTACKFREELIST // Do not use a stack free list NECO_NOPOOL // Do not use a coroutine and channel pools NECO_USEHEAPSTACK // Allocate stacks on the heap using malloc NECO_NOSIGNALS // Disable signal handling -NECO_USEROUNDROBINPIN // Use Round-robin when pinning background work NECO_NOWORKERS // Disable all worker threads, work will run in coroutine */ @@ -41,7 +41,7 @@ NECO_NOWORKERS // Disable all worker threads, work will run in coroutine #define DEF_BURST -1 #define NECO_USEHEAPSTACK #define NECO_NOSIGNALS -#define NECO_NOWORKER +#define NECO_NOWORKERS #else #define DEF_STACKSIZE 8388608 #define DEF_DEFCAP 4 @@ -49,8 +49,9 @@ NECO_NOWORKERS // Disable all worker threads, work will run in coroutine #define DEF_GAPSIZE 1048576 #define DEF_SIGSTKSZ 1048576 #define DEF_BURST -1 -#define DEF_MAXWORKER 2 +#define DEF_MAXWORKERS 64 #define DEF_MAXRINGSIZE 32 +#define DEF_MAXIOWORKERS 2 #endif #ifndef NECO_STACKSIZE @@ -71,15 +72,18 @@ NECO_NOWORKERS // Disable all worker threads, work will run in coroutine #ifndef NECO_BURST #define NECO_BURST DEF_BURST #endif -#ifndef NECO_MAXWORKER -#define NECO_MAXWORKER DEF_MAXWORKER +#ifndef NECO_MAXWORKERS +#define NECO_MAXWORKERS DEF_MAXWORKERS #endif #ifndef NECO_MAXRINGSIZE #define NECO_MAXRINGSIZE DEF_MAXRINGSIZE #endif +#ifndef NECO_MAXIOWORKERS +#define NECO_MAXIOWORKERS DEF_MAXIOWORKERS +#endif #ifdef NECO_TESTING -#if NECO_BURST == -1 +#if NECO_BURST <= 0 #undef NECO_BURST #define NECO_BURST 1 #endif @@ -114,7 +118,7 @@ NECO_NOWORKERS // Disable all worker threads, work will run in coroutine #include "deps/aat.h" #include "deps/stack.h" -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS #include "deps/worker.h" #endif @@ -2591,7 +2595,7 @@ void *stack_addr(struct stack *stack) { } // END stack.c -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS // BEGIN worker.c // https://github.com/tidwall/worker.c // @@ -2795,7 +2799,7 @@ bool worker_submit(struct worker *worker, int64_t pin, void(*work)(void *udata), return submitted; } // END worker.c -#endif // NECO_NOWORKER +#endif // NECO_NOWORKERS #if defined(__GNUC__) #pragma GCC diagnostic pop @@ -3583,7 +3587,7 @@ struct runtime { void (*arc4random_buf)(void *, size_t); #endif -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS struct worker *worker; pthread_mutex_t iomu; struct colist iolist; @@ -4146,7 +4150,7 @@ static void rt_sched_paused_step(void) { } timeout = CLAMP(timeout, 0, MAX_TIMEOUT); -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS if (rt->niowaiters > 0) { timeout = 0; while (1) { @@ -4319,9 +4323,9 @@ static int run(void(*coroutine)(int, void**), int nargs, va_list *args, goto fail; } -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS struct worker_opts wopts = { - .max_threads = NECO_MAXWORKER, + .max_threads = NECO_MAXWORKERS, .max_thread_entries = NECO_MAXRINGSIZE, .malloc = malloc0, .free = free0, @@ -4350,7 +4354,7 @@ static int run(void(*coroutine)(int, void**), int nargs, va_list *args, rt_freezchanpool(); rt_restore_signal_handlers(); rt_release_dlhandles(); -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS worker_free(rt->worker); #endif rt_release(); @@ -4981,7 +4985,7 @@ static void cowait(int fd, enum evkind kind, int64_t deadline) { // Networking code //////////////////////////////////////////////////////////////////////////////// -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS struct ioread { int fd; @@ -5010,6 +5014,9 @@ static ssize_t read1(int fd, void *data, size_t nbytes) { if (neco_fail_read_counter > 0) { nowork = true; } +#endif +#if NECO_MAXIOWORKERS <= 0 + nowork = true; #endif if (!nowork) { nowork = true; @@ -5021,11 +5028,7 @@ static ssize_t read1(int fd, void *data, size_t nbytes) { .co = co, .rt = rt, }; -#ifdef NECO_USEROUNDROBINPIN - int64_t pin = -1; -#else - int64_t pin = co->id; -#endif + int64_t pin = co->id % NECO_MAXIOWORKERS; if (worker_submit(rt->worker, pin, ioread, &info)) { rt->niowaiters++; sco_pause(); @@ -5144,7 +5147,7 @@ static ssize_t write1(int fd, const void *data, size_t nbytes) { #define write2 write1 #endif -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS struct iowrite { int fd; const void *data; @@ -5172,6 +5175,9 @@ static ssize_t write3(int fd, const void *data, size_t nbytes) { if (neco_fail_write_counter > 0 || neco_partial_write > 0) { nowork = true; } +#endif +#if NECO_MAXIOWORKERS <= 0 + nowork = true; #endif if (!nowork) { nowork = true; @@ -5183,11 +5189,7 @@ static ssize_t write3(int fd, const void *data, size_t nbytes) { .co = co, .rt = rt, }; -#ifdef NECO_USEROUNDROBINPIN - int64_t pin = -1; -#else - int64_t pin = co->id; -#endif + int64_t pin = co->id % NECO_MAXIOWORKERS; if (worker_submit(rt->worker, pin, iowrite, &info)) { rt->niowaiters++; sco_pause(); @@ -8651,7 +8653,7 @@ int neco_resume(int64_t id) { return ret; } -#ifndef NECO_NOWORKER +#ifndef NECO_NOWORKERS struct iowork { void(*work)(void *udata); void *udata; @@ -8676,7 +8678,7 @@ static int workfn(int64_t pin, void(*work)(void *udata), void *udata) { if (!co) { return NECO_PERM; } -#ifdef NECO_NOWORKER +#ifdef NECO_NOWORKERS // Run in foreground work(udata); #else