;; Minimal CSP-style rendezvous channels built on LockSupport park/unpark.
;;
;; Every pending operation is a "package": a vector [owner-thread state value].
;;   owner-thread - the thread that registered the operation
;;   state        - an atom shared by all operations of one take/put/alts call;
;;                  it holds :waiting, then transiently :claimed while some
;;                  thread is pairing it, and finally the value handed over by
;;                  the matching counterpart
;;   value        - what this operation gives to its counterpart
;;                  (the payload for a put, `true` for a take)
;;
;; Because :waiting and :claimed are used as in-band markers they cannot be
;; transferred through a channel.
(import '(java.util.concurrent.locks LockSupport)
        '(java.util.concurrent ConcurrentLinkedQueue))

(defprotocol Channel
  (-op [_ op-name state value]
    "Registers a pending operation (op-name is :put or :take) carrying `value`
    and tracked by the atom `state`. Returns a zero-argument poller function
    that tries to pair the head take with the head put; the poller must be
    called by the registering thread."))

(defn- settled?
  "True when the package's state is neither :waiting nor :claimed, i.e. the
  operation it belongs to has already completed."
  [package]
  (let [s @(nth package 1)]
    (not (or (= s :waiting) (= s :claimed)))))

(defn- drop-settled-heads!
  "Removes completed packages from the front of `queue`. Such packages are left
  behind by an `alts` call that finished through a different channel, or by a
  match whose winner has not dequeued yet. Returns true if anything was removed."
  [^java.util.Queue queue]
  (loop [dropped? false]
    (let [head (.peek queue)]
      (if (and (some? head) (settled? head))
        ;; remove(Object) instead of poll(): another thread may have dequeued
        ;; this head already, and poll() would then discard a live package.
        (do (.remove queue head)
            (recur true))
        dropped?))))

(defn- unpark-owner!
  "Unparks the thread that registered `package`. Does nothing for nil or when
  the owner is the calling thread (which is running and needs no wake-up)."
  [package]
  (when (some? package)
    (let [owner (nth package 0)]
      (when-not (identical? owner (Thread/currentThread))
        (LockSupport/unpark owner)))))

(deftype C [take-queue put-queue]
  Channel
  (-op [_ op-name state value]
    (assert (some? value))
    ;; The state atom uses these two keywords as markers; handing one of them
    ;; to the counterpart would make a finished operation look unfinished.
    (assert (not (#{:waiting :claimed} value))
            ":waiting and :claimed are reserved and cannot be sent")
    (let [package [(Thread/currentThread) state value]]
      (case op-name
        :put (.offer put-queue package)
        :take (.offer take-queue package))
      (fn poll! []
        (loop []
          (let [dropped-take? (drop-settled-heads! take-queue)
                dropped-put?  (drop-settled-heads! put-queue)
                a-take        (.peek take-queue)
                a-put         (.peek put-queue)]
            ;; Discarding a stale head promotes the package behind it; its
            ;; owner may be parked and has to be told it is now at the front.
            (when (or dropped-take? dropped-put?)
              (unpark-owner! a-take)
              (unpark-owner! a-put))
            ;; Only the owner of a head package attempts the pairing.
            (when (and (some? a-take)
                       (some? a-put)
                       (identical? package (case op-name :put a-put :take a-take)))
              (let [take-state (nth a-take 1)
                    put-state  (nth a-put 1)]
                (cond
                  ;; Both heads share one state atom (same alts call): an
                  ;; operation cannot rendezvous with itself, so give up and
                  ;; let the caller park.
                  (identical? take-state put-state)
                  nil

                  ;; The take side is claimed by another matcher or already
                  ;; completed. Both conditions are short-lived (a completed
                  ;; head is dropped on the next pass), so retry instead of
                  ;; parking where nobody would wake us.
                  (not (compare-and-set! take-state :waiting :claimed))
                  (do (Thread/yield)
                      (recur))

                  ;; Same for the put side; release the take claim first so we
                  ;; never hold one claim while waiting for another.
                  (not (compare-and-set! put-state :waiting :claimed))
                  (do (compare-and-set! take-state :claimed :waiting)
                      (Thread/yield)
                      (recur))

                  :else
                  (do
                    ;; Both sides are claimed by this thread: exchange values.
                    (reset! take-state (nth a-put 2))
                    (reset! put-state (nth a-take 2))
                    ;; Remove exactly the matched packages; a concurrent poller
                    ;; may already have dropped them as completed heads.
                    (.remove take-queue a-take)
                    (.remove put-queue a-put)
                    ;; Wake the counterpart (unpark-owner! skips ourselves).
                    (unpark-owner! a-take)
                    (unpark-owner! a-put)
                    ;; Wake whoever is now first in line, skipping stale heads.
                    (drop-settled-heads! take-queue)
                    (drop-settled-heads! put-queue)
                    (unpark-owner! (.peek take-queue))
                    (unpark-owner! (.peek put-queue))
                    nil))))))))))

(defn- await-result
  "Runs every poller, then parks while `state` is :waiting and retries while it
  is :claimed. Returns the value the counterpart stored in `state`."
  [state pollers]
  (loop []
    (doseq [poll! pollers]
      (poll!))
    (let [s @state]
      (cond
        ;; park may return spuriously; the loop re-polls and re-checks.
        (= s :waiting) (do (LockSupport/park) (recur))
        ;; Another thread is in the middle of pairing us; the outcome is near.
        (= s :claimed) (do (Thread/yield) (recur))
        :else s))))

(defn take
  "Blocks the calling thread until a put is available on `channel` and returns
  the value that was put."
  [channel]
  (let [state (atom :waiting)]
    (await-result state [(-op channel :take state true)])))

(defn put
  "Blocks the calling thread until a take is available on `channel` and hands it
  `value`. Returns the taker's value, which is `true` for `take`."
  [channel value]
  (let [state (atom :waiting)]
    (await-result state [(-op channel :put state value)])))

(defn alts
  "Registers every op in `ops` (each [op channel] or [op channel value], op
  being :put or :take) in random order and blocks until one of them completes.
  Returns the value received from the counterpart of the op that completed.
  A missing value defaults to `true`."
  [ops]
  (let [state   (atom :waiting)
        ;; vec forces all registrations to happen before the first poll.
        pollers (vec (for [[op channel opt-value] (shuffle ops)
                           :let [value (if (some? opt-value) opt-value true)]]
                       (-op channel op state value)))]
    (await-result state pollers)))

(defn chan
  "Creates a new channel with empty take and put queues."
  []
  (->C (ConcurrentLinkedQueue.) (ConcurrentLinkedQueue.)))

;; Demo: two takers on separate channels and one alts that offers a put to
;; both; exactly one taker prints the message.
(def c0 (chan))
(def c1 (chan))

(Thread/startVirtualThread
 #(try (prn 'c0 (take c0))
       (catch Throwable t (prn t))))

(Thread/startVirtualThread
 #(try (prn 'c1 (take c1))
       (catch Throwable t (prn t))))

(Thread/startVirtualThread
 #(try (alts [[:put c0 "Hello World"] [:put c1 "Hello World"]])
       (catch Throwable t (prn t))))