(import '(java.util ArrayList
                    List
                    HashMap)
        '(java.util.concurrent.locks ReentrantLock)
        '(java.util.concurrent.atomic AtomicReference
                                      AtomicReferenceArray))

(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)

;; At the lowest level, this is a library for synchronizing on
;; events. synchronizing on an event means registering a callback to
;; be invoked when the event occurs.
(defprotocol Event
  (try-event [event resume control nack-group]
    "internal function for syncing"))

(defprotocol Syncable
  (-sync [event]))

(defprotocol QuasiEvent
  (push-down [event ctor lst]))

(defprotocol Nackable
  (check-nack-group [event nack-group]))

;; acts as never
(extend-type nil
  Event
  (try-event [event resume control nack-group])
  Nackable
  (check-nack-group [event nack-group])
  QuasiEvent
  (push-down [event ctor lst]
    (when-let [evt (ctor event)]
      (.add ^java.util.List lst evt)))
  Syncable
  (-sync [event]))

(extend-type Object
  QuasiEvent
  (push-down [event ctor lst]
    (.add ^List lst (ctor event)))
  Syncable
  (-sync [event]
    ;; wrap list access in a volatile to ensure ordering, may not be
    ;; required.
    (let [lst (volatile! (ArrayList.))
          control (atom :waiting)]
      (push-down event identity @lst)
      (vreset! lst @lst)
      (letfn [(resume-f [event value nack-group]
                ;; resume-f could end up invoked on another thread,
                ;; hence the volatile
                (doseq [e @lst]
                  (check-nack-group e nack-group)))]
        (doseq [evt @lst
                :when evt]
          (try-event evt resume-f control #{})))))
  Nackable
  (check-nack-group [event nack-group]))

(def always
  (reify
    Event
    (try-event [event resume control nack-group]
      (let [k (Object.)
            f (fn [& _]
                (when (= @control :synced)
                  (remove-watch control k))
                (when (compare-and-set! control :waiting :synced)
                  (resume event true nack-group)))]
        (add-watch control k f)
        (f)))))

(extend-type java.util.concurrent.CompletionStage
  Event
  (try-event [^java.util.concurrent.CompletionStage event resume control nack-group]
    (.handle event
             (reify
               java.util.function.BiFunction
               (apply [_ result exception]
                 (let [k (Object.)
                       f (fn [& _]
                           (when (= @control :synced)
                             (remove-watch control k))
                           (when (compare-and-set! control :waiting :synced)
                             (resume event (or exception result) nack-group)))]
                   (add-watch control k f)
                   (f)))))))

;; (extend-type clojure.lang.IRef
;;   Event
;;   (try-event [^java.util.concurrent.CompletionStage event resume control nack-group]
;;     (.handle event
;;              (reify
;;                java.util.function.BiFunction
;;                (apply [_ result exception]
;;                  (let [k (Object.)
;;                        f (fn [& _]
;;                            (when (= @control :synced)
;;                              (remove-watch control k))
;;                            (when (compare-and-set! control :waiting :synced)
;;                              (resume event (or exception result) nack-group)))]
;;                    (add-watch control k f)
;;                    (f)))))))

(declare middleware)

;; Middleware wraps an event, passing protocols callls down with the
;; opportunity to intercept and make changes. The middleware smart
;; constructor can collapse multiple middlewares into a single
;; middleware via function composition.
(defrecord Middleware [try-event-transform
                       check-nack-group-transform
                       push-down-transform
                       target]
  Event
  (try-event [_ resume control nack-group]
    ((try-event-transform try-event) target resume control nack-group))
  Nackable
  (check-nack-group [_ nack-group]
    ((check-nack-group-transform check-nack-group) target nack-group))
  QuasiEvent
  (push-down [event ctor lst]
    ((push-down-transform push-down)
     target
     (comp ctor #(middleware try-event-transform
                             check-nack-group-transform
                             push-down-transform
                             %))
     lst)))

(defn middleware
  [try-event-transform check-nack-group-transform push-down-transform target]
  (if (instance? Middleware target)
    (let [^Middleware target target]
      (middleware (comp try-event-transform (.-try-event-transform target))
                  (comp check-nack-group-transform (.-check-nack-group-transform target))
                  (comp push-down-transform (.-push-down-transform target))
                  (.-target target)))
    (->Middleware try-event-transform check-nack-group-transform push-down-transform target)))

(defn barrier
  "Given a function f, applies f to a function g and an event e. e becomes
  enabled when g is invoked. returns the result of invoking f."
  [f]
  (let [root (AtomicReference.)
        done (Object.)
        event (middleware
               (fn [event-transform]
                 (fn [event resume control nack-group]
                   (add-watch control (Object.) (fn [k r os ns]
                                                  (when (= ns :synced)
                                                    (loop [^AtomicReferenceArray r (.get root)]
                                                      (when r
                                                        (when-not (identical? r done)
                                                          (if (identical? control (.get r 1))
                                                            (doto r
                                                              (.set 0 nil)
                                                              (.set 1 nil)
                                                              (.set 2 nil))
                                                            (recur (.get r 3))))))
                                                    (remove-watch r k))))
                   (let [cell (doto (AtomicReferenceArray. 4)
                                (.set 0 resume)
                                (.set 1 control)
                                (.set 2 nack-group))]
                     (loop [r (.get root)]
                       (if (identical? r done)
                         (loop []
                           (if (compare-and-set! control :waiting :synced)
                             (resume event nil nack-group)
                             (when-not (= @control :synced)
                               (recur))))
                         (do
                           (.set cell 3 r)
                           (when-not (.compareAndSet root r cell)
                             (recur (.get root)))))))))
               (fn [check-nack-group]
                 (fn [event nack-group]))
               identity
               nil)]
    (f event
       (fn []
         (loop [^AtomicReferenceArray r (.get root)]
           (when-not (identical? r done)
             (if-not (.compareAndSet root r done)
               (recur (.get root))
               (loop [r r]
                 (when r
                   (let [resume (.get r 0)
                         control (.get r 1)
                         nack-group (.get r 2)]
                     (when (and resume control nack-group)
                       (loop []
                         (if (compare-and-set! control :waiting :synced)
                           (resume event nil nack-group)
                           (when-not (= @control :synced)
                             (recur)))))
                     (recur (.get r 3))))))))))))

;; events end up forming a tree, before synchronization that tree gets
;; flattened into something like disjuntive normal form, then at
;; synchronization one of the possible events is chosen.

(defn choose
  "Takes a list of events, and non-deterministically selects one on synchronization"
  [evts]
  (middleware
   identity
   identity
   (fn [push-down]
     (fn [event ctor ^List lsg]
       (doseq [e (shuffle evts)]
         (push-down e ctor lsg))))
   nil))

(defn nack
  "Like guard, but f is passed an event G. G will occur if on
  synchronization, the chosen event is not the one returned by f."
  [f]
  (middleware
   identity
   identity
   (fn [push-down]
     (fn [event ctor ^List lsg]
       (barrier
        (fn [b break]
          (let [e (f b)
                id (Object.)]
            (push-down
             e
             (comp ctor
                   (fn g [evt]
                     (middleware
                      (fn [try-event]
                        (fn [event resume control nack-group]
                          (try-event event resume control (conj nack-group id))))
                      (fn [check-nack-group]
                        (fn [event nack-group]
                          (when-not (contains? nack-group id)
                            (break))
                          (check-nack-group event nack-group)))
                      identity
                      evt)))
             lsg))))))
   nil))

(defn guard
  "Takes a function of no arguments f, returns an event E, before E is
  synchronized on, f will be invoked, and E replaced with the event
  returned by f."
  [f]
  (middleware
   identity
   identity
   (fn [push-down]
     (fn [event ctor ^List lst]
       (push-down (f) ctor lst)))
   nil))

(defn wrap
  "Wraps the post synchronization action f around the event evt"
  [evt f]
  (middleware
   (fn [try-event]
     (fn [evt resume control nack-group]
       (try-event evt
                  (fn [event value nack-group]
                    (resume event (f value) nack-group))
                  control
                  nack-group)))
   identity
   identity
   evt))

(defonce scheduler
  (delay
    (java.util.concurrent.Executors/newScheduledThreadPool
     1
     (reify
       java.util.concurrent.ThreadFactory
       (newThread [_ r]
         ;; TODO name
         (doto (Thread. r)
           (.setDaemon true)))))))

(defn timeout [delay]
  (let [now (System/nanoTime)
        n (* 1000000 (long delay))
        deadline (+ n now)]
    (barrier
     (fn [b break]
       (nack
        (fn [neg]
          (let [now (System/nanoTime)
                delay (- deadline now)]
            (if (pos? delay)
              (let [f (.schedule ^java.util.concurrent.ScheduledExecutorService @scheduler
                                 ^Runnable break
                                 delay
                                 java.util.concurrent.TimeUnit/NANOSECONDS)]
                (-sync (wrap neg (fn [_] (future-cancel f))))
                b)
              always))))))))

(deftype Channel [fields])

(defn channel []
  (->Channel
   (doto (object-array 7)
     (aset 0 (ReentrantLock.))
     (aset 1 false)
     (aset 2 (HashMap.))
     (aset 3 nil) ; writers head
     (aset 4 nil) ; writers tail
     (aset 5 nil) ; readers head
     (aset 6 nil) ; readers tail
     )))

;; make it a kind of event?
(defn close! [^Channel channel]
  (let [^objects channel (.-fields channel)
        ^ReentrantLock lock (aget channel 0)
        _ (.lock lock)
        ^HashMap controls (aget channel 2)]
    (if-not (aget channel 1)
      (do
        (aset channel 1 true)
        (let [items (into [] (vals controls))]
          (.unlock lock)
          (doseq [^objects item items
                  :let [r (aget item 0)
                        c (aget item 1)
                        n (aget item 2)
                        v (aget item 3)]
                  :when (compare-and-set! c :waiting :claimed)]
            (compare-and-set! c :claimed :synced)
            (r nil nil n))))
      (.unlock lock))))

(defn exchange [^Channel channel writer-index reader-index value]
  (let [^objects channel (.-fields channel)
        writer-index (long writer-index)
        reader-index (long reader-index)
        ^ReentrantLock lock (aget channel 0)
        ^HashMap controls (aget channel 2)]
    (middleware
     (fn [try-event]
       (fn [event resume control nack-group]
         ;; TODO rename clean-up
         (letfn [(clean-up [k r os ns]
                   (when (= ns :waiting)
                     (.lock lock)
                     (let [^objects item (aget channel reader-index)]
                       (if item
                         (let [r (aget item 0)
                               c (aget item 1)
                               n (aget item 2)
                               v (aget item 3)]
                           (if (identical? c control)
                             (do
                               (.unlock lock)
                               ;; TODO figure out how this should really work
                               (throw (ex-info "Same operation on same channel?" {})))
                             (if (compare-and-set! c :waiting :claimed)
                               (if (compare-and-set! control :waiting :claimed)
                                 (do
                                   (.unlock lock)
                                   (compare-and-set! c :claimed :synced)
                                   (compare-and-set! control :claimed :synced)
                                   (r event value n)
                                   (resume event v nack-group))
                                 (do
                                   (compare-and-set! c :claimed :waiting)
                                   (.unlock lock)))
                               (.unlock lock))))
                         (if (aget channel 1)
                           (do
                             (.unlock lock)
                             (when (compare-and-set! control :waiting :synced)
                               ;; ?
                               (resume event nil nack-group)))
                           (.unlock lock)))))
                   (when (= ns :synced)
                     (.lock lock)
                     (loop [^objects item (.get controls control)]
                       (when item
                         (when (aget item 4)
                           (aset ^objects (aget item 4) 5 (aget item 5)))
                         (when (aget item 5)
                           (aset ^objects (aget item 5) 4 (aget item 4)))
                         (when (aget item 6)
                           (aset ^objects (aget item 6) 7 (aget item 7)))
                         (when (aget item 7)
                           (aset ^objects (aget item 7) 6 (aget item 6)))
                         (when (identical? (.get controls control) item)
                           (let [n (aget item 6)]
                             (if n
                               (.put controls control n)
                               (.remove controls control))))
                         (doseq [writer-index [writer-index reader-index]]
                           (when (identical? (aget channel writer-index) item)
                             (aset channel writer-index (aget item 4)))
                           (when (identical? (aget channel (inc (long writer-index))) item)
                             (aset channel writer-index (aget item 5)))
                           (when (nil? (aget channel writer-index))
                             (aset channel (inc (long writer-index)) nil)))
                         (recur (aget item 6))))
                     (.unlock lock)
                     (when k
                       (remove-watch r k))))]
           (.lock lock)
           (let [k (Object.)
                 entry (doto (object-array 8)
                         (aset 0 resume)
                         (aset 1 control)
                         (aset 2 nack-group)
                         (aset 3 value)
                         (aset 4 nil) ; next
                         (aset 5 (aget channel (inc writer-index))) ; prev
                         (aset 6 (.get controls control)) ; lst next
                         (aset 7 nil) ; lst prev
                         )]
             (when (nil? (aget channel writer-index))
               (aset channel writer-index entry))
             (when (aget channel (inc writer-index))
               (aset ^objects (aget channel (inc writer-index)) 4 entry))
             (aset channel (inc writer-index) entry)
             (when (some? (.get controls control))
               (aset ^objects (.get controls control) 7 entry))
             (.put controls control entry)
             (add-watch control k clean-up)
             (.unlock lock)
             (clean-up k control :waiting @control)))))
     identity
     identity
     nil)))

(defn tx
  "Creates an event where synchronization completes after the given value
  has been sent via the given channel, or after the given channel
  closes. Returns true or false"
  [channel value]
  (assert (some? value))
  (exchange channel 3 5 value))

(defn rx
  "Creates an event where synchronization competes after receivng a value
  via the given channel or the channel is closed. Returns the received
  value or nil."
  [channel]
  (exchange channel 5 3 true))

(defn sync!
  ([evt]
   (-sync evt))
  ([evt callback]
   (-sync (wrap evt callback))))

(defn sync!! [evt]
  (let [p (promise)]
    (sync! evt p)
    @p))

(defn poll! [evt success failure]
  (barrier
   (fn [b break]
     (sync!
      (choose (wrap evt success)
              (wrap b (fn [_] (failure)))))
     (break))))

(defprotocol Consumer
  (consume [_ ch])
  (unconsume [_ ch])
  (unconsume-all [_]))

(defprotocol Producer
  (subscribe [_ value ch])
  (unsubscribe [_ value ch])
  (unsubscribe-all [_]))

(defmacro pure% [v]
  `(fn [k# except# state#]
     (try
       (let [r# ~v]
         (fn [] (k# r# state#)))
       (catch Throwable t#
         (fn [] (except# t# state#))))))

(defn bind% [m g]
  (fn [k except state]
    (m (fn [r state] ((g r) k except state))
       except
       state)))

(defn tramp [x] (if (fn? x) (recur (x)) x))

(defn unroll-tramp [x] (if (fn? x) #(unroll-tramp (x)) x))

(defn async% [f]
  (fn x [k except state]
    (let [done (atom false)]
      (f (fn g [value]
           (when (compare-and-set! done false true)
             ((::exec state) k value state)))))))

(defmacro let% [bindings body]
  (if (seq bindings)
    (let [[name value & bindings] bindings]
      `(bind% ~value (fn [~name] (let% [~@bindings] ~body))))
    body))

(defn alter-state% [f & args]
  (fn [k except state]
    (k state (apply f state args))))

(defn except% [c h]
  (fn [k except state]
    (c k
       (fn [t state] ((h t) k except state))
       state)))

(defn run
  ([computation]
   (run tramp computation))
  ([tramp computation]
   (tramp
    (computation
     (fn [value state] value)
     (fn [value state] value)
     {::exec (fn [k value state] (tramp (k value state)))}))))

(defn do%
  ([a] a)
  ([a b]
   (let% [_ a]
     b))
  ([a b & c]
   (apply do% (do% a b) c)))

(defn sync% [evt] (async% (fn [k] (sync! (wrap evt k)))))

;; TODO name remix?
(defn pubsub [selector]
  (let [command (channel)]
    (letfn [(send-loop []
              (let% [state (alter-state% identity)
                     _ (reduce
                        (fn [x c]
                          (let% [^long i x
                                 sent (sync% (tx c (:value-to-send state)))]
                            (if sent
                              (pure% (inc i))
                              (do%
                               (alter-state% update-in [:outputs (:selection-value state)] disj c)
                               (pure% i)))))
                        (pure% 0)
                        (:output-chans state))]
                (do% (if (seq (get-in state [:outputs (:selection-value state)]))
                       (pure% nil)
                       (alter-state% update-in [:outputs] dissoc (:selection-value state)))
                     (main-loop))))
            (main-loop []
              (let% [state (alter-state% assoc
                                         :value-to-send nil
                                         :selection-value nil
                                         :output-chans nil
                                         :new-output-chans nil)
                     [the-chan the-val] (sync%
                                         (choose
                                          (cons
                                           (wrap (rx command) (partial vector command))
                                           (for [i (:inputs state)]
                                             (wrap (rx i) (partial vector i))))))]
                (cond (= the-chan command)
                      (let [[op & args] the-val]
                        (case op
                          :consume (do% (alter-state% update-in [:inputs] conj (first args))
                                        (main-loop))
                          :unconsume (do% (alter-state% update-in [:inputs] disj (first args))
                                          (main-loop))
                          :unconsume-all (do% (alter-state% update-in [:inputs] (constantly #{}))
                                              (main-loop))
                          :subscribe (do% (alter-state% update-in [:ouputs (first args)]
                                                  (fnil conj #{}) (second args))
                                          (main-loop))
                          ;; TODO dissoc empty
                          :unsubscribe (do% (alter-state% update-in [:ouputs (first args)] disj (second args))
                                            (main-loop))
                          :unsubscribe-all (do% (alter-state% update-in [:ouputs] (constantly {}))
                                                (main-loop))
                          :close-outputs (do% (pure%
                                               (doseq [[_ outputs] (get state :outputs)
                                                       output outputs]
                                                 (close! output)))
                                              (alter-state% update-in [:ouputs] (constantly {}))
                                              (main-loop))))
                      (some? the-val)
                      (let% [d (pure% (selector the-val))
                             _ (alter-state% assoc
                                             :value-to-send the-val
                                             :selection-value d
                                             :output-chans (get-in state [:ouputs d])
                                             :new-output-chans #{})]
                        (send-loop))
                      :else
                      (do% (alter-state% update-in [:inputs] disj the-chan)
                           (main-loop)))))]
      (run (main-loop)))
    (reify
      Consumer
      (consume [_ ch]
        (tx command [:consume ch]))
      (unconsume [_ ch]
        (tx command [:unconsume ch]))
      (unconsume-all [_]
        (tx command [:unconsume-all]))
      Producer
      ;; TODO on close?
      (subscribe [_ value ch]
        (tx command [:subscribe value ch]))
      (unsubscribe [_ value ch]
        (tx command [:unsubscribe value ch]))
      (unsubscribe-all [_]
        (tx command [:unsubscribe-all]))
      java.io.Closeable
      (close [_]
        (tx command [:close-outputs])))))

(defn pipe [in out]
  (let [p (pubsub (constantly nil))]
    (sync! (subscribe p nil out) (fn [_] (sync! (consume p in))))
    p))

(assert (= 1
           (sync!!
            (choose
             [nil
              (choose [(barrier (fn [e _] e))])
              (barrier (fn [e _] e))
              (nack (fn [s]
                      (sync! s (fn [_] (prn "not chosen")))
                      (barrier (fn [e _] e))))
              (nack (fn [s]
                      (sync! s (fn [_] (prn "A")))
                      (choose
                       [(nack (fn [s]
                                (sync! s (fn [_] (prn "B")))
                                (barrier (fn [e _] e))))
                        (wrap (guard (fn [] (barrier (fn [e s] (s) e))))
                              (constantly 1))])))]))))

(println "here")
(let [c (channel)]
  (sync! (tx c "Hello World") prn)
  (sync! (rx c) (fn [v] (prn v))))

(let [to (timeout 1000)]
  (time (sync!! to))
  (time (sync!! to)))

(println "===============")
(let [p (pubsub (constantly nil))
      c1 (channel)
      c2 (channel)
      c3 (channel)]
  (sync! (consume p c1))
  (sync! (subscribe p nil c2))
  (sync! (subscribe p nil c3))
  (sync!! (tx c1 "Hello World"))
  (assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
  (assert (= "Hello World" (sync!! (choose [(rx c3) (rx c2)]))))
  )

(sync!! (wrap always (fn [_] (println "Here"))))

;; (defn proposer [input acceptors]
;;   (letfn [(main-loop [events ballot]
;;             (sync!
;;              (wrap
;;               (choose (vals events))
;;               (fn [msg]
;;                 (case (:op msg)
;;                   :propose (let [events (dissoc events :input)]
;;                              (for [a acceptors]
;;                                [a (wrap (tx a ...)
;;                                         (fn []))])))))))]
;;     (main-loop
;;      {:input (rx input)}
;;      0)))

Generated At 2023-10-11T15:57:27-0700 original