Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Sep 21, 2024
1 parent a1fc988 commit 6bfcc94
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 173 deletions.
339 changes: 166 additions & 173 deletions lib/picos_std.sync/ch.ml
Original file line number Diff line number Diff line change
@@ -1,81 +1,6 @@
open Picos_std_event
open Picos

module Q = struct
type ('a, _) node =
| Nil : ('a, [> `Nil ]) node
| Cons : {
mutable next : 'a next;
mutable value : 'a;
}
-> ('a, [> `Cons ]) node

and 'a next = N : ('a, [< `Nil | `Cons ]) node -> 'a next [@@unboxed]

external as_atomic : ('a, [< `Cons ]) node -> 'a next Atomic.t = "%identity"

type 'a cons = ('a, [ `Cons ]) node
type 'a t = { head : 'a cons Atomic.t; tail : 'a cons Atomic.t }

let create ?padded () =
let node : _ cons = Cons { next = N Nil; value = Obj.magic () } in
let head = Atomic.make node |> Multicore_magic.copy_as ?padded in
let tail = Atomic.make node |> Multicore_magic.copy_as ?padded in
Multicore_magic.copy_as ?padded { head; tail }

(* *)

let rec fix_tail tail new_tail backoff =
let old_tail = Atomic.get tail in
if
Atomic.get (as_atomic new_tail) == N Nil
&& not (Atomic.compare_and_set tail old_tail new_tail)
then fix_tail tail new_tail (Backoff.once backoff)

let rec add tail link (new_node : _ cons) backoff =
match Atomic.get (as_atomic link) with
| N Nil ->
if Atomic.compare_and_set (as_atomic link) (N Nil) (N new_node) then begin
fix_tail tail new_node Backoff.default;
link
end
else
let backoff = Backoff.once backoff in
add tail link new_node backoff
| N (Cons _ as cons) -> add tail cons new_node backoff

let create_node value : _ cons = Cons { next = N Nil; value }

let add t (new_node : _ cons) : 'a cons =
let old_tail = Atomic.get t.tail in
if Atomic.compare_and_set (as_atomic old_tail) (N Nil) (N new_node) then begin
(* If the below CAS fails a new node was inserted and we are done. *)
Atomic.compare_and_set t.tail old_tail new_node |> ignore;
old_tail
end
else
let backoff = Backoff.once Backoff.default in
add t.tail old_tail new_node backoff

(* *)

let all t = Atomic.get t.head
let next cons = Atomic.get (as_atomic cons)

let remove t (prev : 'a cons) (Cons curr_r as curr : 'a cons) null =
match Atomic.get (as_atomic curr) with
| N Nil ->
(* [curr] is the last node, we must clear it *)
curr_r.value <- null;
if Atomic.get t.head == prev then
Atomic.compare_and_set t.head prev curr |> ignore
| N (Cons _) as next ->
(* If below CAS fails, someone else must have removed [curr] *)
Atomic.compare_and_set (as_atomic prev) (N curr) next |> ignore
end

external unit_as_false : unit -> bool = "%identity"

module Atomic = Multicore_magic.Transparent_atomic
module Tx = Computation.Tx

type 'a taker =
Expand All @@ -93,19 +18,13 @@ type 'a giver =
}
-> 'a giver

let null_taker = T { computation = Computation.exited (); result = Fun.id }
type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t }

and null_giver =
G { computation = Computation.exited (); result = Fun.id; value = () }
let empty = { givers = T Zero; takers = T Zero }

external generalize_taker : unit taker -> 'a taker = "%identity"
external generalize_giver : unit giver -> 'a giver = "%identity"
type 'a t = 'a state Atomic.t

type 'a t = { givers : 'a giver Q.t; takers : 'a taker Q.t }

let create ?padded () =
let givers = Q.create ?padded () and takers = Q.create ?padded () in
Multicore_magic.copy_as ?padded { givers; takers }
let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded

(* *)

Expand All @@ -120,108 +39,182 @@ let[@inline never] wait computation =

(* *)

let rec give_as t (G gr as self) prev =
match Q.next prev with
| N Nil -> false
| N (Cons { value = T tr; _ } as curr) ->
if Tx.same tr.computation gr.computation then give_as t self curr
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx tr.computation (fun () -> result value)) then begin
Q.remove t.takers prev curr (generalize_taker null_taker);
give_as t self curr
end
else if
(not (Tx.try_return tx gr.computation gr.result))
|| not (Tx.try_commit tx)
then
(not (Computation.is_running gr.computation))
|| give_as t self
(if Computation.is_running tr.computation then prev else curr)
else begin
Q.remove t.takers prev curr (generalize_taker null_taker);
true
end

let rec give t value prev =
match Q.next prev with
| N Nil ->
let computation = Computation.create () in
let rec give t value backoff =
let before = Atomic.fenceless_get t in
match before.takers with
| Q.T Zero ->
let computation = Computation.create ~mode:`LIFO () in
let self = G { computation; result = Fun.id; value } in
let curr = Q.create_node self in
let prev = Q.add t.givers curr in
if not (give_as t self (Q.all t.takers)) then wait computation;
Q.remove t.givers prev curr (generalize_giver null_giver)
| N (Cons { value = T { computation; result }; _ } as curr) ->
let givers = Q.add before.givers self in
let after = { givers; takers = T Zero } in
if Atomic.compare_and_set t before after then wait computation
else give t value (Backoff.once backoff)
| Q.T (One _ as takers) ->
let (T { computation; result }) = Q.head takers in
let got = Computation.try_return computation (fun () -> result value) in
Q.remove t.takers prev curr (generalize_taker null_taker);
if not got then give t value curr
let takers = Q.tail takers in
let givers = before.givers in
let after =
if takers == T Zero && givers == T Zero then empty
else { givers; takers }
in
let no_contention = Atomic.compare_and_set t before after in
if not got then
give t value (if no_contention then backoff else Backoff.once backoff)

let rec take t backoff =
let before = Atomic.fenceless_get t in
match before.givers with
| Q.T Zero ->
let computation = Computation.create ~mode:`LIFO () in
let self = T { computation; result = Fun.id } in
let takers = Q.add before.takers self in
let after = { givers = T Zero; takers } in
if Atomic.compare_and_set t before after then begin
wait computation;
Computation.await computation ()
end
else take t (Backoff.once backoff)
| Q.T (One _ as givers) ->
let (G { computation; result; value }) = Q.head givers in
let got = Computation.try_return computation result in
let givers = Q.tail givers in
let takers = before.takers in
let after =
if givers == T Zero && takers == T Zero then empty
else { givers; takers }
in
let no_contention = Atomic.compare_and_set t before after in
if got then value
else take t (if no_contention then backoff else Backoff.once backoff)

(* *)

let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons)
tail =
let (T tr as taker) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = Q.cons taker selfs in
give_as_advance t self before selfs head tail
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx tr.computation (fun () -> result value)) then
give_as_advance t self before selfs head tail
else if
(not (Tx.try_return tx gr.computation gr.result))
|| not (Tx.try_commit tx)
then
if not (Computation.is_running gr.computation) then ( (* TODO *) )
else if Computation.is_running tr.computation then
give_as t self before selfs head tail
else give_as_advance t self before selfs head tail
else
let takers =
if head == tail then Q.reverse_as_queue selfs
else
let head = Q.reverse_to (Q.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let givers = before.givers in
let after =
if takers == Q.T Zero && givers == Q.T Zero then empty
else { givers; takers }
in
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and give_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail =
if head != tail then give_as t self before selfs (Q.as_cons head_r.next) tail
else
let takers = Q.reverse_as_queue selfs in
let givers = Q.add before.givers self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then give_as_start t self

and give_as_start t self =
let before = Atomic.get t in
match before.takers with
| Q.T Zero ->
let takers = Q.T Zero in
let givers = Q.singleton self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then give_as_start t self
| Q.T (One r as o) ->
Q.exec o;
give_as t self before (Q.S Nil) r.head r.cons

let give_evt t value =
let request computation result =
let self = G { computation; result; value } in
let curr = Q.create_node self in
let prev = Q.add t.givers curr in
if give_as t self (Q.all t.takers) then
Q.remove t.givers prev curr (generalize_giver null_giver)
give_as_start t (G { computation; result; value })
in
Event.from_request { request }

(* *)

let rec take_as t (T tr as self) prev =
match Q.next prev with
| N Nil -> false
| N (Cons { value = G gr; _ } as curr) ->
if Tx.same tr.computation gr.computation then take_as t self curr
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx gr.computation gr.result) then begin
Q.remove t.givers prev curr (generalize_giver null_giver);
take_as t self curr
end
else if
(not (Tx.try_return tx tr.computation (fun () -> result value)))
|| not (Tx.try_commit tx)
then
(not (Computation.is_running tr.computation))
|| take_as t self
(if Computation.is_running gr.computation then prev else curr)
else begin
Q.remove t.givers prev curr (generalize_giver null_giver);
true
end

let rec take t prev =
match Q.next prev with
| N Nil ->
let computation = Computation.create () in
let self = T { computation; result = Fun.id } in
let curr = Q.create_node self in
let prev = Q.add t.takers curr in
if not (take_as t self (Q.all t.givers)) then wait computation;
Q.remove t.takers prev curr (generalize_taker null_taker);
Computation.await computation ()
| N (Cons { value = G { computation; result; value }; _ } as curr) ->
let got = Computation.try_return computation result in
Q.remove t.givers prev curr (generalize_giver null_giver);
if got then value else take t curr
let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons)
tail =
let (G gr as giver) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = Q.cons giver selfs in
take_as_advance t self before selfs head tail
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx gr.computation gr.result) then
take_as_advance t self before selfs head tail
else if
(not (Tx.try_return tx tr.computation (fun () -> result value)))
|| not (Tx.try_commit tx)
then
if not (Computation.is_running gr.computation) then ( (* TODO *) )
else if Computation.is_running tr.computation then
take_as t self before selfs head tail
else take_as_advance t self before selfs head tail
else
let takers = before.takers in
let givers =
if head == tail then Q.reverse_as_queue selfs
else
let head = Q.reverse_to (Q.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let after =
if takers == Q.T Zero && givers == Q.T Zero then empty
else { givers; takers }
in
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and take_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail =
if head != tail then take_as t self before selfs (Q.as_cons head_r.next) tail
else
let givers = Q.reverse_as_queue selfs in
let takers = Q.add before.takers self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then take_as_start t self

and take_as_start t self =
let before = Atomic.get t in
match before.givers with
| Q.T Zero ->
let givers = Q.T Zero in
let takers = Q.singleton self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then take_as_start t self
| Q.T (One r as o) ->
Q.exec o;
take_as t self before (Q.S Nil) r.head r.cons

let take_evt t =
let request computation result =
let self = T { computation; result } in
let curr = Q.create_node self in
let prev = Q.add t.takers curr in
if take_as t self (Q.all t.givers) then
Q.remove t.takers prev curr (generalize_taker null_taker)
take_as_start t (T { computation; result })
in
Event.from_request { request }

(* *)

let[@inline] give t value = give t value (Q.all t.takers)
let[@inline] take t = take t (Q.all t.givers)
let[@inline] take t = take t Backoff.default
let[@inline] give t value = give t value Backoff.default
15 changes: 15 additions & 0 deletions lib/picos_std.sync/q.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,18 @@ let remove (One o as t : (_, _) one) value =
let tail = find_tail head in
T (One { head; tail; cons = tail })
| exception Not_found -> T t

let cons value next = S (Cons { value; next })

let rec reverse_to tail = function
| S Nil -> tail
| S (Cons cons_r as next) ->
let prev = cons_r.next in
cons_r.next <- S tail;
reverse_to next prev

let reverse_as_queue = function
| S Nil -> T Zero
| S (Cons cons_r as tail) ->
let head = reverse_to tail cons_r.next in
T (One { head; tail; cons = tail })

0 comments on commit 6bfcc94

Please sign in to comment.