(import '(java.util.concurrent.locks LockSupport)
        '(java.util.concurrent ConcurrentLinkedQueue))

(defprotocol Channel
  (-op [_ op-name state value]))

(deftype C [take-queue put-queue]
  Channel
  (-op [_ op-name state value]
    (assert (some? value))
    (let [package [(Thread/currentThread) state value]]
      (case op-name
        :put (.offer put-queue package)
        :take (.offer take-queue package))
      (fn []
        (if-let [a-take (.peek take-queue)]
          (if-let [a-put (.peek put-queue)]
            (if (identical? package (case op-name
                                      :put a-put
                                      :take a-take))
              (if (compare-and-set! (nth a-take 1) :waiting :claimed)
                (if (compare-and-set! (nth a-put 1) :waiting :claimed)
                  (do
                    (compare-and-set! (nth a-take 1) :claimed (nth a-put 2))
                    (compare-and-set! (nth a-put 1) :claimed (nth a-take 2))
                    (.poll take-queue)
                    (.poll put-queue)
                    (case op-name
                      :take (LockSupport/unpark (nth a-put 0))
                      :put (LockSupport/unpark (nth a-take 0)))
                    (when-let [nxt-take (.peek take-queue)]
                      (LockSupport/unpark (nth nxt-take 0)))
                    (when-let [nxt-put (.peek put-queue)]
                      (LockSupport/unpark (nth nxt-put 0))))
                  (compare-and-set! (nth a-take 1) :claimed :waiting))
                nil)
              nil)
            nil)
          nil)))))

(defn take [channel]
  (let [state (atom :waiting)
        poller (-op channel :take state true)]
    (loop []
      (poller)
      (let [s @state]
        (if (= s :waiting)
          (do
            (LockSupport/park)
            (recur))
          (if (= s :claimed)
            (recur)
            s))))))

(defn put [channel value]
  (let [state (atom :waiting)
        poller (-op channel :put state value)]
    (loop []
      (poller)
      (let [s @state]
        (if (= s :waiting)
          (do
            (LockSupport/park)
            (recur))
          (if (= s :claimed)
            (recur)
            s))))))

(defn alts [ops]
  (let [state (atom :waiting)
        pollers (vec
                 (for [[op channel opt-value] (shuffle ops)
                       :let [value (if (some? opt-value) opt-value true)]]
                   (-op channel op state value)))]
    (loop []
      (doseq [f pollers]
        (f))
      (let [s @state]
        (if (= s :waiting)
          (do
            (LockSupport/park)
            (recur))
          (if (= s :claimed)
            (recur)
            s))))))

(defn chan []
  (->C (ConcurrentLinkedQueue.) (ConcurrentLinkedQueue.)))

(def c0 (chan))

(def c1 (chan))

(Thread/startVirtualThread
 #(try
   (prn 'c0 (take c0))
   (catch Throwable t
     (prn t))))


(Thread/startVirtualThread
 #(try
   (prn 'c1 (take c1))
   (catch Throwable t
     (prn t))))

(Thread/startVirtualThread
 #(try
    (alts
     [[:put c0 "Hello World"]
      [:put c1 "Hello World"]])
    (catch Throwable t
      (prn t))))

Generated At 2024-12-06T11:22:14-0800 original