- Atoms
- Futures & Promises
- Delays
- Agents
- Worker Threads
- Locking
- Schedulers
- Bare Threads
- Thread local vars
- Examples
Atoms provide uncoordinated, synchronous access to a single identity.
The primary use of an atom is to hold Venice’s immutable data structures.
The value held by an atom is changed with the swap! function.
(do
  (def counter (atom 2))
  (swap! counter + 2)
  (deref counter))
@foo is the dereference short form for (deref foo):
(do
  (def counter (atom 2))
  (swap! counter + 2)
  @counter)
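Since an atom typically holds an immutable data structure, an update never modifies the held value in place: swap! applies a function to the current value and stores the result as the new value. A minimal sketch, assuming a map as the held value (the name inventory and its keys are made up for illustration):

```venice
(do
  ;; hypothetical example data: an atom holding an immutable map
  (def inventory (atom {}))

  ;; each swap! applies assoc to the current map and stores the new map
  (swap! inventory assoc :apples 3)
  (swap! inventory assoc :pears 5)

  ;; read a single entry back from the current value
  (get @inventory :apples)) ; => 3
```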
A future takes a function and yields a future object that will invoke the function
in another thread, and will cache the result and return it on all subsequent calls
to deref. If the computation has not yet finished, calls to deref will block,
unless the variant of deref with timeout is used.
A future can be cancelled with (cancel f) as long as its computation has not yet finished. A future can be checked whether it has been cancelled with (cancelled? f) or whether its computation has finished with (realized? f).
(do
  (defn task [] (sleep 1000) 200)
  (deref (future task)))
Dereference with a timeout and a default value. The default value is returned if the computation has not finished within the timeout:
(do
  (defn task [] (sleep 1000) 200)
  (deref (future task) 100 :timeout))
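The cancel and status functions described above can be tried out on a long-running task. This is only a sketch: the 5 second sleep is an arbitrary choice that keeps the task busy long enough to be cancelled, and the status checks are expected to report "not finished" before the cancel and "cancelled" after it.

```venice
(do
  ;; hypothetical long-running task
  (defn task [] (sleep 5000) 200)

  (def f (future task))

  ;; the task is still sleeping at this point
  (println "finished? " (realized? f))

  ;; cancel the computation before it completes
  (cancel f)
  (println "cancelled?" (cancelled? f)))
```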
A promise is a thread-safe object that encapsulates an immutable value. This value might not be available yet and can be delivered exactly once, from any thread, later. If another thread tries to dereference a promise before it's delivered, it will block the calling thread. If the promise is already resolved (delivered), no blocking occurs at all. A promise can only be delivered once and can never change its value once set.
Promises are futures too. They come with additional features for chaining and combining asynchronous tasks.
(do
  (def p (promise))
  (defn task [] (sleep 500) (deliver p 123))
  ; deliver a value to the promise
  (future task)
  ; deref the promise
  (deref p)) ; => 123
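To avoid blocking indefinitely on a promise that may never be delivered, the timeout variant of deref can be used. The sketch below assumes that the timeout variant shown for futures accepts promises as well, since promises are futures too; the default value :not-delivered is made up for illustration.

```venice
(do
  (def p (promise))

  ;; nothing has been delivered yet, so the default value is returned
  ;; after 100ms instead of blocking forever
  (println (deref p 100 :not-delivered))

  (deliver p 123)

  ;; the promise is delivered now, so deref returns without blocking
  (println (deref p 100 :not-delivered)))
```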
Using a promise like a future:
(do
  (defn task [] (sleep 500) 123)
  (def p (promise task))
  ; deref the promise
  (deref p)) ; => 123
Chaining asynchronous tasks:
(-> (promise (fn [] (sleep 50) 5))
    (then-apply (fn [x] (sleep 30) (+ x 2)))
    (then-apply (fn [x] (sleep 20) (* x 3)))
    (deref)) ; => 21
Combining the result of two asynchronous tasks:
(-> (promise #(do (sleep 50) 1000))
    (then-apply #(do (sleep 20) (+ %1 50)))
    (then-combine (-> (promise #(do (sleep 30) "eur"))
                      (then-apply str/upper-case))
                  #(str %1 " " %2))
    (deref)) ; => "1050 EUR"
Composing the result of two asynchronous tasks:
(-> (promise #(do (sleep 20) 1000))
    (then-apply #(do (sleep 20) (+ %1 50)))
    (then-compose (fn [o] (-> (promise #(do (sleep 20) "eur"))
                              (then-apply str/upper-case)
                              (then-apply #(str o " " %1)))))
    (deref)) ; => "1050 EUR"
Suppose we want to make coffee. This involves 4 steps:
- 1a grind coffee beans
- 1b heat water
- 2 mix the hot water with the ground beans
- 3 filter the coffee
+------------------------+  +------------------------+
| 1a) grind coffee beans |  | 1b) heat water         |
+------------------------+  +------------------------+
                    |            |
                    |            |
                    |            |
                    v            v
              +------------------------+
              | 2) mix water and beans |
              +------------------------+
                          |
                          v
              +------------------------+
              | 3) filter the coffee   |
              +------------------------+
All these steps take varying time, so they run asynchronously and have to be orchestrated.
(do
  (def trace (let [mutex 0]
               (fn [& xs] (locking mutex (println (apply str xs))))))

  ;; the domain data
  (deftype :coffee-beans [])
  (deftype :ground-coffee [])
  (deftype :cold-water [])
  (deftype :warm-water [])
  (deftype :unfiltered-coffee [])
  (deftype :filtered-coffee [])

  ;; the domain functions
  (defn grind-beans [coffee-beans]
    (trace "1a) grinding beans...")
    (ground-coffee.))

  (defn heat-water [cold-water]
    (trace "1b) heating water...")
    (warm-water.))

  ;; takes the result of step 1a first, then the result of step 1b
  (defn mix [ground-coffee warm-water]
    (trace "2) mixing water and coffee...")
    (unfiltered-coffee.))

  (defn filter-coffee [unfiltered-coffee]
    (trace "3) filtering coffee...")
    (filtered-coffee.))

  ;; make coffee: version with promises
  (defn make-coffee []
    (-> (promise #(grind-beans (coffee-beans.)))             ;; 1a
        (then-combine (promise #(heat-water (cold-water.)))  ;; 1b
                      #(mix %1 %2))                          ;; 2
        (then-apply #(filter-coffee %1))                     ;; 3
        (deref)))

  ;; make coffee: version with futures and promises
  (defn make-coffee-2 []
    (let [step_1a (promise)
          step_1b (promise)
          step_2  (promise)
          step_3  (promise)]
      (future #(deliver step_1a (grind-beans (coffee-beans.))))  ;; 1a
      (future #(deliver step_1b (heat-water (cold-water.))))     ;; 1b
      (future #(deliver step_2 (mix @step_1a @step_1b)))         ;; 2
      (future #(deliver step_3 (filter-coffee @step_2)))         ;; 3
      @step_3))

  (make-coffee))
Timeouts:
(-> (promise (fn [] (sleep 100) "The quick brown fox..."))
    (or-timeout 300 :milliseconds) ; throws a TimeoutException if not completed within 300ms
    (then-apply str/upper-case)
    (deref))

(-> (promise (fn [] (sleep 500) "The quick brown fox..."))
    (complete-on-timeout "The fox did not jump" 300 :milliseconds)
    (deref))
The delay function takes a body of expressions and yields a Delay object that will invoke the body only the first time it is forced (with force or deref/@), and will cache the result and return it on all subsequent force calls.
(do
  (def x (delay (println "realizing...") 100))
  (sleep 1 :seconds)
  (deref x))

(do
  (def x (delay (println "realizing...") 100))
  (sleep 1 :seconds)
  (force x))
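The caching behaviour can be made visible by counting how often the body runs. A minimal sketch, using an atom as a call counter (the names calls and d are made up for illustration):

```venice
(do
  ;; counts how many times the delay body is evaluated
  (def calls (atom 0))

  ;; the body increments the counter and yields 100
  (def d (delay (swap! calls inc) 100))

  ;; force the delay several times, only the first call runs the body
  (force d)
  (force d)
  (deref d)

  @calls) ; => 1
```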
Agents provide uncoordinated, asynchronous access to a single identity. Actions are functions that are asynchronously applied to an Agent's state and whose return value becomes the Agent's new state.
(do
  (def x (agent 100))
  (send x + 5)
  (await-for 100 x)
  @x)
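Several actions can be sent to the same agent. They are queued and applied one after the other, each one receiving the state left by the previous action. A small sketch that sums the numbers 1 to 10 (the name total and the 1000ms wait are arbitrary choices):

```venice
(do
  (def total (agent 0))

  ;; queue ten actions, each adds n to the agent's current state
  (doseq [n (range 1 11)]
    (send total + n))

  ;; wait up to 1000ms for the queued actions to complete
  (await-for 1000 total)

  @total) ; => 55
```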
The following example is an implementation of a message relay. A chain of m agents is created, then a sequence of n actions is dispatched to the head of the chain and relayed through it:
Note: The example has been taken from the Clojure Agent demo and uses a synchronous queue.
;; Agent chain
;;
;; +-------+    +-------+    ........    +-------+    +--------+    +-------+
;; | agent |    | agent |    .      .    | agent |    | agent  |    | queue |
;; |  998  |    |  997  |    .      .    |   0   |    |   *    |    |  ---  |
;; |       |    |       |    .      .    |       |    |        |    |  ---  |
;; | :next o--->| :next o--->.      .--->| :next o--->| :queue o--->|  ---  |
;; +-------+    +-------+    ........    +-------+    +--------+    +-------+
(do
  (defn relay [x i]
    (when (:next x)
      (send (:next x) relay i))
    (when (and (zero? i) (:queue x))
      (offer! (:queue x) :indefinite i))
    x)

  (defn chain-agents [m q]
    (reduce (fn [next _] (agent {:next next}))
            (agent {:queue q})
            (range (dec m))))

  (defn run [m n]
    (let [q  (queue)
          hd (chain-agents m q)]
      (doseq [i (reverse (range n))]
        (send hd relay i))
      (poll! q :indefinite)))

  ;; 1 million message sends:
  (time (run 1000 1000))) ; Elapsed time: 2.94s
While Agents accept functions to process the Agent's state, Actors accept data to be processed by the Actor's function.
A simple Actors model can be implemented on top of Agents:
(do
  (def actors (atom {}))

  (defn wait [timeout] (apply await-for timeout (vals @actors)))

  (defn make! [name state handler]
    (let [actor (agent {:state state :handler handler})]
      (swap! actors assoc name actor)))

  (defn invoke-handler [context msg]
    (let [{:keys [state handler]} context
          new-state (handler state msg)]
      (assoc context :state new-state)))

  (defn send! [name & args]
    (let [actor (get @actors name)]
      (send actor invoke-handler args)
      nil))

  (make! :printer nil (fn [_ msg] (apply println msg)))

  (send! :printer "hello world")

  (wait 200)
  nil)
Futures are the means of choice when worker threads are required. Futures are basically threads served from a Java ThreadPoolExecutor.
Venice provides the functions futures-fork and futures-wait to set up the workers and wait for their termination.
(do
  ;; define a factory function that creates the workers
  (defn worker-factory [n]
    (fn [] (sleep 3 :seconds)))

  ;; create 5 worker threads and wait for their termination
  (let [threads (futures-fork 5 #(worker-factory %))]
    (apply futures-wait threads)))
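Because the workers are futures, their results can be collected with deref once they have finished. The sketch below assumes that the factory function receives the worker's index as its argument, as the examples in this document suggest; square-worker is a made-up factory for illustration.

```venice
(do
  ;; hypothetical factory: the worker with index n returns n squared
  (defn square-worker [n]
    (fn [] (sleep 100) (* n n)))

  ;; start 4 workers and wait until all of them have terminated
  (def workers (futures-fork 4 square-worker))
  (apply futures-wait workers)

  ;; all workers are done, so deref returns their results without blocking
  (def results (map deref workers))
  (println results))
```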
The locking special form executes expressions in an implicit do, while holding a monitor allowing only one thread to execute at any time. The monitor can be any Venice value and will be released in all circumstances.
Locking operates like the synchronized keyword in Java.
Example: coordinating multiple threads printing to stdout
(do
  (def monitor 0)

  (defn log [& xs]
    (locking monitor (println (apply str xs))))

  (defn worker [n end]
    (fn []
      (log "Worker " n " started")
      (while (< (current-time-millis) end)
        (log "Worker " n " message")
        (sleep (rand-long 3000) :milliseconds))
      (log "Worker " n " stopped")))

  ;; launch 5 worker threads, run 20s
  (println "Starting")
  (let [end (+ (current-time-millis) 20000)]
    (apply futures-wait (futures-fork 5 #(worker % end))))
)
schedule-delay executes a one-shot action that becomes enabled after the given delay, 3 seconds in this example:
(schedule-delay #(println 100) 3 :seconds)
schedule-delay returns a future that can be dereferenced to get the scheduled function’s value, or cancelled to abort the pending execution. (deref s) blocks the current thread until the result becomes available.
(let [s (schedule-delay (fn [] 100) 3 :seconds)]
  (println "result: " (deref s)))
schedule-at-fixed-rate executes a periodic action that first becomes enabled after the initial delay and then repeats with the given period:
(schedule-at-fixed-rate #(println "test") 1 3 :seconds)
schedule-at-fixed-rate returns a future that can be used to cancel the scheduled task.
Execute a periodic task with a 1s initial delay, a period of 3s, and cancel it after 16s:
(let [s (schedule-at-fixed-rate (fn [] (println "test")) 1 3 :seconds)]
  (sleep 16000)
  (cancel s)
  (println "done."))
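A one-shot action can be cancelled in the same way while it is still waiting for its delay to elapse. The following is a sketch only: it assumes that cancel and cancelled? accept the future returned by schedule-delay just like a regular future, and the 1 second sleep is an arbitrary point in time before the 3 second delay expires.

```venice
(let [s (schedule-delay (fn [] (println "not expected to run")) 3 :seconds)]
  ;; cancel while the action is still waiting for its delay to elapse
  (sleep 1000)
  (cancel s)
  (println "cancelled: " (cancelled? s)))
```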
The thread function executes a function in another thread, returning immediately to the calling thread. It returns a promise which will receive the result of the called function when completed.
Note: Each call to thread creates a new, expensive system thread. Consider using futures or promises, which use an ExecutorService to deal efficiently with threads.
Simple Thread:
@(thread #(do (sleep 1000) 1))
Producer/Consumer:
(do
  (defn produce [q n]
    (doseq [x (range n)] (sleep 1000) (put! q x))
    (put! q nil))

  (defn consume [q]
    (docoll println q))

  (let [q (queue 10)]
    (thread #(produce q 4))
    @(thread #(consume q))))
Dynamic variables start off as a global variable and can be bound with 'binding' to a new value on the local thread.
(do
  (def-dynamic x 100)
  (println x)        ; x level 1 => 100
  (binding [x 200]
    (println x))     ; x level 2 => 200
  (println x))       ; x level 1 => 100
Thread local var bindings can be nested:
(do
  (binding [y 100]
    (println y)        ; y level 1 => 100
    (binding [y 200]
      (println y))     ; y level 2 => 200
    (println y)))      ; y level 1 => 100
Thread local vars get inherited by child threads:
(do
  ;; parent thread locals
  (binding [a 10 b 20]
    ;; future with child thread locals
    (let [f (future (fn [] (binding [b 90] {:a a :b b})))]
      {:child @f :parent {:a a :b b}})))
;; => {:parent {:a 10 :b 20} :child {:a 10 :b 90}}
See PI Monte Carlo
(do
  (defn circle? [x y]
    (<= (+ (* x x) (* y y)) 1.0))

  (defn sample []
    (circle? (rand-double) (rand-double)))

  (defn pi [iterations]
    (let [measurements (repeatedly iterations sample)
          inside       (filter true? measurements)]
      (* 4.0 (/ (double (count inside)) iterations)))))

;; (time (pi 10_000_000))
;; Elapsed time: 16.34s
;; => 3.1418628
(do
  (defn circle? [x y]
    (<= (+ (* x x) (* y y)) 1.0))

  (defn sample []
    (circle? (rand-double) (rand-double)))

  (defn worker [iterations]
    (->> (repeatedly iterations sample)
         (filter true?)
         (count)))

  (defn pi [iterations workers]
    (let [iter_worker (/ iterations workers)
          inside      (->> (repeatedly workers (fn [] (future #(worker iter_worker))))
                           (map deref)
                           (apply +))]
      (* 4.0 (/ (double inside) iter_worker workers)))))

;; (time (pi 10_000_000 n))
;; n      1       2       3       4       5       6
;; time   16.68s  12.32s  11.28s  12.47s  14.83s  16.02s
(do
  (import :java.util.concurrent.Semaphore)

  ;;            [P0]
  ;;       F0          F1
  ;;
  ;;    [P4]              [P1]
  ;;
  ;;     F4                F2
  ;;
  ;;       [P3]      [P2]        Px: philosopher
  ;;            F3               Fx: fork

  (def n-philosophers 5)
  (def max-eating-time 5000)
  (def max-thinking-time 3000)
  (def retry-time 5)

  ;; one binary semaphore per fork
  (def forks (repeatedly n-philosophers #(. :Semaphore :new 1)))
  (def log-mutex 0)

  (defn log [& xs]
    (locking log-mutex (println (apply str xs))))

  (defn left-fork [n]
    (nth forks (mod (inc n) n-philosophers)))

  (defn right-fork [n]
    (nth forks n))

  ;; non-blocking: returns true if the fork has been acquired
  (defn acquire-fork [fork]
    (. fork :tryAcquire))

  (defn release-fork [fork]
    (. fork :release))

  ;; acquires both forks or none of them
  (defn take-forks [n]
    (if (acquire-fork (left-fork n))
      (if (acquire-fork (right-fork n))
        true
        (do (release-fork (left-fork n))
            false))
      false))

  (defn put-down-forks [n]
    (release-fork (left-fork n))
    (release-fork (right-fork n)))

  (defn eat [n]
    (log "Philosopher " n " is dining")
    (sleep (rand-long max-eating-time))
    (put-down-forks n)
    (log "Philosopher " n " put down forks"))

  (defn think [n]
    (log "Philosopher " n " is thinking")
    (sleep (rand-long max-thinking-time)))

  (defn philosopher [n]
    (fn []
      (try
        (log "Philosopher " n " just sat down")
        (while true
          (if (take-forks n)
            (do (log "Philosopher " n " picked up forks")
                (eat n)
                (think n))
            (sleep retry-time)))
        (catch :RuntimeException ex
          (log "Philosopher " n " died! " (:message ex))))))

  ;; launch
  (println "Starting (stop with <ctrl-c>)")
  (apply futures-wait (futures-fork n-philosophers #(philosopher %)))
)
(do
  ;;            [P0]
  ;;       F0          F1
  ;;
  ;;    [P4]              [P1]
  ;;
  ;;     F4                F2
  ;;
  ;;       [P3]      [P2]        Px: philosopher
  ;;            F3               Fx: fork

  (def n-philosophers 5)
  (def max-eating-time 5000)
  (def max-thinking-time 3000)
  (def retry-time 5)

  ;; fork values: nil if the fork is not locked else the philosophers index that
  ;; locked the fork
  (def forks (atom (into [] (repeat n-philosophers nil))))
  (def log-mutex 0)

  (defn log [& xs]
    (locking log-mutex (println (apply str xs))))

  (defn left-fork [n]
    (mod (inc n) n-philosophers))

  (defn right-fork [n]
    n)

  (defn forks-acquired? [fs n]
    (and (some? (fs (left-fork n))) (some? (fs (right-fork n)))))

  (defn forks-acquired-by? [fs n x]
    (and (= (fs (left-fork n)) x) (= (fs (right-fork n)) x)))

  (defn forks-free? [fs n]
    (and (nil? (fs (left-fork n))) (nil? (fs (right-fork n)))))

  (defn forks-set [fs n val]
    (-> fs
        (assoc (left-fork n) val)
        (assoc (right-fork n) val)))

  (defn debug [fs n]
    (log "Philosopher " n " forks: " fs))

  (defn take-forks [n]
    (let [upd      (swap! forks
                          (fn [fs]
                            (if (forks-free? fs n)
                              (forks-set fs n n)
                              fs)))
          acquired (forks-acquired-by? upd n n)]
      (when acquired (debug upd n))
      acquired))

  (defn put-down-forks [n]
    (let [upd (swap! forks (fn [fs] (forks-set fs n nil)))]
      (debug upd n)))

  (defn eat [n]
    (log "Philosopher " n " is dining")
    (sleep (rand-long max-eating-time))
    (put-down-forks n)
    (log "Philosopher " n " put down forks"))

  (defn think [n]
    (log "Philosopher " n " is thinking")
    (sleep (rand-long max-thinking-time)))

  (defn philosopher [n]
    (fn []
      (try
        (log "Philosopher " n " just sat down")
        (while true
          (if (take-forks n)
            (do (log "Philosopher " n " picked up forks")
                (eat n)
                (think n))
            (sleep retry-time)))
        (catch :RuntimeException ex
          (log "Philosopher " n " died! " (:message ex))))))

  ;; launch
  (println "Starting (stop with <ctrl-c>)")
  (apply futures-wait (futures-fork n-philosophers #(philosopher %)))
)