Skip to content

Commit

Permalink
Picos compatible direct style interface to Lwt
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Apr 25, 2024
1 parent 559ffe2 commit 534944e
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
;; For mpsc queue
(multicore-magic
(>= 2.1.0))
;; For picos.lwt
(lwt
(>= 5.7.0))
;; Test dependencies
(multicore-bench
(and
Expand Down
1 change: 1 addition & 0 deletions lib/index.mld
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ These are minimalistic, but fully-functioning, schedulers provided as samples.
{!modules:
Picos_fifos
Picos_threaded
Picos_lwt
}

{^ You may find these useful for both understanding the core Picos framework and
Expand Down
8 changes: 8 additions & 0 deletions lib/picos_lwt/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(library
(name picos_lwt)
(public_name picos.lwt)
(enabled_if
(>= %{ocaml_version} 5.0.0))
(libraries
(re_export picos)
(re_export lwt)))
21 changes: 21 additions & 0 deletions lib/picos_lwt/intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module type Sleep = sig
(** Minimal signature for an implementation of {!sleep} using {!Lwt}. *)

val sleep : float -> unit Lwt.t
(** [sleep seconds] should return a cancelable promise that resolves after
given number of [seconds] (unless canceled). *)
end

module type S = sig
(** Direct style {!Picos} compatible interface to {!Lwt}. *)

val run : forbid:bool -> (unit -> 'a) -> 'a Lwt.t
(** [run ~forbid main] runs the [main] program implemented in {!Picos} as a
promise with {!Lwt} as the scheduler. In other words, the [main] program
will be run as a {!Lwt} promise or fiber. *)

val await : (unit -> 'a Lwt.t) -> 'a
(** [await thunk] awaits for the promise returned by [thunk ()] to resolve and
returns the result. This should only be called from inside a fiber
running inside {!start}. *)
end
97 changes: 97 additions & 0 deletions lib/picos_lwt/picos_lwt.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
include Intf

module Make (Sleep : Sleep) : S = struct
open Picos
open Lwt.Infix

let[@alert "-handler"] rec run :
type a r.
Fiber.t ->
(a, r) Effect.Shallow.continuation ->
(a, Exn_bt.t) Result.t ->
r Lwt.t =
fun fiber k v ->
let effc (type a) :
a Effect.t -> ((a, _) Effect.Shallow.continuation -> _) option =
function
| Fiber.Current -> Some (fun k -> run fiber k (Ok fiber))
| Fiber.Spawn r ->
Some
(fun k ->
match Fiber.canceled fiber with
| None ->
List.iter
(fun main ->
let fiber = Fiber.create ~forbid:r.forbid r.computation in
Lwt.async @@ fun () ->
run fiber (Effect.Shallow.fiber main) (Ok ()))
r.mains;
run fiber k (Ok ())
| Some exn_bt -> run fiber k (Error exn_bt))
| Fiber.Yield ->
Some
(fun k ->
match Fiber.canceled fiber with
| None -> Lwt.pause () >>= fun () -> run fiber k (Ok ())
| Some exn_bt -> run fiber k (Error exn_bt))
| Computation.Cancel_after r ->
Some
(fun k ->
match Fiber.canceled fiber with
| None ->
let sleep =
Sleep.sleep r.seconds >>= fun () ->
Computation.cancel r.computation r.exn_bt;
Lwt.return_unit
in
let canceler =
Trigger.from_action sleep () @@ fun _ sleep _ ->
Lwt.cancel sleep
in
if Computation.try_attach r.computation canceler then
Lwt.async @@ fun () -> sleep
else Trigger.signal canceler;
run fiber k (Ok ())
| Some exn_bt -> run fiber k (Error exn_bt))
| Trigger.Await trigger ->
Some
(fun k ->
let promise, resolver = Lwt.wait () in
let resume _trigger resolver _ = Lwt.wakeup resolver () in
if Fiber.try_suspend fiber trigger resolver () resume then
promise >>= fun () -> run fiber k (Ok (Fiber.canceled fiber))
else run fiber k (Ok (Fiber.canceled fiber)))
| _ -> None
in
let handler = Effect.Shallow.{ retc = Lwt.return; exnc = Lwt.fail; effc } in
match v with
| Ok v -> Effect.Shallow.continue_with k v handler
| Error exn_bt -> Exn_bt.discontinue_with k exn_bt handler

let run ~forbid main =
let computation = Computation.create () in
let fiber = Fiber.create ~forbid computation in
run fiber (Effect.Shallow.fiber main) (Ok ())

let await thunk =
let computation = Computation.create () in
let promise =
Lwt.try_bind thunk
(fun value ->
Computation.return computation value;
Lwt.return_unit)
(fun exn ->
Computation.cancel computation (Exn_bt.get_callstack 0 exn);
Lwt.return_unit)
in
Lwt.async (fun () -> promise);
let trigger = Trigger.create () in
if Computation.try_attach computation trigger then begin
match Trigger.await trigger with
| None -> Computation.await computation
| Some exn_bt ->
Lwt.cancel promise;
Exn_bt.raise exn_bt
end
else Computation.await computation
end
19 changes: 19 additions & 0 deletions lib/picos_lwt/picos_lwt.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
(** A functor for building a {!Picos} compatible direct style interface to
{!Lwt} with given implementation of {{!Sleep} sleep}.
This basically gives you an alternative direct style interface to
programming with {!Lwt}. All the scheduling decisions will be made by
{!Lwt}. *)

include module type of Intf

(** [Make (Sleep)] creates a {!Picos} compatible interface to {!Lwt} with given
implementation of {{!Sleep} sleep}.
For example,
{[
module Picos_lwt_unix = Picos_lwt.Make (Lwt_unix)
]}
instantiates this functor using {!Lwt_unix.sleep} as the implemention of
{{!Sleep.sleep} sleep}. *)
module Make : functor (_ : Sleep) -> S
1 change: 1 addition & 0 deletions picos.opam
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ depends: [
"mtime" {>= "2.0.0"}
"psq" {>= "0.2.1"}
"multicore-magic" {>= "2.1.0"}
"lwt" {>= "5.7.0"}
"multicore-bench" {>= "0.1.2" & with-test}
"alcotest" {>= "1.7.0" & with-test}
"qcheck-core" {>= "0.21.2" & with-test}
Expand Down
23 changes: 23 additions & 0 deletions test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@

;;

(test
(name test_lwt_unix)
(modules test_lwt_unix)
(build_if
(>= %{ocaml_version} 5.0.0))
(libraries picos.lwt lwt.unix alcotest))

;;

(rule
(action
(progn
Expand Down Expand Up @@ -59,6 +68,20 @@

;;

(test
(name test_stdio_with_lwt)
(modules test_stdio_with_lwt)
(libraries
test_scheduler
picos.stdio
picos.select
alcotest
lwt
lwt.unix
threads.posix))

;;

(test
(name test_select)
(modules test_select)
Expand Down
24 changes: 24 additions & 0 deletions test/test_lwt_unix.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
open Picos
module Picos_lwt_unix = Picos_lwt.Make (Lwt_unix)

let basics () =
Lwt_main.run
@@ Picos_lwt_unix.run ~forbid:false
@@ fun () ->
let computation = Computation.create () in
let child =
Computation.capture computation @@ fun () ->
while true do
Picos_lwt_unix.await (fun () -> Lwt_unix.sleep 0.01)
done
in
Fiber.spawn ~forbid:false computation [ child ];
Computation.cancel_after computation ~seconds:0.05
(Exn_bt.get_callstack 0 Exit);
match Computation.await computation with
| () -> assert false
| exception Exit -> ()

let () =
[ ("Basics", [ Alcotest.test_case "" `Quick basics ]) ]
|> Alcotest.run "Picos_lwt"
34 changes: 34 additions & 0 deletions test/test_stdio_with_lwt.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
open Picos
open Picos_stdio

let () =
let[@alert "-handler"] rec propagate () =
let computation =
Computation.with_action () () @@ fun _ _ _ ->
Lwt_unix.handle_signal Sys.sigchld;
propagate ()
in
Picos_select.return_on_sigchld computation ()
in
propagate ()

let test_system_unix () =
let sleep = Lwt_unix.system "sleep 2" in
Lwt_main.run @@ Lwt.bind sleep
@@ fun _status ->
Test_scheduler.run @@ fun () ->
assert (Unix.system "exit 101" = Unix.WEXITED 101);
assert (Unix.system "echo Hello world!" = Unix.WEXITED 0);
assert (Unix.system "this-is-not-supposed-to-exist" = Unix.WEXITED 127);
match Unix.wait () with
| _ -> assert false
| exception Unix.Unix_error (ECHILD, _, _) ->
Lwt.bind (Lwt_unix.system "ls -l") @@ fun _ -> Lwt.return ()

let () =
[
( "Unix",
if Sys.win32 then []
else [ Alcotest.test_case "system" `Quick test_system_unix ] );
]
|> Alcotest.run "Picos_stdio_with_lwt"

0 comments on commit 534944e

Please sign in to comment.