diff --git a/lib/picos_sync/ch.ml b/lib/picos_sync/ch.ml index fa003c81..69734930 100644 --- a/lib/picos_sync/ch.ml +++ b/lib/picos_sync/ch.ml @@ -89,11 +89,11 @@ let rec take t backoff = (* *) -let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons) +let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons) tail = let (T tr as taker) = head_r.value in if Tx.same tr.computation gr.computation then - let selfs = Q.cons taker selfs in + let selfs = S.cons taker selfs in give_as_advance t self before selfs head tail else let tx = Tx.create () in @@ -113,7 +113,7 @@ let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons) let takers = if head == tail then Q.reverse_as_queue selfs else - let head = Q.reverse_to (Q.as_cons head_r.next) selfs in + let head = S.reverse_to (S.as_cons head_r.next) selfs in Q.T (One { head; tail; cons = tail }) in let givers = before.givers in @@ -124,8 +124,8 @@ let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ Q.cons) if not (Atomic.compare_and_set t before after) then ( (* TODO: avoid leak *) ) -and give_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail = - if head != tail then give_as t self before selfs (Q.as_cons head_r.next) tail +and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail = + if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail else let takers = Q.reverse_as_queue selfs in let givers = Q.add before.givers self in @@ -142,7 +142,7 @@ and give_as_start t self = if not (Atomic.compare_and_set t before after) then give_as_start t self | Q.T (One r as o) -> Q.exec o; - give_as t self before (Q.S Nil) r.head r.cons + give_as t self before (T Nil) r.head r.cons let give_evt t value = let request computation result = @@ -152,11 +152,11 @@ let give_evt t value = (* *) -let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons) +let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons) tail = let (G gr as giver) = head_r.value in if Tx.same tr.computation gr.computation then - let selfs = Q.cons giver selfs in + let selfs = S.cons giver selfs in take_as_advance t self before selfs head tail else let tx = Tx.create () in @@ -177,7 +177,7 @@ let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons) let givers = if head == tail then Q.reverse_as_queue selfs else - let head = Q.reverse_to (Q.as_cons head_r.next) selfs in + let head = S.reverse_to (S.as_cons head_r.next) selfs in Q.T (One { head; tail; cons = tail }) in let after = @@ -187,8 +187,8 @@ let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ Q.cons) if not (Atomic.compare_and_set t before after) then ( (* TODO: avoid leak *) ) -and take_as_advance t self before selfs (Cons head_r as head : _ Q.cons) tail = - if head != tail then take_as t self before selfs (Q.as_cons head_r.next) tail +and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail = + if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail else let givers = Q.reverse_as_queue selfs in let takers = Q.add before.takers self in @@ -205,7 +205,7 @@ and take_as_start t self = if not (Atomic.compare_and_set t before after) then take_as_start t self | Q.T (One r as o) -> Q.exec o; - take_as t self before (Q.S Nil) r.head r.cons + take_as t self before (T Nil) r.head r.cons let take_evt t = let request computation result = diff --git a/lib/picos_sync/condition.ml b/lib/picos_sync/condition.ml index 5fc6592a..1875da7c 100644 --- a/lib/picos_sync/condition.ml +++ b/lib/picos_sync/condition.ml @@ -9,7 +9,7 @@ let broadcast (t : t) = if Atomic.get t != T Zero then match Atomic.exchange t (T Zero) with | T Zero -> () - | T (One _ as q) -> Q.iter q Trigger.signal + | T (One _ as q) -> Q.iter Trigger.signal q (* We try to avoid starvation of signal by making it so that when, at the start of signal or wait, the head is empty, the tail is reversed into the head. @@ -38,14 +38,15 @@ let rec cleanup backoff trigger (t : t) = else if not (Atomic.compare_and_set t before after) then cleanup (Backoff.once backoff) trigger t -let rec wait (t : t) mutex trigger fiber backoff = +let rec wait (t : t) mutex cons fiber backoff = let before = Atomic.get t in - let after = Q.add before trigger in + let after = Q.add_cons before cons in if Atomic.compare_and_set t before after then begin Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default; + let trigger = S.value cons in let result = Trigger.await trigger in let forbid = Fiber.exchange fiber ~forbid:true in - Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Nothing Backoff.default; + Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex (T Nil) Backoff.default; Fiber.set fiber ~forbid; match result with | None -> () @@ -53,11 +54,12 @@ let rec wait (t : t) mutex trigger fiber backoff = cleanup Backoff.default trigger t; Exn_bt.raise exn_bt end - else wait t mutex trigger fiber (Backoff.once backoff) + else wait t mutex cons fiber (Backoff.once backoff) let wait t mutex = let fiber = Fiber.current () in let trigger = Trigger.create () in - wait t mutex trigger fiber Backoff.default + let cons = S.Cons { value = trigger; next = T Nil } in + wait t mutex cons fiber Backoff.default let[@inline] signal t = signal t Backoff.default diff --git a/lib/picos_sync/mutex.ml b/lib/picos_sync/mutex.ml index 04b54b18..d40147a2 100644 --- a/lib/picos_sync/mutex.ml +++ b/lib/picos_sync/mutex.ml @@ -4,101 +4,143 @@ let[@inline never] owner () = raise (Sys_error "Mutex: owner") let[@inline never] unlocked () = raise (Sys_error "Mutex: unlocked") let[@inline never] not_owner () = raise (Sys_error "Mutex: not owner") -type _ tdt = - | Entry : { trigger : Trigger.t; fiber : Fiber.Maybe.t } -> [> `Entry ] tdt - | Nothing : [> `Nothing ] tdt +type entry = { trigger : Trigger.t; fiber : Fiber.Maybe.t } type state = | Unlocked - | Locked of { fiber : Fiber.Maybe.t; waiters : [ `Entry ] tdt Q.t } + | Locked of { fiber : Fiber.Maybe.t } + | Contended of { + fiber : Fiber.Maybe.t; + head : entry S.cons; + tail : entry S.cons; + cons : entry S.cons; + } type t = state Atomic.t let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Unlocked -let locked_nothing = Locked { fiber = Fiber.Maybe.nothing; waiters = T Zero } +let locked_nothing = Locked { fiber = Fiber.Maybe.nothing } let rec unlock_as owner t backoff = match Atomic.get t with | Unlocked -> unlocked () | Locked r as before -> - if Fiber.Maybe.equal r.fiber owner then - match r.waiters with - | T Zero -> - if not (Atomic.compare_and_set t before Unlocked) then - unlock_as owner t (Backoff.once backoff) - | T (One _ as q) -> - let (Entry { trigger; fiber }) = Q.head q in - let waiters = Q.tail q in - let after = Locked { fiber; waiters } in - if Atomic.compare_and_set t before after then Trigger.signal trigger - else unlock_as owner t (Backoff.once backoff) + if Fiber.Maybe.equal r.fiber owner then begin + if not (Atomic.compare_and_set t before Unlocked) then + unlock_as owner t (Backoff.once backoff) + end + else not_owner () + | Contended r as before -> + if Fiber.Maybe.equal r.fiber owner then begin + S.exec r.tail r.cons; + let { trigger; fiber } = S.value r.head in + let after = + if r.head != r.cons then + let head = S.next_as_cons r.head in + Contended { fiber; head; tail = r.cons; cons = r.cons } + else if fiber == Fiber.Maybe.nothing then locked_nothing + else Locked { fiber } + in + if Atomic.compare_and_set t before after then Trigger.signal trigger + else unlock_as owner t (Backoff.once backoff) + end else not_owner () let[@inline] unlock ?checked t = let owner = Fiber.Maybe.current_if checked in unlock_as owner t Backoff.default -let rec cleanup_as (Entry entry_r as entry : [ `Entry ] tdt) t backoff = +let rec cleanup_as entry t backoff = (* We have been canceled. If we are the owner, we must unlock the mutex. Otherwise we must remove our entry from the queue. *) match Atomic.get t with - | Locked r as before -> begin - match r.waiters with - | T Zero -> unlock_as entry_r.fiber t backoff - | T (One _ as q) -> - let waiters = Q.remove q entry in - if r.waiters == waiters then unlock_as entry_r.fiber t backoff - else - let after = Locked { fiber = r.fiber; waiters } in - if not (Atomic.compare_and_set t before after) then - cleanup_as entry t (Backoff.once backoff) + | Locked _ -> unlock_as entry.fiber t backoff + | Contended r as before -> begin + S.exec r.tail r.cons; + match S.reject r.head entry with + | S.T Nil -> + let after = Locked { fiber = r.fiber } in + if not (Atomic.compare_and_set t before after) then + cleanup_as entry t (Backoff.once backoff) + | S.T (Cons _ as head) -> + let tail = S.find_tail head in + let after = Contended { fiber = r.fiber; head; tail; cons = tail } in + if not (Atomic.compare_and_set t before after) then + cleanup_as entry t (Backoff.once backoff) + | exception Not_found -> unlock_as entry.fiber t backoff end | Unlocked -> unlocked () -let rec lock_as fiber t entry backoff = +let rec lock_as fiber t node backoff = match Atomic.get t with | Unlocked as before -> let after = if fiber == Fiber.Maybe.nothing then locked_nothing - else Locked { fiber; waiters = T Zero } + else Locked { fiber } in if not (Atomic.compare_and_set t before after) then - lock_as fiber t entry (Backoff.once backoff) + lock_as fiber t node (Backoff.once backoff) | Locked r as before -> if Fiber.Maybe.unequal r.fiber fiber then - let (Entry entry_r as entry : [ `Entry ] tdt) = - match entry with - | Nothing -> + let cons = + match node with + | S.T Nil -> + let trigger = Trigger.create () in + let value = { trigger; fiber } in + S.Cons { value; next = T Nil } + | S.T (Cons _ as cons) -> cons + in + let after = + Contended { fiber = r.fiber; head = cons; tail = cons; cons } + in + if Atomic.compare_and_set t before after then begin + let entry = S.value cons in + match Trigger.await entry.trigger with + | None -> () + | Some exn_bt -> + cleanup_as entry t Backoff.default; + Exn_bt.raise exn_bt + end + else lock_as fiber t (T cons) (Backoff.once backoff) + else owner () + | Contended r as before -> + if Fiber.Maybe.unequal r.fiber fiber then begin + S.exec r.tail r.cons; + let cons = + match node with + | S.T Nil -> let trigger = Trigger.create () in - Entry { trigger; fiber } - | Entry _ as entry -> entry + let value = { trigger; fiber } in + S.Cons { value; next = T Nil } + | S.T (Cons _ as cons) -> cons in - let waiters = Q.add r.waiters entry in - let after = Locked { fiber = r.fiber; waiters } in + let after = Contended { r with tail = r.cons; cons } in if Atomic.compare_and_set t before after then begin - match Trigger.await entry_r.trigger with + let entry = S.value cons in + match Trigger.await entry.trigger with | None -> () | Some exn_bt -> cleanup_as entry t Backoff.default; Exn_bt.raise exn_bt end - else lock_as fiber t entry (Backoff.once backoff) + else lock_as fiber t (T cons) (Backoff.once backoff) + end else owner () let[@inline] lock ?checked t = let fiber = Fiber.Maybe.current_and_check_if checked in - lock_as fiber t Nothing Backoff.default + lock_as fiber t (T Nil) Backoff.default let try_lock ?checked t = let fiber = Fiber.Maybe.current_and_check_if checked in Atomic.get t == Unlocked && Atomic.compare_and_set t Unlocked (if fiber == Fiber.Maybe.nothing then locked_nothing - else Locked { fiber; waiters = T Zero }) + else Locked { fiber }) let protect ?checked t body = let fiber = Fiber.Maybe.current_and_check_if checked in - lock_as fiber t Nothing Backoff.default; + lock_as fiber t (T Nil) Backoff.default; match body () with | value -> unlock_as fiber t Backoff.default; diff --git a/lib/picos_sync/q.ml b/lib/picos_sync/q.ml index 7f9e47c3..bef38003 100644 --- a/lib/picos_sync/q.ml +++ b/lib/picos_sync/q.ml @@ -1,19 +1,9 @@ -type ('a, _) tdt = - | Nil : ('a, [> `Nil ]) tdt - | Cons : { value : 'a; mutable next : 'a spine } -> ('a, [> `Cons ]) tdt - -and 'a spine = S : ('a, [< `Nil | `Cons ]) tdt -> 'a spine [@@unboxed] - -type 'a cons = ('a, [ `Cons ]) tdt - -external as_cons : 'a spine -> 'a cons = "%identity" - type ('a, _) queue = | Zero : ('a, [> `Zero ]) queue | One : { - head : 'a cons; - tail : 'a cons; - cons : 'a cons; + head : 'a S.cons; + tail : 'a S.cons; + cons : 'a S.cons; } -> ('a, [> `One ]) queue @@ -21,22 +11,29 @@ type ('a, 'n) one = ('a, ([< `One ] as 'n)) queue type 'a t = T : ('a, [< `Zero | `One ]) queue -> 'a t [@@unboxed] let[@inline] singleton value = - let cons = Cons { value; next = S Nil } in + let cons = S.Cons { value; next = T Nil } in T (One { head = cons; tail = cons; cons }) let[@inline] exec (One o : (_, _) one) = if o.tail != o.cons then let (Cons tl) = o.tail in - if tl.next != S o.cons then tl.next <- S o.cons + if tl.next != T o.cons then tl.next <- T o.cons let[@inline] snoc (One o as t : (_, _) one) value = exec t; - let cons = Cons { value; next = S Nil } in + let cons = S.Cons { value; next = T Nil } in T (One { head = o.head; tail = o.cons; cons }) let[@inline] add t value = match t with T Zero -> singleton value | T (One _ as o) -> snoc o value +let[@inline] add_cons t cons = + match t with + | T Zero -> T (One { head = cons; tail = cons; cons }) + | T (One r as o) -> + exec o; + T (One { head = r.head; tail = r.cons; cons }) + let[@inline] head (One { head = Cons hd; _ } : (_, _) one) = hd.value let[@inline] tail (One o as t : (_, _) one) = @@ -44,47 +41,23 @@ let[@inline] tail (One o as t : (_, _) one) = if o.head == o.cons then T Zero else let (Cons hd) = o.head in - T (One { head = as_cons hd.next; tail = o.cons; cons = o.cons }) + T (One { head = S.as_cons hd.next; tail = o.cons; cons = o.cons }) -let rec iter (Cons cons_r : _ cons) action = - action cons_r.value; - match cons_r.next with S Nil -> () | S (Cons _ as cons) -> iter cons action - -let[@inline] iter (One o as t : (_, _) one) action = +let[@inline] iter action (One o as t : (_, _) one) = exec t; - iter o.head action - -let rec find_tail (Cons cons_r as cons : _ cons) = - match cons_r.next with S Nil -> cons | S (Cons _ as cons) -> find_tail cons - -let[@tail_mod_cons] rec reject (Cons cons_r : _ cons) value = - if cons_r.value != value then - match cons_r.next with - | S Nil -> raise_notrace Not_found - | S (Cons _ as cons) -> - S (Cons { value = cons_r.value; next = reject cons value }) - else cons_r.next + S.iter action o.head o.cons let remove (One o as t : (_, _) one) value = exec t; - match reject o.head value with - | S Nil -> T Zero - | S (Cons _ as head) -> - let tail = find_tail head in + match S.reject o.head value with + | S.T Nil -> T Zero + | S.T (Cons _ as head) -> + let tail = S.find_tail head in T (One { head; tail; cons = tail }) | exception Not_found -> T t -let cons value next = S (Cons { value; next }) - -let rec reverse_to tail = function - | S Nil -> tail - | S (Cons cons_r as next) -> - let prev = cons_r.next in - cons_r.next <- S tail; - reverse_to next prev - let reverse_as_queue = function - | S Nil -> T Zero - | S (Cons cons_r as tail) -> - let head = reverse_to tail cons_r.next in + | S.T Nil -> T Zero + | S.T (Cons cons_r as tail) -> + let head = S.reverse_to tail cons_r.next in T (One { head; tail; cons = tail }) diff --git a/lib/picos_sync/s.ml b/lib/picos_sync/s.ml new file mode 100644 index 00000000..f9c490bd --- /dev/null +++ b/lib/picos_sync/s.ml @@ -0,0 +1,41 @@ +type ('a, _) tdt = + | Nil : ('a, [> `Nil ]) tdt + | Cons : { value : 'a; mutable next : 'a t } -> ('a, [> `Cons ]) tdt + +and 'a t = T : ('a, [< `Nil | `Cons ]) tdt -> 'a t [@@unboxed] + +type 'a cons = ('a, [ `Cons ]) tdt + +external as_cons : 'a t -> 'a cons = "%identity" + +let[@inline] value (Cons cons_r : _ cons) = cons_r.value +let[@inline] next_as_cons (Cons cons_r : _ cons) = as_cons cons_r.next + +let[@inline] exec (tail : _ cons) (cons : _ cons) = + if tail != cons then + let (Cons tl) = tail in + if tl.next != T cons then tl.next <- T cons + +let[@inline] cons value next = T (Cons { value; next }) + +let rec reverse_to tail = function + | T Nil -> tail + | T (Cons cons_r as next) -> + let prev = cons_r.next in + cons_r.next <- T tail; + reverse_to next prev + +let rec iter action (Cons head_r as head : _ cons) tail = + action head_r.value; + if head != tail then iter action (as_cons head_r.next) tail + +let[@tail_mod_cons] rec reject (Cons cons_r : _ cons) value = + if cons_r.value != value then + match cons_r.next with + | T Nil -> raise_notrace Not_found + | T (Cons _ as cons) -> + T (Cons { value = cons_r.value; next = reject cons value }) + else cons_r.next + +let rec find_tail (Cons cons_r as cons : _ cons) = + match cons_r.next with T Nil -> cons | T (Cons _ as cons) -> find_tail cons diff --git a/lib/picos_sync/semaphore.ml b/lib/picos_sync/semaphore.ml index e7a10d76..4b8edd03 100644 --- a/lib/picos_sync/semaphore.ml +++ b/lib/picos_sync/semaphore.ml @@ -38,37 +38,31 @@ module Counting = struct else if not (Atomic.compare_and_set t (Obj.repr before) (Obj.repr after)) then cleanup t trigger (Backoff.once backoff) - let rec acquire t backoff = + let rec acquire t node backoff = let before = Atomic.get t in - if Obj.is_int before then - let count = Obj.obj before in - if 0 < count then begin - let after = Obj.repr (count - 1) in - if not (Atomic.compare_and_set t before after) then - acquire t (Backoff.once backoff) - end - else - let trigger = Trigger.create () in - let after = Q.singleton trigger in - if Atomic.compare_and_set t before (Obj.repr after) then begin - match Trigger.await trigger with - | None -> () - | Some exn_bt -> - cleanup t trigger Backoff.default; - Exn_bt.raise exn_bt - end - else acquire t (Backoff.once backoff) + if Obj.is_int before && 0 < (Obj.obj before : int) then begin + let after = Obj.repr (Obj.obj before - 1) in + if not (Atomic.compare_and_set t before after) then + acquire t node (Backoff.once backoff) + end else - let trigger = Trigger.create () in - let after = Q.snoc (Obj.obj before) trigger in + let cons = + match node with + | S.T Nil -> + let trigger = Trigger.create () in + S.Cons { value = trigger; next = T Nil } + | S.T (Cons _ as cons) -> cons + in + let after = Q.add_cons (Obj.obj before) cons in if Atomic.compare_and_set t before (Obj.repr after) then begin + let trigger = S.value cons in match Trigger.await trigger with | None -> () | Some exn_bt -> cleanup t trigger Backoff.default; Exn_bt.raise exn_bt end - else acquire t (Backoff.once backoff) + else acquire t (T cons) (Backoff.once backoff) let rec try_acquire t backoff = let before = Atomic.get t in @@ -86,7 +80,7 @@ module Counting = struct if Obj.is_int state then Obj.obj state else 0 let[@inline] release t = release t Backoff.default - let[@inline] acquire t = acquire t Backoff.default + let[@inline] acquire t = acquire t (T Nil) Backoff.default let[@inline] try_acquire t = try_acquire t Backoff.default end