-
Notifications
You must be signed in to change notification settings - Fork 292
CP-32622: avoid using select and instead use epoll #4877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
aa166fb
2877609
416bf66
aa9fb63
16508f1
f6819e0
d86f1b8
f4c2087
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
(libraries | ||
cohttp | ||
message-switch-core | ||
polly | ||
rpclib.core | ||
rpclib.json | ||
threads.posix | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,15 +38,15 @@ module Delay = struct | |
(* Concrete type is the ends of a pipe *) | ||
type t = { | ||
(* A pipe is used to wake up a thread blocked in wait: *) | ||
mutable pipe_out: Unix.file_descr option | ||
; mutable pipe_in: Unix.file_descr option | ||
mutable sock_out: Unix.file_descr option | ||
; mutable sock_in: Unix.file_descr option | ||
; (* Indicates that a signal arrived before a wait: *) | ||
mutable signalled: bool | ||
; m: Mutex.t | ||
} | ||
|
||
let make () = | ||
{pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()} | ||
{sock_out= None; sock_in= None; signalled= false; m= Mutex.create ()} | ||
|
||
exception Pre_signalled | ||
|
||
|
@@ -59,39 +59,44 @@ module Delay = struct | |
finally' | ||
(fun () -> | ||
try | ||
let pipe_out = | ||
let sock_out = | ||
Mutex.execute x.m (fun () -> | ||
if x.signalled then ( | ||
x.signalled <- false ; | ||
raise Pre_signalled | ||
) ; | ||
let pipe_out, pipe_in = Unix.pipe () in | ||
let sock_out, sock_in = | ||
Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 | ||
in | ||
(* these will be unconditionally closed on exit *) | ||
to_close := [pipe_out; pipe_in] ; | ||
x.pipe_out <- Some pipe_out ; | ||
x.pipe_in <- Some pipe_in ; | ||
to_close := [sock_out; sock_in] ; | ||
x.sock_out <- Some sock_out ; | ||
x.sock_in <- Some sock_in ; | ||
x.signalled <- false ; | ||
pipe_out | ||
sock_out | ||
) | ||
in | ||
let r, _, _ = Unix.select [pipe_out] [] [] seconds in | ||
(* flush the single byte from the pipe *) | ||
if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ; | ||
(* flush the single byte from the socket *) | ||
Unix.setsockopt_float sock_out Unix.SO_RCVTIMEO seconds ; | ||
(* return true if we waited the full length of time, false if we were woken *) | ||
r = [] | ||
try | ||
ignore (Unix.read sock_out (Bytes.create 1) 0 1) ; | ||
Unix.setsockopt_float sock_out Unix.SO_RCVTIMEO 0. ; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we'd miss resetting the socket option in case of an exception. |
||
false | ||
with Unix.Unix_error (Unix.EAGAIN, _, _) -> true | ||
with Pre_signalled -> false | ||
) | ||
(fun () -> | ||
Mutex.execute x.m (fun () -> | ||
x.pipe_out <- None ; | ||
x.pipe_in <- None ; | ||
x.sock_out <- None ; | ||
x.sock_in <- None ; | ||
List.iter close' !to_close | ||
) | ||
) | ||
|
||
let signal (x : t) = | ||
Mutex.execute x.m (fun () -> | ||
match x.pipe_in with | ||
match x.sock_in with | ||
| Some fd -> | ||
ignore (Unix.write fd (Bytes.of_string "X") 0 1) | ||
| None -> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,8 +32,6 @@ let json_rpc_write_timeout = ref 60000000000L | |
|
||
(* timeout value in ns when writing RPC request *) | ||
|
||
let to_s s = Int64.to_float s *. 1e-9 | ||
|
||
(* Read the entire contents of the fd, of unknown length *) | ||
let timeout_read fd timeout = | ||
let buf = Buffer.create !json_rpc_max_len in | ||
|
@@ -42,13 +40,6 @@ let timeout_read fd timeout = | |
Mtime.Span.to_uint64_ns (Mtime_clock.count read_start) | ||
in | ||
let rec inner max_time max_bytes = | ||
let ready_to_read, _, _ = | ||
try Unix.select [fd] [] [] (to_s max_time) | ||
with | ||
(* in case the unix.select call fails in situation like interrupt *) | ||
| Unix.Unix_error (Unix.EINTR, _, _) -> | ||
([], [], []) | ||
in | ||
(* This is not accurate the calculate time just for the select part. | ||
However, we think the read time will be minor comparing to the scale of | ||
tens of seconds. the current style will be much concise in code. *) | ||
|
@@ -60,27 +51,25 @@ let timeout_read fd timeout = | |
debug "Timeout after read %d" (Buffer.length buf) ; | ||
raise Timeout | ||
) ; | ||
if List.mem fd ready_to_read then | ||
let bytes = Bytes.make 4096 '\000' in | ||
match Unix.read fd bytes 0 4096 with | ||
| 0 -> | ||
Buffer.contents buf (* EOF *) | ||
| n -> | ||
if n > max_bytes then ( | ||
debug "exceeding maximum read limit %d, clear buffer" | ||
!json_rpc_max_len ; | ||
Buffer.clear buf ; | ||
raise Read_error | ||
) else ( | ||
Buffer.add_subbytes buf bytes 0 n ; | ||
inner remain_time (max_bytes - n) | ||
) | ||
| exception | ||
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) | ||
-> | ||
inner remain_time max_bytes | ||
else | ||
inner remain_time max_bytes | ||
( try Unix.setsockopt_float fd SO_RCVTIMEO (Int64.to_float max_time) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do seem to repeat this quite often, perhaps as a future cleanup PR we could define a helper function: 'timed_read' which does the setsockopt+read+fun.protect to set it back, same for write a 'timed_write' might be useful There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this would avoid mistakes, such as around resetting the socket option. |
||
with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> | ||
(* In the unit tests, the fd comes from a pipe... ignore *) | ||
() | ||
) ; | ||
let bytes = Bytes.make 4096 '\000' in | ||
match Unix.read fd bytes 0 4096 with | ||
| 0 -> | ||
Buffer.contents buf (* EOF *) | ||
| n when n > max_bytes -> | ||
debug "exceeding maximum read limit %d, clear buffer" !json_rpc_max_len ; | ||
Buffer.clear buf ; | ||
raise Read_error | ||
| n -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be expressed as
|
||
Buffer.add_subbytes buf bytes 0 n ; | ||
inner remain_time (max_bytes - n) | ||
| exception | ||
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> | ||
inner remain_time max_bytes | ||
in | ||
inner timeout !json_rpc_max_len | ||
|
||
|
@@ -95,13 +84,6 @@ let timeout_write filedesc total_length data response_time = | |
Mtime.Span.to_uint64_ns (Mtime_clock.count write_start) | ||
in | ||
let rec inner_write offset max_time = | ||
let _, ready_to_write, _ = | ||
try Unix.select [] [filedesc] [] (to_s max_time) | ||
with | ||
(* in case the unix.select call fails in situation like interrupt *) | ||
| Unix.Unix_error (Unix.EINTR, _, _) -> | ||
([], [], []) | ||
in | ||
let remain_time = | ||
let used_time = get_total_used_time () in | ||
Int64.sub response_time used_time | ||
|
@@ -110,32 +92,30 @@ let timeout_write filedesc total_length data response_time = | |
debug "Timeout to write %d at offset %d" total_length offset ; | ||
raise Timeout | ||
) ; | ||
if List.mem filedesc ready_to_write then | ||
let length = total_length - offset in | ||
let bytes_written = | ||
try Unix.single_write filedesc data offset length | ||
with | ||
| Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) | ||
-> | ||
0 | ||
in | ||
let new_offset = offset + bytes_written in | ||
Unix.setsockopt_float filedesc Unix.SO_SNDTIMEO (Int64.to_float max_time) ; | ||
let length = total_length - offset in | ||
let bytes_written = | ||
try Unix.single_write filedesc data offset length | ||
with Unix.Unix_error (Unix.EINTR, _, _) -> 0 | ||
in | ||
let new_offset = offset + bytes_written in | ||
try | ||
if length = bytes_written then | ||
() | ||
else | ||
inner_write new_offset remain_time | ||
else | ||
inner_write offset remain_time | ||
with Unix.Unix_error (Unix.EAGAIN, _, _) -> inner_write offset remain_time | ||
in | ||
inner_write 0 response_time | ||
|
||
let with_rpc ?(version = Jsonrpc.V2) ~path ~call () = | ||
let uri = Uri.of_string (Printf.sprintf "file://%s" path) in | ||
Open_uri.with_open_uri uri (fun s -> | ||
Unix.set_nonblock s ; | ||
let req = Bytes.of_string (Jsonrpc.string_of_call ~version call) in | ||
timeout_write s (Bytes.length req) req !json_rpc_write_timeout ; | ||
Unix.setsockopt_float s SO_SNDTIMEO 0. ; | ||
let res = timeout_read s !json_rpc_read_timeout in | ||
Unix.setsockopt_float s SO_RCVTIMEO 0. ; | ||
debug "Response: %s" res ; | ||
Jsonrpc.response_of_string ~strict:false res | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
message-switch-unix | ||
mtime | ||
mtime.clock.os | ||
polly | ||
ppx_sexp_conv.runtime-lib | ||
re | ||
rpclib.core | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
astring | ||
dune-build-info | ||
fpath | ||
polly | ||
safe-resources | ||
stunnel | ||
threads | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we'd better use
finally
to ensure that this timeout is reset in case of any exception (only one kind is caught above.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every
read
onic.fd
seems to go through here, and each read is preceded by setting a timeout, so I don't think we even need to reset the timeout, just drop this line.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are more reads in http.ml but they also set a socket timeout. Will need to check the other callers too.