From 6bfcc94509bb271f07c0d0e19498050088994d60 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Thu, 22 Aug 2024 17:56:39 +0300 Subject: [PATCH] WIP --- lib/picos_std.sync/ch.ml | 339 +++++++++++++++++++-------------------- lib/picos_std.sync/q.ml | 15 ++ 2 files changed, 181 insertions(+), 173 deletions(-) diff --git a/lib/picos_std.sync/ch.ml b/lib/picos_std.sync/ch.ml index 628f2158..9b3bae2a 100644 --- a/lib/picos_std.sync/ch.ml +++ b/lib/picos_std.sync/ch.ml @@ -1,81 +1,6 @@ open Picos_std_event open Picos - -module Q = struct - type ('a, _) node = - | Nil : ('a, [> `Nil ]) node - | Cons : { - mutable next : 'a next; - mutable value : 'a; - } - -> ('a, [> `Cons ]) node - - and 'a next = N : ('a, [< `Nil | `Cons ]) node -> 'a next [@@unboxed] - - external as_atomic : ('a, [< `Cons ]) node -> 'a next Atomic.t = "%identity" - - type 'a cons = ('a, [ `Cons ]) node - type 'a t = { head : 'a cons Atomic.t; tail : 'a cons Atomic.t } - - let create ?padded () = - let node : _ cons = Cons { next = N Nil; value = Obj.magic () } in - let head = Atomic.make node |> Multicore_magic.copy_as ?padded in - let tail = Atomic.make node |> Multicore_magic.copy_as ?padded in - Multicore_magic.copy_as ?padded { head; tail } - - (* *) - - let rec fix_tail tail new_tail backoff = - let old_tail = Atomic.get tail in - if - Atomic.get (as_atomic new_tail) == N Nil - && not (Atomic.compare_and_set tail old_tail new_tail) - then fix_tail tail new_tail (Backoff.once backoff) - - let rec add tail link (new_node : _ cons) backoff = - match Atomic.get (as_atomic link) with - | N Nil -> - if Atomic.compare_and_set (as_atomic link) (N Nil) (N new_node) then begin - fix_tail tail new_node Backoff.default; - link - end - else - let backoff = Backoff.once backoff in - add tail link new_node backoff - | N (Cons _ as cons) -> add tail cons new_node backoff - - let create_node value : _ cons = Cons { next = N Nil; value } - - let add t (new_node : _ cons) : 'a cons = - let old_tail = Atomic.get t.tail in - if Atomic.compare_and_set (as_atomic old_tail) (N Nil) (N new_node) then begin - (* If the below CAS fails a new node was inserted and we are done. *) - Atomic.compare_and_set t.tail old_tail new_node |> ignore; - old_tail - end - else - let backoff = Backoff.once Backoff.default in - add t.tail old_tail new_node backoff - - (* *) - - let all t = Atomic.get t.head - let next cons = Atomic.get (as_atomic cons) - - let remove t (prev : 'a cons) (Cons curr_r as curr : 'a cons) null = - match Atomic.get (as_atomic curr) with - | N Nil -> - (* [curr] is the last node, we must clear it *) - curr_r.value <- null; - if Atomic.get t.head == prev then - Atomic.compare_and_set t.head prev curr |> ignore - | N (Cons _) as next -> - (* If below CAS fails, someone else must have removed [curr] *) - Atomic.compare_and_set (as_atomic prev) (N curr) next |> ignore -end - -external unit_as_false : unit -> bool = "%identity" - +module Atomic = Multicore_magic.Transparent_atomic module Tx = Computation.Tx type 'a taker = @@ -93,19 +18,13 @@ type 'a giver = } -> 'a giver -let null_taker = T { computation = Computation.exited (); result = Fun.id } +type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t } -and null_giver = - G { computation = Computation.exited (); result = Fun.id; value = () } +let empty = { givers = T Zero; takers = T Zero } -external generalize_taker : unit taker -> 'a taker = "%identity" -external generalize_giver : unit giver -> 'a giver = "%identity" +type 'a t = 'a state Atomic.t -type 'a t = { givers : 'a giver Q.t; takers : 'a taker Q.t } - -let create ?padded () = - let givers = Q.create ?padded () and takers = Q.create ?padded () in - Multicore_magic.copy_as ?padded { givers; takers } +let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded (* *) @@ -120,108 +39,182 @@ let[@inline never] wait computation = (* *) -let rec give_as t (G gr as self) prev = - match Q.next prev with - | N Nil -> false - | N (Cons { value = T tr; _ } as curr) -> - if Tx.same tr.computation gr.computation then give_as t self curr - else - let tx = Tx.create () in - let result = tr.result in - let value = gr.value in - if not (Tx.try_return tx tr.computation (fun () -> result value)) then begin - Q.remove t.takers prev curr (generalize_taker null_taker); - give_as t self curr - end - else if - (not (Tx.try_return tx gr.computation gr.result)) - || not (Tx.try_commit tx) - then - (not (Computation.is_running gr.computation)) - || give_as t self - (if Computation.is_running tr.computation then prev else curr) - else begin - Q.remove t.takers prev curr (generalize_taker null_taker); - true - end - -let rec give t value prev = - match Q.next prev with - | N Nil -> - let computation = Computation.create () in +let rec give t value backoff = + let before = Atomic.fenceless_get t in + match before.takers with + | Q.T Zero -> + let computation = Computation.create ~mode:`LIFO () in let self = G { computation; result = Fun.id; value } in - let curr = Q.create_node self in - let prev = Q.add t.givers curr in - if not (give_as t self (Q.all t.takers)) then wait computation; - Q.remove t.givers prev curr (generalize_giver null_giver) - | N (Cons { value = T { computation; result }; _ } as curr) -> + let givers = Q.add before.givers self in + let after = { givers; takers = T Zero } in + if Atomic.compare_and_set t before after then wait computation + else give t value (Backoff.once backoff) + | Q.T (One _ as takers) -> + let (T { computation; result }) = Q.head takers in let got = Computation.try_return computation (fun () -> result value) in - Q.remove t.takers prev curr (generalize_taker null_taker); - if not got then give t value curr + let takers = Q.tail takers in + let givers = before.givers in + let after = + if takers == T Zero && givers == T Zero then empty + else { givers; takers } + in + let no_contention = Atomic.compare_and_set t before after in + if not got then + give t value (if no_contention then backoff else Backoff.once backoff) + +let rec take t backoff = + let before = Atomic.fenceless_get t in + match before.givers with + | Q.T Zero -> + let computation = Computation.create ~mode:`LIFO () in + let self = T { computation; result = Fun.id } in + let takers = Q.add before.takers self in + let after = { givers = T Zero; takers } in + if Atomic.compare_and_set t before after then begin + wait computation; + Computation.await computation () + end + else take t (Backoff.once backoff) + | Q.T (One _ as givers) -> + let (G { computation; result; value }) = Q.head givers in + let got = Computation.try_return computation result in + let givers = Q.tail givers in + let takers = before.takers in + let after = + if givers == T Zero && takers == T Zero then empty + else { givers; takers } + in + let no_contention = Atomic.compare_and_set t before after in + if got then value + else take t (if no_contention then backoff else Backoff.once backoff) + +(* *) + +let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons) + tail = + let (T tr as taker) = head_r.value in + if Tx.same tr.computation gr.computation then + let selfs = Q.cons taker selfs in + give_as_advance t self before selfs head tail + else + let tx = Tx.create () in + let result = tr.result in + let value = gr.value in + if not (Tx.try_return tx tr.computation (fun () -> result value)) then + give_as_advance t self before selfs head tail + else if + (not (Tx.try_return tx gr.computation gr.result)) + || not (Tx.try_commit tx) + then + if not (Computation.is_running gr.computation) then ( (* TODO *) ) + else if Computation.is_running tr.computation then + give_as t self before selfs head tail + else give_as_advance t self before selfs head tail + else + let takers = + if head == tail then Q.reverse_as_queue selfs + else + let head = Q.reverse_to (Q.as_cons head_r.next) selfs in + Q.T (One { head; tail; cons = tail }) + in + let givers = before.givers in + let after = + if takers == Q.T Zero && givers == Q.T Zero then empty + else { givers; takers } + in + if not (Atomic.compare_and_set t before after) then + ( (* TODO: avoid leak *) ) + +and give_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail = + if head != tail then give_as t self before selfs (Q.as_cons head_r.next) tail + else + let takers = Q.reverse_as_queue selfs in + let givers = Q.add before.givers self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then give_as_start t self + +and give_as_start t self = + let before = Atomic.get t in + match before.takers with + | Q.T Zero -> + let takers = Q.T Zero in + let givers = Q.singleton self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then give_as_start t self + | Q.T (One r as o) -> + Q.exec o; + give_as t self before (Q.S Nil) r.head r.cons let give_evt t value = let request computation result = - let self = G { computation; result; value } in - let curr = Q.create_node self in - let prev = Q.add t.givers curr in - if give_as t self (Q.all t.takers) then - Q.remove t.givers prev curr (generalize_giver null_giver) + give_as_start t (G { computation; result; value }) in Event.from_request { request } (* *) -let rec take_as t (T tr as self) prev = - match Q.next prev with - | N Nil -> false - | N (Cons { value = G gr; _ } as curr) -> - if Tx.same tr.computation gr.computation then take_as t self curr - else - let tx = Tx.create () in - let result = tr.result in - let value = gr.value in - if not (Tx.try_return tx gr.computation gr.result) then begin - Q.remove t.givers prev curr (generalize_giver null_giver); - take_as t self curr - end - else if - (not (Tx.try_return tx tr.computation (fun () -> result value))) - || not (Tx.try_commit tx) - then - (not (Computation.is_running tr.computation)) - || take_as t self - (if Computation.is_running gr.computation then prev else curr) - else begin - Q.remove t.givers prev curr (generalize_giver null_giver); - true - end - -let rec take t prev = - match Q.next prev with - | N Nil -> - let computation = Computation.create () in - let self = T { computation; result = Fun.id } in - let curr = Q.create_node self in - let prev = Q.add t.takers curr in - if not (take_as t self (Q.all t.givers)) then wait computation; - Q.remove t.takers prev curr (generalize_taker null_taker); - Computation.await computation () - | N (Cons { value = G { computation; result; value }; _ } as curr) -> - let got = Computation.try_return computation result in - Q.remove t.givers prev curr (generalize_giver null_giver); - if got then value else take t curr +let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons) + tail = + let (G gr as giver) = head_r.value in + if Tx.same tr.computation gr.computation then + let selfs = Q.cons giver selfs in + take_as_advance t self before selfs head tail + else + let tx = Tx.create () in + let result = tr.result in + let value = gr.value in + if not (Tx.try_return tx gr.computation gr.result) then + take_as_advance t self before selfs head tail + else if + (not (Tx.try_return tx tr.computation (fun () -> result value))) + || not (Tx.try_commit tx) + then + if not (Computation.is_running gr.computation) then ( (* TODO *) ) + else if Computation.is_running tr.computation then + take_as t self before selfs head tail + else take_as_advance t self before selfs head tail + else + let takers = before.takers in + let givers = + if head == tail then Q.reverse_as_queue selfs + else + let head = Q.reverse_to (Q.as_cons head_r.next) selfs in + Q.T (One { head; tail; cons = tail }) + in + let after = + if takers == Q.T Zero && givers == Q.T Zero then empty + else { givers; takers } + in + if not (Atomic.compare_and_set t before after) then + ( (* TODO: avoid leak *) ) + +and take_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail = + if head != tail then take_as t self before selfs (Q.as_cons head_r.next) tail + else + let givers = Q.reverse_as_queue selfs in + let takers = Q.add before.takers self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then take_as_start t self + +and take_as_start t self = + let before = Atomic.get t in + match before.givers with + | Q.T Zero -> + let givers = Q.T Zero in + let takers = Q.singleton self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then take_as_start t self + | Q.T (One r as o) -> + Q.exec o; + take_as t self before (Q.S Nil) r.head r.cons let take_evt t = let request computation result = - let self = T { computation; result } in - let curr = Q.create_node self in - let prev = Q.add t.takers curr in - if take_as t self (Q.all t.givers) then - Q.remove t.takers prev curr (generalize_taker null_taker) + take_as_start t (T { computation; result }) in Event.from_request { request } (* *) -let[@inline] give t value = give t value (Q.all t.takers) -let[@inline] take t = take t (Q.all t.givers) +let[@inline] take t = take t Backoff.default +let[@inline] give t value = give t value Backoff.default diff --git a/lib/picos_std.sync/q.ml b/lib/picos_std.sync/q.ml index ada4fa0c..7f9e47c3 100644 --- a/lib/picos_std.sync/q.ml +++ b/lib/picos_std.sync/q.ml @@ -73,3 +73,18 @@ let remove (One o as t : (_, _) one) value = let tail = find_tail head in T (One { head; tail; cons = tail }) | exception Not_found -> T t + +let cons value next = S (Cons { value; next }) + +let rec reverse_to tail = function + | S Nil -> tail + | S (Cons cons_r as next) -> + let prev = cons_r.next in + cons_r.next <- S tail; + reverse_to next prev + +let reverse_as_queue = function + | S Nil -> T Zero + | S (Cons cons_r as tail) -> + let head = reverse_to tail cons_r.next in + T (One { head; tail; cons = tail })