Skip to content

Commit

Permalink
Hmm...
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Aug 31, 2024
1 parent 02485aa commit 93c2d19
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 133 deletions.
24 changes: 12 additions & 12 deletions lib/picos_sync/ch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ let rec take t backoff =

(* *)

let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons)
let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons)
tail =
let (T tr as taker) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = Q.cons taker selfs in
let selfs = S.cons taker selfs in
give_as_advance t self before selfs head tail
else
let tx = Tx.create () in
Expand All @@ -113,7 +113,7 @@ let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons)
let takers =
if head == tail then Q.reverse_as_queue selfs
else
let head = Q.reverse_to (Q.as_cons head_r.next) selfs in
let head = S.reverse_to (S.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let givers = before.givers in
Expand All @@ -124,8 +124,8 @@ let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons)
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and give_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail =
if head != tail then give_as t self before selfs (Q.as_cons head_r.next) tail
and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail
else
let takers = Q.reverse_as_queue selfs in
let givers = Q.add before.givers self in
Expand All @@ -142,7 +142,7 @@ and give_as_start t self =
if not (Atomic.compare_and_set t before after) then give_as_start t self
| Q.T (One r as o) ->
Q.exec o;
give_as t self before (Q.S Nil) r.head r.cons
give_as t self before (T Nil) r.head r.cons

let give_evt t value =
let request computation result =
Expand All @@ -152,11 +152,11 @@ let give_evt t value =

(* *)

let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons)
let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons)
tail =
let (G gr as giver) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = Q.cons giver selfs in
let selfs = S.cons giver selfs in
take_as_advance t self before selfs head tail
else
let tx = Tx.create () in
Expand All @@ -177,7 +177,7 @@ let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons)
let givers =
if head == tail then Q.reverse_as_queue selfs
else
let head = Q.reverse_to (Q.as_cons head_r.next) selfs in
let head = S.reverse_to (S.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let after =
Expand All @@ -187,8 +187,8 @@ let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons)
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and take_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail =
if head != tail then take_as t self before selfs (Q.as_cons head_r.next) tail
and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail
else
let givers = Q.reverse_as_queue selfs in
let takers = Q.add before.takers self in
Expand All @@ -205,7 +205,7 @@ and take_as_start t self =
if not (Atomic.compare_and_set t before after) then take_as_start t self
| Q.T (One r as o) ->
Q.exec o;
take_as t self before (Q.S Nil) r.head r.cons
take_as t self before (T Nil) r.head r.cons

let take_evt t =
let request computation result =
Expand Down
14 changes: 8 additions & 6 deletions lib/picos_sync/condition.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let broadcast (t : t) =
if Atomic.get t != T Zero then
match Atomic.exchange t (T Zero) with
| T Zero -> ()
| T (One _ as q) -> Q.iter q Trigger.signal
| T (One _ as q) -> Q.iter Trigger.signal q

(* We try to avoid starvation of signal by making it so that when, at the start
of signal or wait, the head is empty, the tail is reversed into the head.
Expand Down Expand Up @@ -38,26 +38,28 @@ let rec cleanup backoff trigger (t : t) =
else if not (Atomic.compare_and_set t before after) then
cleanup (Backoff.once backoff) trigger t

let rec wait (t : t) mutex trigger fiber backoff =
let rec wait (t : t) mutex cons fiber backoff =
let before = Atomic.get t in
let after = Q.add before trigger in
let after = Q.add_cons before cons in
if Atomic.compare_and_set t before after then begin
Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
let trigger = S.value cons in
let result = Trigger.await trigger in
let forbid = Fiber.exchange fiber ~forbid:true in
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Nothing Backoff.default;
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex (T Nil) Backoff.default;
Fiber.set fiber ~forbid;
match result with
| None -> ()
| Some exn_bt ->
cleanup Backoff.default trigger t;
Exn_bt.raise exn_bt
end
else wait t mutex trigger fiber (Backoff.once backoff)
else wait t mutex cons fiber (Backoff.once backoff)

let wait t mutex =
let fiber = Fiber.current () in
let trigger = Trigger.create () in
wait t mutex trigger fiber Backoff.default
let cons = S.Cons { value = trigger; next = T Nil } in
wait t mutex cons fiber Backoff.default

let[@inline] signal t = signal t Backoff.default
126 changes: 84 additions & 42 deletions lib/picos_sync/mutex.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,101 +4,143 @@ let[@inline never] owner () = raise (Sys_error "Mutex: owner")
let[@inline never] unlocked () = raise (Sys_error "Mutex: unlocked")
let[@inline never] not_owner () = raise (Sys_error "Mutex: not owner")

type _ tdt =
| Entry : { trigger : Trigger.t; fiber : Fiber.Maybe.t } -> [> `Entry ] tdt
| Nothing : [> `Nothing ] tdt
type entry = { trigger : Trigger.t; fiber : Fiber.Maybe.t }

type state =
| Unlocked
| Locked of { fiber : Fiber.Maybe.t; waiters : [ `Entry ] tdt Q.t }
| Locked of { fiber : Fiber.Maybe.t }
| Contended of {
fiber : Fiber.Maybe.t;
head : entry S.cons;
tail : entry S.cons;
cons : entry S.cons;
}

type t = state Atomic.t

let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Unlocked
let locked_nothing = Locked { fiber = Fiber.Maybe.nothing; waiters = T Zero }
let locked_nothing = Locked { fiber = Fiber.Maybe.nothing }

let rec unlock_as owner t backoff =
match Atomic.get t with
| Unlocked -> unlocked ()
| Locked r as before ->
if Fiber.Maybe.equal r.fiber owner then
match r.waiters with
| T Zero ->
if not (Atomic.compare_and_set t before Unlocked) then
unlock_as owner t (Backoff.once backoff)
| T (One _ as q) ->
let (Entry { trigger; fiber }) = Q.head q in
let waiters = Q.tail q in
let after = Locked { fiber; waiters } in
if Atomic.compare_and_set t before after then Trigger.signal trigger
else unlock_as owner t (Backoff.once backoff)
if Fiber.Maybe.equal r.fiber owner then begin
if not (Atomic.compare_and_set t before Unlocked) then
unlock_as owner t (Backoff.once backoff)
end
else not_owner ()
| Contended r as before ->
if Fiber.Maybe.equal r.fiber owner then begin
S.exec r.tail r.cons;
let { trigger; fiber } = S.value r.head in
let after =
if r.head != r.cons then
let head = S.next_as_cons r.head in
Contended { fiber; head; tail = r.cons; cons = r.cons }
else if fiber == Fiber.Maybe.nothing then locked_nothing
else Locked { fiber }
in
if Atomic.compare_and_set t before after then Trigger.signal trigger
else unlock_as owner t (Backoff.once backoff)
end
else not_owner ()

let[@inline] unlock ?checked t =
let owner = Fiber.Maybe.current_if checked in
unlock_as owner t Backoff.default

let rec cleanup_as (Entry entry_r as entry : [ `Entry ] tdt) t backoff =
let rec cleanup_as entry t backoff =
(* We have been canceled. If we are the owner, we must unlock the mutex.
Otherwise we must remove our entry from the queue. *)
match Atomic.get t with
| Locked r as before -> begin
match r.waiters with
| T Zero -> unlock_as entry_r.fiber t backoff
| T (One _ as q) ->
let waiters = Q.remove q entry in
if r.waiters == waiters then unlock_as entry_r.fiber t backoff
else
let after = Locked { fiber = r.fiber; waiters } in
if not (Atomic.compare_and_set t before after) then
cleanup_as entry t (Backoff.once backoff)
| Locked _ -> unlock_as entry.fiber t backoff
| Contended r as before -> begin
S.exec r.tail r.cons;
match S.reject r.head entry with
| S.T Nil ->
let after = Locked { fiber = r.fiber } in
if not (Atomic.compare_and_set t before after) then
cleanup_as entry t (Backoff.once backoff)
| S.T (Cons _ as head) ->
let tail = S.find_tail head in
let after = Contended { fiber = r.fiber; head; tail; cons = tail } in
if not (Atomic.compare_and_set t before after) then
cleanup_as entry t (Backoff.once backoff)
| exception Not_found -> unlock_as entry.fiber t backoff
end
| Unlocked -> unlocked ()

let rec lock_as fiber t entry backoff =
let rec lock_as fiber t node backoff =
match Atomic.get t with
| Unlocked as before ->
let after =
if fiber == Fiber.Maybe.nothing then locked_nothing
else Locked { fiber; waiters = T Zero }
else Locked { fiber }
in
if not (Atomic.compare_and_set t before after) then
lock_as fiber t entry (Backoff.once backoff)
lock_as fiber t node (Backoff.once backoff)
| Locked r as before ->
if Fiber.Maybe.unequal r.fiber fiber then
let (Entry entry_r as entry : [ `Entry ] tdt) =
match entry with
| Nothing ->
let cons =
match node with
| S.T Nil ->
let trigger = Trigger.create () in
let value = { trigger; fiber } in
S.Cons { value; next = T Nil }
| S.T (Cons _ as cons) -> cons
in
let after =
Contended { fiber = r.fiber; head = cons; tail = cons; cons }
in
if Atomic.compare_and_set t before after then begin
let entry = S.value cons in
match Trigger.await entry.trigger with
| None -> ()
| Some exn_bt ->
cleanup_as entry t Backoff.default;
Exn_bt.raise exn_bt
end
else lock_as fiber t (T cons) (Backoff.once backoff)
else owner ()
| Contended r as before ->
if Fiber.Maybe.unequal r.fiber fiber then begin
S.exec r.tail r.cons;
let cons =
match node with
| S.T Nil ->
let trigger = Trigger.create () in
Entry { trigger; fiber }
| Entry _ as entry -> entry
let value = { trigger; fiber } in
S.Cons { value; next = T Nil }
| S.T (Cons _ as cons) -> cons
in
let waiters = Q.add r.waiters entry in
let after = Locked { fiber = r.fiber; waiters } in
let after = Contended { r with tail = r.cons; cons } in
if Atomic.compare_and_set t before after then begin
match Trigger.await entry_r.trigger with
let entry = S.value cons in
match Trigger.await entry.trigger with
| None -> ()
| Some exn_bt ->
cleanup_as entry t Backoff.default;
Exn_bt.raise exn_bt
end
else lock_as fiber t entry (Backoff.once backoff)
else lock_as fiber t (T cons) (Backoff.once backoff)
end
else owner ()

let[@inline] lock ?checked t =
let fiber = Fiber.Maybe.current_and_check_if checked in
lock_as fiber t Nothing Backoff.default
lock_as fiber t (T Nil) Backoff.default

let try_lock ?checked t =
let fiber = Fiber.Maybe.current_and_check_if checked in
Atomic.get t == Unlocked
&& Atomic.compare_and_set t Unlocked
(if fiber == Fiber.Maybe.nothing then locked_nothing
else Locked { fiber; waiters = T Zero })
else Locked { fiber })

let protect ?checked t body =
let fiber = Fiber.Maybe.current_and_check_if checked in
lock_as fiber t Nothing Backoff.default;
lock_as fiber t (T Nil) Backoff.default;
match body () with
| value ->
unlock_as fiber t Backoff.default;
Expand Down
Loading

0 comments on commit 93c2d19

Please sign in to comment.