;; single file web app
;; run the following to start the process
;; clj -J-Xmx500M -J-XX:-OmitStackTraceInFastThrow -J-Dclojure.server.repl="{:port 9457 :accept clojure.core.server/repl}" -Sdeps '{:deps {net.sourceforge.htmlunit/htmlunit {:mvn/version "2.53.0"} org.clojure/data.json {:mvn/version "2.4.0"} com.github.seancorfield/next.jdbc {:mvn/version "1.2.731"} org.postgresql/postgresql {:mvn/version "42.2.24"} org.clojure/clojurescript {:mvn/version "1.10.879"} io.undertow/undertow-core {:mvn/version "2.2.12.Final"} reagent/reagent {:mvn/version "1.1.0"} cljsjs/react {:mvn/version "17.0.2-0"} cljsjs/react-dom {:mvn/version "17.0.2-0"} com.cognitect/transit-clj {:mvn/version "1.0.324"} com.cognitect/transit-cljs {:mvn/version "0.8.269"} com.microsoft.playwright/playwright {:mvn/version "1.17.0"}}}' -M -i src/packages.cljc -e '(packages/main)'

(ns packages
  #?(:cljs (:require-macros [packages :as m]))
  (:require
   #?@(:clj ([clojure.data.json :as json]
             [next.jdbc :as jdbc]
             [cljs.build.api :as cljs]
             [clojure.java.io :as io]
             [clojure.repl :refer :all]
             [clojure.data :refer [diff]]
             [clojure.string :as str])
       :cljs ([reagent.core :as r]
              [reagent.dom :as rdom]
              [clojure.string :as str]))
   [cognitect.transit :as transit])
  #?(:clj (:import (io.undertow.util Headers)
                   (io.undertow.websockets.core AbstractReceiveListener
                                                WebSockets
                                                WebSocketChannel
                                                BufferedTextMessage)
                   (io.undertow.websockets WebSocketProtocolHandshakeHandler
                                           WebSocketConnectionCallback)
                   (io.undertow Undertow)
                   (io.undertow.server HttpHandler
                                       HttpServerExchange)
                   (java.util UUID)
                   (java.util.logging Logger
                                      Level
                                      ConsoleHandler
                                      SimpleFormatter
                                      LogRecord)
                   (java.io File)
                   (java.nio ByteBuffer)
                   (java.time Instant)
                   (java.time.format DateTimeFormatter)
                   (java.util.concurrent TimeUnit
                                         Executor
                                         SubmissionPublisher
                                         Flow$Subscription
                                         Flow$Subscriber
                                         CompletableFuture
                                         ScheduledExecutorService
                                         Executors)
                   (java.net URLEncoder
                             URL)
                   (com.gargoylesoftware.htmlunit WebClient
                                                  BrowserVersion))))

(comment

  ;; remote repls without the source file and can load it like this
  (clojure.lang.Compiler/load
   (java.io.StringReader. (slurp "https://downey.family/ddd/source"))
   "packages.cljc"
   "packages.cljc")

  )


(def port 9456)

#?(:clj (def carrier-url nil))

(defmulti carrier-url (fn [carrier tracking] carrier))

(defmethod carrier-url "fedex" [carrier tracking]
  #_(str "https://www.fedex.com/fedextrack/?trknbr="
       tracking
       "&trkqual=12023~"
       tracking
       "~FDEG"
       )
  (str "https://www.fedex.com/fedextrack/?trknbr=" tracking))

(defmethod carrier-url "ups" [carrier tracking]
  (str "https://www.ups.com/track?loc=en_US&tracknum=" tracking))

(defmethod carrier-url "usps" [carrier tracking]
  (str "https://tools.usps.com/go/TrackConfirmAction?tLabels=" tracking))

(defmethod carrier-url "dhl" [carrier tracking]
  (str "https://webtrack.dhlglobalmail.com/?trackingnumber=" tracking))

(comment

  (with-open [pw (com.microsoft.playwright.Playwright/create)
              ctxt (-> pw
                       .chromium
                       (.launchPersistentContext (java.nio.file.Paths/get "" (into-array String []))
                                                 (doto (com.microsoft.playwright.BrowserType$LaunchPersistentContextOptions.)
                                                   (.setHeadless false)
                                                   (.setIsMobile true)
                                                   (.setUserAgent "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36")))
                       #_(.launch
                                    (doto (com.microsoft.playwright.BrowserType$LaunchOptions.)
                                      (.setHeadless false))))]
    (.grantPermissions ctxt
                       ["geolocation"
                        "midi"
                        "notifications"
                        "camera"
                        "microphone"
                        "clipboard-read"
                        "clipboard-write"])
    (let [p (.newPage ctxt)]
      (.navigate p (carrier-url "fedex"  "...."))
      (Thread/sleep (* 1000 60))
      (.screenshot p (doto (com.microsoft.playwright.Page$ScreenshotOptions.)
                       (.setPath (java.nio.file.Paths/get "foo.png" (into-array String [])))))))

  (java.nio.file.Paths/get "foo.png")

  
  )

(defprotocol FlowExchangeSupport
  (tx [comm msg])
  (handle-incoming [comm process on-close]))

(defprotocol PartitionExchange
  (connect [src])
  (accept [src]))


(comment

  (fn f [exf]
    (let [r (java.util.concurrent.ConcurrentHashMap.)
          a (atom [])
          incoming-new (java.util.concurrent.LinkedBlockingQueue.)]
      (reify
        PartitionExchange
        (connect [src]
          (let [id (UUID/randomUUID)]
            (fn [msg]
              ((fn f [p]
                 (let [w (CompletableFuture.)]
                   (.putIfAbsent r id (java.util.concurrent.LinkedBlockingQueue.))
                   (.offer (.get r id) w)
                   (.thenApply p
                               (reify
                                 Function
                                 (apply [_ msg]
                                   (fn  f [msg]
                                     (let [id (subs msg 0 (str/index-of msg ":"))]
                                       (.complete (.take (.get @r id)) (subs msg (+ (count id) 1))))))))
                   w))
               (exf (str id ":" msg))))))
        (accept [src]
          (g (exf "c"))
          (let [id (.poll incoming-new)]
            )))))
  
)

#?(:clj
   (extend-protocol FlowExchangeSupport
     WebSocketChannel
     (tx [comm msg]
       (WebSockets/sendText
        msg
        comm
        (reify
          io.undertow.websockets.core.WebSocketCallback
          (complete [_ channel context])
          (onError [_ channel context throwable]))
        (long 1000)))
     (handle-incoming [comm process on-close]
       (-> comm
           .getReceiveSetter
           (.set
            (proxy [AbstractReceiveListener] []
              (onFullCloseMessage [^WebSocketChannel channel msg]
                (.close channel)
                (on-close))
              (onFullTextMessage [channel msg]
                (try
                  (process msg)
                  (catch Throwable t
                    #_(reset! most-recent-error t)
                    #_(severe "whoops" t))))))))))


#?(:cljs
   (extend-protocol FlowExchangeSupport
     js/WebSocket
     (tx [comm msg]
       (.send comm msg))
     (handle-incoming [comm process on-close]
       (fn f []
         (aset comm "onopen" (fn [evt]
                               (m/info "onopen" {})))
         (aset comm "onclose" (fn [evt]
                                (on-close)))
         (aset comm "onerror" (fn [evt]
                                (.close comm)
                                (on-close)))
         (aset comm "onmessage"
               (fn [event]
                 (process (aget event "data"))))))))

(defn flow-exchange
  ([comm]
   (flow-exchange
    comm
    #?(:clj (fn [p result]
              (.complete p result))
       :cljs (assert nil))
    #?(:clj (CompletableFuture.)
       :cljs (assert nil))))
  ([comm complete-p new-promise]
   (let [seq-n (atom 0)
         buffer (atom nil)
         last-seen (atom -1)
         closed? (atom false)
         process (fn [msg]
                   (case (first msg)
                     \a (let [sn #?(:clj (Long/parseLong (subs msg 1 (str/index-of msg ":")))
                                    :cljs (assert nil))
                              x (subs msg (str/index-of msg ":"))
                              ls @last-seen]
                          (loop []
                            (let [b @buffer]
                              (if (compare-and-set! buffer b (into [] (filter #(= (first %) sn)) buffer))
                                (doseq [[i msg p] b
                                        :when (= i sn)]
                                  (complete-p p x))
                                (recur)))))))
         on-close (fn []
                    (when (compare-and-set! closed? false true)
                      (doseq [[i msg p] @buffer]
                        (complete-p p nil))))]
     (handle-incoming comm process on-close)
     (fn [msg]
       (let [p (new-promise)
             s (swap! seq-n inc)
             packet (str "a" s ":" msg)]
         (swap! buffer conj [s msg p])
         (if @closed?
           (complete-p p nil)
           (tx comm msg))
         p)))))

#?(:clj
   (do

     (alias 'm (ns-name *ns*))
     
     (defonce ^Logger logger (doto (Logger/getLogger "clojure")
                               (.setUseParentHandlers false)
                               (.addHandler
                                (doto (ConsoleHandler.)
                                  (.setLevel Level/ALL)
                                  (.setFormatter
                                   (proxy [SimpleFormatter] []
                                     (format [^LogRecord record]
                                       (let [sb (StringBuilder.)]
                                         (.append sb "#:log{")
                                         (.append sb ":z ")
                                         (.append sb (pr-str (str (Instant/ofEpochMilli (.getMillis record)))))
                                         ;; (.append sb " :b ")
                                         ;; (.append sb (format "%02d" (.getSequenceNumber record)))
                                         ;; (.append sb " :c ")
                                         ;; (.append sb (format "%02d" (.getThreadID record)))
                                         (.append sb " :v :")
                                         (.append sb (.toLowerCase (.getName (.getLevel record))))
                                         (.append sb " :n ")
                                         (.append sb (.getSourceClassName record))
                                         (.append sb " :l ")
                                         (let [x (.getSourceMethodName record)]
                                           (if (empty? x)
                                             (.append sb "nil")
                                             (.append sb (.getSourceMethodName record))))
                                         (.append sb " :m ")
                                         (.append sb (pr-str (.getMessage record)))
                                         (doseq [p (seq (.getParameters record))
                                                 :when (map? p)
                                                 [k v] (seq p)]
                                           (doto sb
                                             (.append " ")
                                             (cond->
                                                 (namespace k) (.append (pr-str k))
                                                 (not (namespace k)) (-> (.append ":_/") (.append (name k))))
                                             (.append " ")
                                             (.append (pr-str v))))
                                         (when-let [t (.getThrown record)]
                                           (.append sb " :thrown \n")
                                           (.append sb (pr-str t)))
                                         (.append sb "}\n")
                                         (str sb)))))))))

     (defmacro def-levels [level-lookup]
       `(do
          ~@(for [level '[severe warning info config fine finer finest]]
              `(def ~(symbol (str "log-level-" (name level)))
                 (~(first level-lookup)
                  ~(name level)
                  ~@(rest level-lookup))))))

     (def-levels (-> .toUpperCase Level/parse))

     (doseq [level '[severe warning info config fine finer finest]]
       (intern *ns*
               level
               (fn f
                 ([&form &env msg]
                  (f &form &env msg {}))
                 ([&form &env msg parameters]
                  (let [ns (name (ns-name *ns*))]
                    `(let [p# ~parameters]
                       (if (map? p#)
                         (log-context ~(symbol (str "log-level-" (name level)))
                                      ~ns
                                      ~(str (:line (meta &form)))
                                      (print-str ~msg)
                                      p#)
                         (log-error ~(symbol (str "log-level-" (name level)))
                                    ~ns
                                    ~(str (:line (meta &form)))
                                    (print-str ~msg)
                                    ^Throwable p#)))))))
       (.setMacro ^clojure.lang.Var (ns-resolve *ns* level)))

     (defn log-context [^Level level ^String ns ^String line ^String msg ^Object context]
       (.logp logger level ns line msg context))

     (defn log-error [^Level level ^String ns ^String line ^String msg ^Throwable error]
       (.logp logger level ns line msg error))



     )

   :cljs
   (do

     ;; TODO needs improvement
     (m/def-levels ({"severe" (fn [ns line msg obj context?]
                                (if context?
                                  (prn ns line msg obj)
                                  (prn ns line msg obj)))
                     "warning" (fn [ns line msg obj context?]
                                 (if context?
                                   (prn ns line msg obj)
                                   (prn ns line msg obj)))
                     "info" (fn [ns line msg obj context?]
                              (if context?
                                (let [sb (new js/Array)]
                                  (.push sb "#:log{")
                                  (.push sb " :v :info")
                                  (.push sb " :n ")
                                  (.push sb ns)
                                  (.push sb " :l ")
                                  (.push sb line)
                                  (.push sb " :m ")
                                  (.push sb (pr-str msg))
                                  (doseq [[k v] obj]
                                    (.push sb " ")
                                    (if (namespace k)
                                      (.push sb (pr-str k))
                                      (doto sb
                                        (.push ":_/")
                                        (.push (name k))))
                                    (.push sb " ")
                                    (.push sb (pr-str v)))
                                  (.push sb "}")
                                  (.info js/console (.join sb " ")))
                                (prn ns line msg obj)))
                     "config" (fn [ns line msg obj context?]
                                (if context?
                                  (prn ns line msg obj)
                                  (prn ns line msg obj)))
                     "fine" (fn [ns line msg obj context?]
                                (if context?
                                  (prn ns line msg obj)
                                  (prn ns line msg obj)))
                     "finer" (fn [ns line msg obj context?]
                                (if context?
                                  (prn ns line msg obj)
                                  (prn ns line msg obj)))
                     "finest" (fn [ns line msg obj context?]
                                (if context?
                                  (prn ns line msg obj)
                                  (prn ns line msg obj)))}))

     (defn log-context [level ns line msg context]
       (level ns line msg context true))
     
     (defn log-error [level ns line msg context]
       (level ns line msg context false))
     
     ))



#?(:clj
   (do

     (set! *warn-on-reflection* true)

     ;; the logging macros depend on form meta data from the reader,
     ;; which is missing for macro generated forms, so this gets
     ;; weird.
     (defmacro log-time
       ([form]
        (let [start (gensym)
              result (gensym)]
          (list
           (list `fn [start result]
                 (with-meta (list `info "elapsed time" {:log/time `(/ (- (System/nanoTime) ~start) 1e6)}) (meta &form))
                 result)
           '(System/nanoTime)
           form)))
       ([m form]
        (let [start (gensym)
              result (gensym)]
          (list
           (list `fn [start result]
                 (with-meta (list `info "elapsed time" (assoc m :log/time `(/ (- (System/nanoTime) ~start) 1e6))) (meta &form))
                 result)
           '(System/nanoTime)
           form))))

     (defonce file *file*)
     (defonce state (atom {}))
     (defonce most-recent-error (atom nil))
     (def requests-count (atom 0))

     (defonce ^ScheduledExecutorService sched (Executors/newScheduledThreadPool 1))

     (defonce ^SubmissionPublisher pub (SubmissionPublisher.))

     (def ds (jdbc/get-datasource "jdbc:postgresql://192.168.1.229/postgres?user=postgres"))

     (comment

       (last (jdbc/execute! ds ["select * from package where carrier = 'fedex'"]))

       
       )

     (declare placify)

     (def m (delay
              (transduce
               (comp (map placify)
                     (partition-by :package_event/package_id)
                     (filter (fn [rows]
                               (.contains (:package_event/status (last rows)) "Delivered")))
                     (mapcat (fn [rows]
                               (partition 2 1 (concat [{:place "initial" :package_event/at (java.util.Date.
                                                                                            (min (inst-ms (:package/date_created (first rows)))
                                                                                                 (inst-ms (:package_event/at (first rows)))))}]
                                                      rows
                                                      [{:place "terminal" :package_event/at (:package_event/at (last rows))}]))))
                     (map (fn [[a b]]
                            (assert a)
                            (assert b)
                            [(:place a)
                             (:place b)
                             (Math/round
                              (/ (- (inst-ms (:package_event/at b))
                                    (inst-ms (:package_event/at a)))
                                 1000.0
                                 60
                                 60))])))
               (completing
                (fn [m [a b t]]
                  (update-in m [a [b t]] (fnil inc 0))))
               {}
               (jdbc/execute! ds ["select * from package_event join package on package.id = package_id order by package_id,at asc"]))))

     (defn placify [{:package_event/keys [place] :as m}]
       (let [place (.trim (.replaceAll (.toLowerCase place) "\u00a0" " "))
             [_ city _ state] (re-find #"(\w+),(\s+)([a-z]+)" place)
             city-state (str city ", " state)]
         (if city
           (assoc m :place city-state)
           (assoc m :place place))))


     (defn draw [m event]
       (if (contains? m event)
         (let [p (get m event)
               m (apply + (vals p))]
           (loop [[[out x] & ps] (seq p)
                  r (* m (rand))]
             (let [r (- r x)]
               (if (neg? r)
                 out
                 (recur ps r)))))
         (if (.contains event ",")
           (let [[_ state] (.split event ",")]
             (recur
              m
              (rand-nth
               (for [[k _] m
                     :when (.endsWith k state)]
                 k))))
           (assert nil event))))
     
     (defn mean [data]
       (/ (double (apply + data))
          (double (count data))))


     (defn quartiles [data]
       (let [i (Math/floor (/ (count data) 4))
             ii (Math/ceil (/ (count data) 4))
             q1 (/ (+ (nth data i) (nth data ii)) 2.0)
             i (Math/floor (/ (count data) 2))
             ii (Math/ceil (/ (count data) 2))
             q2 (/ (+ (nth data i) (nth data ii)) 2.0)
             i (Math/floor (/ (count data) 4))
             ii (Math/ceil (/ (count data) 4))
             q3 (/ (+ (nth data (* 3 i)) (nth data (* 3 ii))) 2.0)]
         {:q1 q1
          :q2 q2
          :median q2
          :q3 q3
          :iqr (- q3 q1)
          :trimean (/ (+ q1 (* 2 q2) q3) 4.0)
          ;; :yule-coeff (/ (+ (- q3 (* 2 q2)) q1) (- q3 q1))
          }))

     (defn summerize [data]
       (let [r (vec (sort (map double data)))
             avg (mean r)
             q (quartiles r)
             mad ((fn [s]
                    (let [i (Math/floor (/ (count s) 2))
                          ii (Math/ceil (/ (count s) 2))]
                      (/ (+ (nth s i) (nth s ii)) 2.0)))
                  (sort (for [i r] (Math/abs (- (:q2 q) i)))))
             std-dev (Math/sqrt (mean (for [x data] (Math/pow (- x avg) 2))))
             [[mode]] (sort-by (comp - val) (frequencies data))]
         (into (sorted-map)
               (map (fn [[k v]] [k (/ (Math/round (* v 100.0)) 100.0)]))
               (assoc q
                      :mode mode
                      :std-dev std-dev
                      :count (count r)
                      :mean avg
                      :mad mad
                      :min (apply min r)
                      :max (apply max r)))))



     (comment

       
       
       
       (->> (jdbc/execute! ds ["select * from package_event order by package_id,at"])
            (partition-by :package_event/package_id)
            (mapcat (fn [x] (partition-all 2 1 (map (juxt (comp bigdec #(/ (Math/round (* % 10)) 10) :package_event/lat)
                                                          (comp bigdec #(/ (Math/round (* % 10)) 10) :package_event/lon)) x))))
            (filter #(= 2 (count %)))
            (reduce (fn [s [a b]] (update-in s [a b] (fnil inc 0))) {}))
       

       )

     (defn package-data []
       (info "package data")
       (log-time
        {:what :package-data}
        (jdbc/execute! ds ["

with
most_recent_evts as (select * from package_event where package_event.at in (select max(at) from package_event group by package_id) and package_event.id in (select max(id) from package_event group by package_id, at)),
earliest_evts as (select * from package_event where package_event.at in (select min(at) from package_event group by package_id) and package_event.id in (select min(id) from package_event group by package_id, at))
select
  package.id,
  carrier,
  earliest_evts.place as first_place,
  least(package.date_created,earliest_evts.at) as first_date,
  most_recent_evts.place as last_place,
  most_recent_evts.at as last_date,
  most_recent_evts.status as last_status,
  tracking,
  active,
  case
  when earliest_evts.place = most_recent_evts.place then 0
  else 
  (asin(
  sqrt(
  pow(sin(((earliest_evts.lat - most_recent_evts.lat) * PI())/180/2), 2) +
  pow(sin(((earliest_evts.lon - most_recent_evts.lon) * PI())/180/2), 2) *
  cos((most_recent_evts.lat * PI())/180) * cos((earliest_evts.lat * PI())/180))) * 2 * 6371) /
  ((extract(epoch from most_recent_evts.at) - extract(epoch from least(package.date_created,earliest_evts.at))) / 60 / 60)
  end kph,
  extract(days from now() - most_recent_evts.at) days_ago
  from package
  left join most_recent_evts
    on most_recent_evts.package_id = package.id
  left join earliest_evts
    on earliest_evts.package_id = package.id
  order by active desc, most_recent_evts.at desc;

"])))

     (defonce timed-publish ((fn f [s]
                               (.schedule s (partial f s) (* 1000 60) TimeUnit/MILLISECONDS)
                               (when (pos? (.getNumberOfSubscribers pub))
                                 (.submit pub (package-data))))
                             sched))
     

     (defn geocode [place]
       (json/read-str
        (slurp
         (URL.
          (format "https://nominatim.openstreetmap.org/search?q=%s&format=json"
                  (URLEncoder/encode place "utf-8"))))))

     (defn timezone-offset [lat lon]
       (get (->> (json/read-str
                  (slurp
                   (URL.
                    (format "https://api.teleport.org/api/locations/%s,%s/?embed=location:nearest-cities/location:nearest-city/city:timezone/tz:offsets-now"
                            lat
                            lon))))
                 (tree-seq coll? seq)
                 (filter map?)
                 (filter #(contains? % "total_offset_min"))
                 (first))
            "total_offset_min"))

     (defmulti apply-to-carrier-events (fn [carrier tracking f] carrier))

     (comment
       (with-open [wc (WebClient. BrowserVersion/CHROME)]
         (println (carrier-url "fedex" "..."))
         (let [page (.getPage wc (carrier-url "fedex" "..."))
               ;; _ (Thread/sleep (* 60 1000))
               ;; _ (doseq [button (.getElementsByTagName page "button")
               ;;           :when (= (.asText (.getFirstChild button))
               ;;                    "Expand History")]
               ;;     (.click button))
               ;; _ (Thread/sleep 1000)
               ]
           (Thread/sleep (* 120 1000))
           (println (.asNormalizedText (.getBody page)))
           #_(f
            (tree-seq
             (comp seq #(.getChildren %))
             (comp seq #(.getChildren %))
             page))))
       )

     (defmethod apply-to-carrier-events "fedex" [carrier tracking f]
       (with-open [wc (WebClient.)]
         (let [page (.getPage wc (carrier-url carrier tracking))
               _ (Thread/sleep (* 60 1000))
               _ (doseq [button (.getElementsByTagName page "button")
                         :when (= (.asText (.getFirstChild button))
                                  "Expand History")]
                   (.click button))
               _ (Thread/sleep 1000)]
           (f
            (tree-seq
             (comp seq #(.getChildren %))
             (comp seq #(.getChildren %))
             page)))))

     (defmethod apply-to-carrier-events "ups" [carrier tracking f]
       (with-open [wc (WebClient. BrowserVersion/FIREFOX)]
         (-> wc
             (.getOptions)
             (.setThrowExceptionOnScriptError false))
         (let [page (.getPage wc (carrier-url carrier tracking))
               _ (Thread/sleep (* 60 1000))
               _ (.click (.getElementById page "st_App_View_Details"))]
           (f
            (tree-seq
             (comp seq #(.getChildren %))
             (comp seq #(.getChildren %))
             (first (.getElementsByTagName (first (.getElementsByTagName page "ups-shipment-progress")) "table")))))))

     (defmethod apply-to-carrier-events "usps" [carrier tracking f]
       (info "a" {})
       (with-open [wc (WebClient. BrowserVersion/FIREFOX)]
         (-> wc
             (.getOptions)
             (.setThrowExceptionOnScriptError false))
         (info "b" {:carrier carrier :tracking tracking})
         (let [page (.getPage wc (carrier-url carrier tracking))
               _ (info "c" {})
               _ (Thread/sleep (* 60 1000))
               page (-> page .getEnclosingWindow .getEnclosedPage)
               _ (info "d" {})
               _ (doseq [elem (.getElementsByTagName page "a")
                         :when (some-> elem
                                       (.getAttributes)
                                       (get "class")
                                       (.getNodeValue)
                                       (.contains "see-all"))]
                   (.click elem))
               _ (Thread/sleep (* 5 1000))]
           (f
            (-> page
                (.getElementById "trackingHistory_1")
                (.getChildren)
                (->> (filter #(= "div" (.getNodeName %))))
                (first)
                (.getChildren)
                (->> (filter #(= "div" (.getNodeName %))))
                (first)
                (.getChildren)
                (->> (filter #(= "div" (.getNodeName %))))
                (first)
                (.getChildren))))))

     (defmethod apply-to-carrier-events "dhl" [carrier tracking f]
       (with-open [wc (WebClient. BrowserVersion/FIREFOX)]
         (-> wc
             (.getOptions)
             (.setThrowExceptionOnScriptError false))
         (let [page (.getPage wc (carrier-url carrier tracking))]
           (f
            (seq
             (.getChildren
              (first
               (filter
                #(and (some-> %
                              (.getAttributes)
                              (get "class")
                              (.getNodeValue)
                              (.contains "timeline"))
                      (= "ol" (.getNodeName %)))
                (tree-seq
                 (comp seq #(.getChildren %))
                 (comp seq #(.getChildren %))
                 page)))))))))
     
     (def observe-carrier nil)

     (defmulti observe-carrier (fn [id carrier tracking] carrier))

     (comment
       (observe-carrier 69 "fedex" "...")

       (apply-to-carrier-events
        "fedex"
        "..."
        (fn [nodes]
          (prn nodes)))

       )

     

     (defmethod observe-carrier "fedex" [id carrier tracking]
       (apply-to-carrier-events
        carrier
        tracking
        (fn [nodes]
          (reduce
           (fn [accum elem]
             (prn accum elem)
             (cond (some-> (.getAttributes elem)
                           (get "class")
                           (.getNodeValue)
                           (.contains "travel-history-table__scan-event-date-row"))
                   (assoc accum :date (some-> elem .getFirstChild .getFirstChild .asText))
                   (some-> (.getAttributes elem)
                           (get "class")
                           (.getNodeValue)
                           (.contains "travel-history-table__scan-event-details-row"))
                   (let [place (first (for [elem (.getChildren elem)
                                            :when (some-> (.getAttributes elem)
                                                          (get "class")
                                                          (.getNodeValue)
                                                          (.contains "travel-history-table__location"))]
                                        (-> elem .asText)))
                         place-data (when (seq place)
                                      (or (get-in accum [:places place])
                                          
                                          (for [pd (geocode place)
                                                :let [minutes-offset
                                                      (timezone-offset (-> pd (get "lat")) (-> pd (get "lon")))]]
                                            
                                            (assoc pd :tz-minutes-offset minutes-offset))))
                         time (first (for [elem (.getChildren elem)
                                           :when (some-> (.getAttributes elem)
                                                         (get "class")
                                                         (.getNodeValue)
                                                         (.contains "travel-history-table__time-stamp"))]
                                       (-> elem .asText)))
                         date (:date accum)
                         tz-minutes-offset (:tz-minutes-offset (first place-data))
                         _ (Thread/sleep 1000)]
                     (if-not (seq place)
                       accum
                       (do
                         (try
                           (jdbc/execute-one!
                            ds
                            ["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
                             id
                             (Double/parseDouble (-> place-data first (get "lat")))
                             (Double/parseDouble (-> place-data first (get "lon")))
                             tz-minutes-offset
                             place
                             (when tz-minutes-offset
                               (java.sql.Timestamp.
                                (.toEpochMilli
                                 (Instant/from
                                  (.parse (DateTimeFormatter/ofPattern "EEEE, LLLL d, y h:m a XXXX")
                                          (format "%s %s %+03d%02d" date time (quot tz-minutes-offset 60)
                                                  (Math/abs (rem tz-minutes-offset 60))))))))
                             (first (for [elem (.getChildren elem)
                                          :when (some-> (.getAttributes elem)
                                                        (get "class")
                                                        (.getNodeValue)
                                                        (.contains "travel-history-table__status-and-details"))
                                          elem (.getChildren elem)
                                          :when (some-> (.getAttributes elem)
                                                        (get "class")
                                                        (.getNodeValue)
                                                        (.contains "travel-history-table__event-status"))]
                                      (-> elem .asText)))])
                           (catch Throwable t
                             (reset! most-recent-error t)
                             (severe "whoops" t)))
                         (-> accum
                             (assoc-in [:places place] place-data)))))
                   :else
                   accum))
           {}
           nodes))))

     (defmethod observe-carrier "ups" [id carrier tracking]
       (apply-to-carrier-events
        carrier
        tracking
        (fn [nodes]
          (reduce
           (fn [accum elem]
             (cond (and (= "td" (.getNodeName elem))
                        (nil? (:time accum)))
                   (let [[date time] (map
                                      #(.getTextContent %)
                                      (filter
                                       #(= "#text" (.getNodeName %))
                                       (.getChildren elem)))]
                     (assoc accum :date date :time time))
                   (and (= "td" (.getNodeName elem))
                        (:time accum))
                   (do
                     (let [[status place] (->> (tree-seq (comp seq #(.getChildren %)) (comp seq #(.getChildren %)) elem)
                                               (filter #(= "#text" (.getNodeName %)))
                                               (map #(some-> % .getTextContent .trim))
                                               (filter seq))]
                       (assert status)
                       (assert place)
                       (if (= status "Label Created")
                         (assoc accum :time nil)
                         (let [pd (when (seq place)
                                    (or (get-in accum [:places place])
                                        
                                        (for [pd (geocode place)
                                              :let [minutes-offset (timezone-offset (-> pd (get "lat"))
                                                                                    (-> pd (get "lon")))]]
                                          
                                          (assoc pd :tz-minutes-offset minutes-offset))))]
                           (try
                             (when (= (.trim place) "United States")
                               (throw (Exception.)))
                             (jdbc/execute-one!
                              ds
                              ["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
                               id
                               (Double/parseDouble (-> pd first (get "lat")))
                               (Double/parseDouble (-> pd first (get "lon")))
                               (:tz-minutes-offset (first pd))
                               place
                               (when (:tz-minutes-offset (first pd))
                                 (java.sql.Timestamp.
                                  (.toEpochMilli
                                   (Instant/from
                                    (.parse (DateTimeFormatter/ofPattern "MM/dd/yyyy h:m a XXXX")
                                            (-> (format "%s %s %+03d%02d"
                                                        (:date accum)
                                                        (:time accum)
                                                        (quot (-> pd first :tz-minutes-offset) 60)
                                                        (Math/abs (rem (-> pd first :tz-minutes-offset) 60))
                                                        )
                                                (.replaceAll "P.M." "PM")
                                                (.replaceAll "A.M." "AM")))))))
                               status])
                             (catch Throwable t
                               (reset! most-recent-error t)
                               (severe "whoops" t)))
                           (-> accum
                               (assoc-in [:places place] pd)
                               (assoc :time nil))))))
                   :else
                   accum))
           {}
           nodes))))

     ;; (re-find #"\w+ \d+, \d+," "November 20, 2021,\n\t\t\t\t\t\t\t\t\t8:26 pm")

     ;; (re-find #"\w+,? \w\w" "INDIANAPOLIS IN LOGISTICS CENTER")
     ;; (re-find #"\w+,? \w\w (\d+)?" "INDIANAPOLIS IN LOGISTICS CENTER")
     
     (defmethod observe-carrier "usps" [id carrier tracking]
       (apply-to-carrier-events
        carrier
        tracking
        (fn [nodes]
          (reduce
           (fn [accum elem]
             
             (cond (and (= "span" (.getNodeName elem))
                        (some #{"strong"} (->> elem .getChildren (map #(.getNodeName %))))
                        (> (-> elem
                               .getChildren
                               (->> (filter #(= "strong" (.getNodeName %))))
                               first
                               .getTextContent
                               .trim
                               (.split "\n")
                               (count))
                           1))
                   (let [[date raw-time] (-> elem
                                             .getChildren
                                             (->> (filter #(= "strong" (.getNodeName %))))
                                             first
                                             .getTextContent
                                             .trim
                                             (.split "\n"))
                         time (.trim raw-time)]
                     (assoc accum :date date :time time))
                   (and (= "span" (.getNodeName elem))
                        (:time accum)
                        (not (:status accum)))
                   (assoc accum :status (.trim (.getTextContent elem)))
                   (and (= "span" (.getNodeName elem))
                        (:time accum)
                        (:status accum))
                   (let [place (.trim (.getTextContent elem))
                         state (:status accum)]
                     (let [pd (when (seq place)
                                (or (get-in accum [:places place])
                                    
                                    (seq (for [pd (geocode place)
                                               :let [minutes-offset (timezone-offset (-> pd (get "lat"))
                                                                                     (-> pd (get "lon")))]]
                                           
                                           (assoc pd :tz-minutes-offset minutes-offset)))
                                    (seq (for [pd (geocode (re-find #"\w+,? \w\w " place))
                                               :let [minutes-offset (timezone-offset (-> pd (get "lat"))
                                                                                     (-> pd (get "lon")))]]
                                           
                                           (assoc pd :tz-minutes-offset minutes-offset)))))]
                       (try
                         (jdbc/execute-one!
                          ds
                          ["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
                           id
                           (Double/parseDouble (-> pd first (get "lat")))
                           (Double/parseDouble (-> pd first (get "lon")))
                           (:tz-minutes-offset (first pd))
                           place
                           (when (:tz-minutes-offset (first pd))
                             (java.sql.Timestamp.
                              (.toEpochMilli
                               (Instant/from
                                (.parse (DateTimeFormatter/ofPattern "MMMM d, yyyy, h:m a XXXX")
                                        (-> (format "%s %s %+03d%02d"
                                                    (:date accum)
                                                    (:time accum)
                                                    (quot (-> pd first :tz-minutes-offset) 60)
                                                    (Math/abs (rem (-> pd first :tz-minutes-offset) 60)))
                                            (.replaceAll "am" "AM")
                                            (.replaceAll "pm" "PM")))))))
                           (:status accum)])
                         (catch Throwable t
                           (reset! most-recent-error t)
                           (severe "whoops"t)))
                       (-> accum
                           (dissoc :time :status)
                           (assoc-in [:places place] pd))))
                   (= "hr" (.getNodeName elem))
                   (empty accum)
                   :else
                   accum))
           {}
           nodes))))
     

     (defmethod observe-carrier "dhl" [id carrier tracking]
       (apply-to-carrier-events
        carrier
        tracking
        (fn [nodes]
          (reduce
           (fn [accum elem]
             (cond (and (= "li" (.getNodeName elem))
                        (some-> elem
                                (.getAttributes)
                                (get "class")
                                (.getNodeValue)
                                (.contains "timeline-date")))
                   (assoc accum :date (.asText elem))
                   (and (:date accum)
                        (= "li" (.getNodeName elem))
                        (some-> elem
                                (.getAttributes)
                                (get "class")
                                (.getNodeValue)
                                (.contains "timeline-event")))
                   (let [[time-a time-b] (some-> elem
                                                 (.getChildren)
                                                 (->> (filter #(some-> %
                                                                       (.getAttributes)
                                                                       (.get "class")
                                                                       (.getNodeValue)
                                                                       (.contains "timeline-time"))))
                                                 (first)
                                                 (.asText)
                                                 (.split "\n"))
                         [location] (for [c (.getChildren elem)
                                          :when (some-> c
                                                        (.getAttributes)
                                                        (.get "class")
                                                        (.getNodeValue)
                                                        (.contains "timeline-unit"))
                                          c (.getChildren c)
                                          :when (some-> c
                                                        (.getAttributes)
                                                        (.get "class")
                                                        (.getNodeValue)
                                                        (.contains "timeline-location"))]
                                      (.trim (.asText c)))
                         [description] (for [c (.getChildren elem)
                                             :when (some-> c
                                                           (.getAttributes)
                                                           (.get "class")
                                                           (.getNodeValue)
                                                           (.contains "timeline-unit"))
                                             c (.getChildren c)
                                             :when (some-> c
                                                           (.getAttributes)
                                                           (.get "class")
                                                           (.getNodeValue)
                                                           (.contains "timeline-description"))]
                                         (.trim (.asText c)))
                         pd (when (seq location)
                              (or (get-in accum [:places location])
                                  
                                  (for [pd (geocode location)
                                        :let [minutes-offset (timezone-offset (-> pd (get "lat"))
                                                                              (-> pd (get "lon")))]]
                                    
                                    (assoc pd :tz-minutes-offset minutes-offset))))]
                     (if (and time-a
                              time-b
                              location
                              description)
                       (let [[am-pm _] (.split time-b " ")]
                         (try
                           (jdbc/execute-one!
                            ds
                            ["insert into package_event(package_id,lat,lon,tz_minutes_offset,place,at,status)
values (?,?,?,?,?,?,?)"
                             id
                             (Double/parseDouble (-> pd first (get "lat")))
                             (Double/parseDouble (-> pd first (get "lon")))
                             (:tz-minutes-offset (first pd))
                             location
                             (when (:tz-minutes-offset (first pd))
                               (java.sql.Timestamp.
                                (.toEpochMilli
                                 (Instant/from
                                  (.parse (DateTimeFormatter/ofPattern "MMM d, yyyy h:m a XXXX")
                                          (format "%s %s %s %+03d%02d"
                                                  (:date accum)
                                                  time-a
                                                  am-pm
                                                  (quot (-> pd first :tz-minutes-offset) 60)
                                                  (Math/abs (rem (-> pd first :tz-minutes-offset) 60))))))))
                             description])
                           (catch Throwable t
                             (reset! most-recent-error t)
                             (severe "whoops"t)))
                         (assoc-in accum [:places location] pd))
                       (assoc-in accum [:places location] pd)))
                   :else
                   (do
                     #_(prn elem)
                     accum)))
           {}
           nodes))))

     
     (defn observe []
       (transduce
        identity
        (completing
         (fn [n {:package/keys [id carrier tracking]}]
           (try
             (observe-carrier id carrier tracking)
             (inc n)
             (catch Throwable t
               (info (print-str id carrier tracking))
               (reset! most-recent-error t)
               (severe "whoops" t)
               n))))
        0
        (jdbc/execute! ds ["select * from package where active = true"])))

     (defn observe-loop []
       (while true
         (observe)
         (let [i (/ (- (Math/log (- 1 (rand)))) (/ 1 (* 1000 60 60)))]
           (info "sleep" {:interval i})
           (Thread/sleep i))))

     (def ^CompletableFuture page (CompletableFuture.))

     ;; dumps a lot of management bean stuff out in a format that
     ;; prometheus(https://prometheus.io/) can digest
     (defn metrics-data []
       (let [baos (java.io.ByteArrayOutputStream.)]
         (loop [s nil
                e {:result (java.util.HashMap.)}
                c (java.lang.management.ManagementFactory/getPlatformMBeanServer)
                d nil]
           (cond (instance? javax.management.MBeanServerConnection c)
                 (let [x (.queryNames (java.lang.management.ManagementFactory/getPlatformMBeanServer) nil nil)
                       e (assoc e :con c)]
                   (recur s e nil (concat (for [i x] [s e i]) d)))
                 (instance? javax.management.ObjectName c)
                 (if (contains? e c)
                   (recur s e nil d)
                   (let [info (.getMBeanInfo (:con e) c)
                         atttibutes (.getAttributes info)
                         e (assoc e :info info :name c c true)
                         s (conj s (-> c (.getDomain)))]
                     (recur s e nil (concat (for [i atttibutes] [s e i]) d))))
                 (instance? javax.management.MBeanAttributeInfo c)
                 (let [^javax.management.MBeanAttributeInfo c c]
                   (if (.isReadable c)
                     (let [[attr] (.getAttributes (:con e) (:name e) (into-array String [(.getName c)]))
                           e (assoc e :attribute-info c)
                           s (conj s (-> c (.getName)))]
                       (recur s e attr d))
                     (recur s e nil d)))
                 (instance? javax.management.Attribute c)
                 (let [^javax.management.Attribute c c]
                   (recur s e (.getValue c) d))
                 (and c (.isArray (class c)))
                 (recur s e nil (concat (for [i c] [s e i]) d))
                 (instance? java.util.Map  c)
                 (recur s e nil (concat (for [[k v] c] [(into s k) e v]) d))
                 (instance? javax.management.openmbean.CompositeData c)
                 (let [^javax.management.openmbean.CompositeData c c]
                   (recur s e nil (concat (for [k (seq (.keySet (.getCompositeType c)))] [(conj s k) e (.get c k)]) d)))
                 (string? c)
                 (recur s e nil d)
                 (boolean? c)
                 (recur s e nil d)
                 (number? c)
                 (do
                   (.put ^java.util.Map (:result e)
                         (->> s
                              (map (fn [x]
                                     (-> (str x)
                                         (.toLowerCase)
                                         (.replaceAll " " "_")
                                         (.replaceAll "-" "_")
                                         (.replaceAll "\\." "_")
                                         (.replaceAll "'" ""))))
                              (reverse)
                              (interpose "_")
                              (apply str))
                         c)
                   (recur s e nil d))
                 (and (nil? c) d)
                 (let [[[s e c] & d] d]
                   (recur s e c d))
                 (and (nil? c) (nil? d))
                 (doseq [[k v] (:result e)]
                   (.write baos (.getBytes (str k)))
                   (.write baos (.getBytes " "))
                   (.write baos (.getBytes (str (double v))))
                   (.write baos (.getBytes "\n")))
                 :else
                 (do
                   (prn c)
                   (prn (supers (class c)))
                   (assert nil))))
         (.write baos (.getBytes (format "http_request_count %s\n" @requests-count)))
         (.write baos (.getBytes (format "undertow_channel_count %s\n" (-> @state :channels count))))
         (-> baos
             (.toByteArray)
             (ByteBuffer/wrap))))

     (def handle-request* nil)
     (defmulti handle-request* (fn [state ^HttpServerExchange exchange] (.getRequestPath exchange)))

     (defmethod handle-request* "/metrics" [state ^HttpServerExchange exchange]
       (doto exchange
         (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/plain"))
         (-> .getResponseSender (.send ^ByteBuffer (metrics-data)))))

     (defmethod handle-request* "/forecast" [state ^HttpServerExchange exchange]
       (info (.getQueryString exchange) {})
       (let [[_ n] (re-find #"id=(\d+)" (.getQueryString exchange))]
         (if n
           (.dispatch exchange
                      (fn []
                        (doto exchange
                          (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/plain"))
                          (-> .getResponseSender (.send (letfn [(f [id] (first (map (comp :place placify) (jdbc/execute! ds ["select * from package_event where package_id = ? order by at desc limit 1" (Long/parseLong id)]))))]
                                                          (with-out-str
                                                            (clojure.pprint/pprint
                                                             (summerize (repeatedly
                                                                         1000
                                                                         (fn [] (/ (apply +
                                                                                          (map second
                                                                                               (take-while #(not= (first %) "terminal")
                                                                                                           (iterate (fn [[x y]] (draw @m x)) [(f n) 0]))))
                                                                                   24.0))))))
                                                          ))))))
           (doto exchange
             (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/plain"))
             (-> .getResponseSender (.send "Error"))))))

     (defmethod handle-request* "/forecast2" [state ^HttpServerExchange exchange]
       (info (.getQueryString exchange) {})
       (let [[_ n] (re-find #"id=(\d+)" (.getQueryString exchange))]
         (if n
           (.dispatch exchange
                      (fn []
                        (doto exchange
                          (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/html"))
                          (-> .getResponseSender (.send (with-out-str
                                                          (println "<html>")
                                                          (println "<head></head>")
                                                          (println "<body>")
                                                          (printf "<img src=\"/ddd/forecast.png?id=%s\">\n" n)
                                                          (println "</body>")
                                                          (println "</html>")))))))
           (doto exchange
             (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/plain"))
             (-> .getResponseSender (.send "Error"))))))

     

     (defmethod handle-request* "/forecast.png" [state ^HttpServerExchange exchange]
       (let [[_ n] (re-find #"id=(\d+)" (.getQueryString exchange))]
         (.dispatch exchange
                    (fn []
                      (doto exchange
                        (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "image/png"))
                        (-> .getResponseHeaders (.put Headers/CACHE_CONTROL "max-age=60"))
                        (-> .getResponseSender (.send (java.nio.ByteBuffer/wrap
                                                       (letfn [(f [id] (first (map (comp :place placify) (jdbc/execute! ds ["select * from package_event where package_id = ? order by at desc limit 1" (Long/parseLong id)]))))]
                                                         (let [y (repeatedly
                                                                  200
                                                                  (fn [] (/ (apply +
                                                                                   (map second
                                                                                        (take-while #(not= (first %) "terminal")
                                                                                                    (iterate (fn [[x y]] (draw @m x)) [(f n) 0]))))
                                                                            24.0)))
                                                               x (map #(Math/round %) y)
                                                               f (frequencies x)
                                                               scale-x (/ 800 (inc (apply max x)))
                                                               scale-y (/ 570 (inc (apply max (vals f))))
                                                               height (apply max (vals f))
                                                               width (inc (apply max x))
                                                               image (java.awt.image.BufferedImage. 800 600 java.awt.image.BufferedImage/TYPE_INT_ARGB)
                                                               g (.createGraphics image)
                                                               _ (.setColor g java.awt.Color/WHITE)
                                                               _ (.fillRect g  0 0 800 600)
                                                               _ (.setColor g java.awt.Color/RED)
                                                               _ (dotimes [i height]
                                                                   (dotimes [ii width]
                                                                     (if (and (contains? f ii)
                                                                              (<= (- height i) (get f ii)))
                                                                       (.fillRect g  (* ii scale-x) (+ (* i scale-y) 5)
                                                                                  (Math/ceil (* 1.0 scale-x))
                                                                                  (Math/ceil (* 1.0 scale-y))))))
                                                               _ (.setColor g java.awt.Color/BLACK)
                                                               _ (dotimes [ii width]
                                                                   (.drawString g (str ii)
                                                                                (long (+ (* ii scale-x) (/ scale-x 2)))
                                                                                590))
                                                               _ (reduce
                                                                  (fn [i [label n]]
                                                                    (.drawString g
                                                                                 (name label)
                                                                                 10
                                                                                 i)
                                                                    (.drawString g
                                                                                 (str n)
                                                                                 100
                                                                                 i)
                                                                    (+ i 20))
                                                                  20
                                                                  (summerize y))
                                                               baos (java.io.ByteArrayOutputStream.)]
                                                           (.flush image)
                                                           (javax.imageio.ImageIO/write image "png" baos)
                                                           (.toByteArray baos))))
                                                      io.undertow.io.IoCallback/END_EXCHANGE)))))))

     (defmethod handle-request* "/source" [state ^HttpServerExchange exchange]
       (.dispatch exchange
                  (fn []
                    (doto exchange
                      (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/plain"))
                      (-> .getResponseSender (.send (slurp file) io.undertow.io.IoCallback/END_EXCHANGE))))))

     (defmethod handle-request* :default [state ^HttpServerExchange exchange]
       (.dispatch
        exchange
        (fn []
          (.whenCompleteAsync
           page
           (reify
             java.util.function.BiConsumer
             (accept [_ t u]
               (let [^ByteBuffer t t]
                 (when u
                   (severe "error" u))
                 (doto exchange
                   (-> .getResponseHeaders (.put Headers/CONTENT_TYPE "text/html"))
                   (-> .getResponseSender (.send (.slice t) io.undertow.io.IoCallback/END_EXCHANGE))))))
           (.getIoThread exchange)))))
     
     (defn handle-request [state ^HttpServerExchange exchange]
       (swap! requests-count inc)
       (log-time
        {:http/path (.getRequestPath exchange) :http/method (str (.getRequestMethod exchange))}
        (handle-request* state exchange)))

     (def dispatch nil)
     (defmulti dispatch (fn [state id channel v resp exec] (nth v 2)))

     (defmethod dispatch "log" [state id channel v resp exec]
       (info (nth v 3) {:id id})
       (resp nil))

     (defmethod dispatch "observe" [state id channel v resp exec]
       (observe)
       (resp nil))

     (defmethod dispatch "new-tracking" [state id channel v resp exec]
       (jdbc/execute! ds ["insert into package(carrier,tracking) values (?,?)"
                          (.trim (.toLowerCase (str (second (nth v 3)))))
                          (.trim (.toLowerCase (str (first (nth v 3)))))])
       (info "new-tracking")
       (resp "Some Response"))

     (defmethod dispatch "set-active" [state id channel v resp exec]
       (info v)
       (jdbc/execute! ds ["update package set active = ? where id = ?"
                          (boolean (second (nth v 3)))
                          (first (nth v 3))])
       (resp "Some Response"))
     
     (defmethod dispatch "get-note" [state id channel v resp exec]
       (resp
        (or (:package_note/note (jdbc/execute-one! ds ["select note from package_note where package_id = ?" (first (nth v 3))]))
            "")))

     (defmethod dispatch "set-note" [state id channel v resp exec]
       (try
         (jdbc/execute! ds ["insert into package_note(package_id,note) values (?,?)"
                            (first (nth v 3))
                            (second (nth v 3))])
         (catch Throwable _
           (jdbc/execute! ds ["update package_note set note = ? where package_id = ?"
                              (second (nth v 3))
                              (first (nth v 3))])))
       (resp true))

     (defmethod dispatch "list" [state id channel v resp exec]
       (resp
        (jdbc/execute! ds [(str "select * from "
                                (#{"package" "event" "observation" "fedex_event_data"}
                                 (first (nth v 3))))])))

     (defmethod dispatch "packages" [state id channel v resp exec]
       (resp
        (log-time
         (package-data))))

     (defn on-next [pd state id ^WebSocketChannel channel v resp exec]
       (.execute
        (.getWorker channel)
        (fn []
          (try
            (log-time
             {:id id :what :tick}
             (let [ps (get-in @state [:data id (nth v 1) :ps] #{})
                   new-ps (set (for [row pd
                                     [k v] row]
                                 [(:package/id row) k v]))
                   [to-remove to-add _] (diff ps new-ps)]
               (resp
                {:assert to-add
                 :retract to-remove})
               (swap! state assoc-in [:data id (nth v 1) :ps] new-ps)))
            (.request ^Flow$Subscription (get-in @state [:data id (nth v 1) :sub]) 1)
            (catch Throwable t
              (severe "error" t))))))
     
     (defn on-subscribe [^Flow$Subscription s state id ^WebSocketChannel channel v resp exec]
       (swap! state assoc-in [:data id (nth v 1) :sub] s)
       (.addCloseTask channel
                      (reify
                        org.xnio.ChannelListener
                        (handleEvent [this evt]
                          (info "cancel subscription" {:id id})
                          (.cancel s))))
       (.request s 1))

     (defmethod dispatch "packages-sub" [state id channel v resp exec]
       (.subscribe
        pub
        (reify
          Flow$Subscriber
          (onComplete [_])
          (onError [_ err]
            (severe "error" err))
          (onNext [_ pd]
            (on-next pd state id channel v resp exec))
          (onSubscribe [_ s]
            (on-subscribe s state id channel v resp exec))))
       (.submit pub (package-data)))

     (defmethod dispatch "packages-refresh" [state id channel v resp exec]
       (info "packages-refresh")
       (.submit pub (package-data))
       (resp nil))

     (defn send-text [^String s ^WebSocketChannel channel timeout]
       (let [cf (java.util.concurrent.CompletableFuture.)]
         (WebSockets/sendText s
                              channel
                              (reify
                                io.undertow.websockets.core.WebSocketCallback
                                (complete [_ channel context]
                                  (.complete cf true))
                                (onError [_ channel context throwable]
                                  (.completeExceptionally cf throwable)))
                              (long timeout))
         cf))

     (defn on-full-text-message [state id ^WebSocketChannel channel ^BufferedTextMessage msg]
       (let [v (transit/read (transit/reader (java.io.ByteArrayInputStream. (.getBytes (.getData msg) "utf-8")) :json))]
         (info v)
         (case (long (nth v 0))
           0 (dispatch state
                       id
                       channel
                       v
                       (fn [result]
                         (send-text
                          (let [bao (java.io.ByteArrayOutputStream.)]
                            (transit/write (transit/writer bao :json)
                                           [1 (nth v 1) result])
                            (String. (.toByteArray bao)))
                          channel
                          1000))
                       (fn [f]
                         (.execute (.getWorker channel) f)))
           1 (info v))))

     (defn websocket-connect [state exchange ^WebSocketChannel channel]
       (let [id (UUID/randomUUID)]
         (.addCloseTask channel
                        (reify
                          org.xnio.ChannelListener
                          (handleEvent [this evt]
                            (info "closed" {:id id})
                            (swap! state update-in [:channels] dissoc id)
                            (swap! state update-in [:data] dissoc id))))
         (-> channel
             .getReceiveSetter
             (.set
              (proxy [AbstractReceiveListener] []
                (onFullCloseMessage [^WebSocketChannel channel msg]
                  (swap! state update-in [:channels] dissoc id)
                  (swap! state update-in [:data] dissoc id)
                  (.close channel))
                (onFullTextMessage [channel msg]
                  (try
                    (on-full-text-message state id channel msg)
                    (catch Throwable t
                      (reset! most-recent-error t)
                      (severe "whoops" t)))))))
         (swap! state assoc-in [:channels id] channel)
         (.resumeReceives channel)))
     
     (defonce ^Undertow server
       (doto (-> (Undertow/builder)
                 (.setIoThreads 1)
                 (.setWorkerThreads 2)
                 ;; listens everwhere, watch out
                 (.addHttpListener port "0.0.0.0")
                 (.setHandler
                  (WebSocketProtocolHandshakeHandler.
                   (reify
                     WebSocketConnectionCallback
                     (onConnect [_ exchange channel]
                       (websocket-connect state exchange channel)))
                   (reify
                     HttpHandler
                     (handleRequest [_ exchange]
                       (try
                         (handle-request state exchange)
                         (catch Throwable t
                           (reset! most-recent-error t)
                           (severe "whoops" t)))))))
                 (.build))
         (.start)))

     ;; recompile file as cljs
     (.execute (.getWorker server)
               (bound-fn []
                 (log-time
                  {:time/what :cljs-compile}
                  (let [js (File/createTempFile "whatever" ".js")]
                    (try
                      (cljs/build (.getAbsolutePath (io/file *file*))
                                  {:output-to (.getAbsolutePath js)
                                   :optimizations :advanced})
                      (.complete
                       page
                       (ByteBuffer/wrap
                        (.toByteArray
                         (doto (java.io.ByteArrayOutputStream.)
                           (.write (.getBytes "<meta charset=\"utf-8\" /><html><head><script>"))
                           ((fn [a] (clojure.java.io/copy js a)))
                           (.write (.getBytes "</script></head><body><script>packages.main();</script></body></html>"))))))
                      (catch Throwable t
                        (.completeExceptionally page t))
                      (finally
                        (info "file size" {:file/size (.length js)})
                        (.delete js)))))))

     ;; reload this file
     (defn reload []
       (log-time (load-file file))
       (info "reload complete"))

     ;; tell all clients to reload, close channels, shutdown server,
     ;; reload this file to restart
     (defn new-server []
       (doseq [[_ ^WebSocketChannel ch] (-> @state :channels)]
         (try
           (WebSockets/sendTextBlocking
            (str (json/write-str [0 1 "reload" (* 1000 30)]))
            ch)
           (.close ch)
           (catch Throwable t
             (reset! most-recent-error t)
             (severe "whoops" t))))
       (.stop server)
       (reset! state {})
       (ns-unmap 'packages 'server)
       (reload))

     ;; tell clients to reload
     (defn reload-clients []
       (doseq [[_ ch] (-> @state :channels)]
         (try
           (WebSockets/sendTextBlocking
            (str (json/write-str [0 1 "reload" 0]))
            ^WebSocketChannel ch)
           (catch Throwable t
             (severe "whoops" t)))))


     (defn main [& args]
       (.delete (clojure.java.io/file "/tmp/packages"))
       (let [addr (java.net.UnixDomainSocketAddress/of "/tmp/packages")
             sc (doto (java.nio.channels.ServerSocketChannel/open java.net.StandardProtocolFamily/UNIX)
                  (.bind addr))]
         (while true
           (let [c (.accept sc)]
             (future
               (let [in (java.nio.channels.Channels/newInputStream c)
                     out (clojure.java.io/writer (java.nio.channels.Channels/newOutputStream c))]
                 (binding [*in* (clojure.lang.LineNumberingPushbackReader.
                                 (clojure.java.io/reader in))
                           *out* out
                           *err* out]
                   (clojure.main/repl)))))))

       @(promise))

     ))



(defn elide [s]
  (if (> (count s) 7)
    (str (subs s 0 2) "..." (subs s (- (count s) 2)))
    s))

#?(:cljs
   (do

     (m/info "Top" {})

     (def i (atom 0))

     (defn make-rpc [url]
       (let [ws (atom nil)
             registry (atom {})
             f (fn f []
                 (reset! ws (js/WebSocket. url))
                 (aset @ws "onopen" (fn [evt]
                                      (m/info "onopen" {})))
                 (aset @ws "onclose" (fn [evt]
                                       (doseq [[k v] @registry
                                               :when (number? k)]
                                         (v)
                                         (swap! registry dissoc k))
                                       (js/setTimeout f 1000)))
                 (aset @ws "onerror" (fn [evt]
                                       (m/info "onerror" {:event evt})))
                 (aset @ws "onmessage"
                       (fn [event]
                         (let [msg (transit/read (transit/reader :json) (aget event "data"))]
                           (case (get msg 0)
                             1 (let [s (get msg 1)
                                     g (get @registry s)]
                                 (g [(first (rest (rest msg)))
                                     (js/Promise.
                                      (fn [complete error]
                                        (swap! registry assoc s (fn
                                                                  ([]
                                                                   (g))
                                                                  ([result] (complete result))))))]))
                             0 ((get @registry (get msg 2))
                                msg
                                (fn [r]
                                  (.send @ws (transit/write (transit/writer :json) [1 (get msg 1) r])))))))))]
         (f)
         (doto (fn [name eos & args]
                 (js/Promise.
                  (fn [complete error]
                    (let [s (swap! i inc)
                          m (transit/write (transit/writer :json) [0 s name args eos])]
                      (try
                        (.send @ws m)
                        (swap! registry
                               assoc
                               s
                               (fn
                                 ([]
                                  (complete eos))
                                 ([result] (complete result))))
                        (catch :default t
                          (complete eos)))))))
           (aset "register" (fn [name f]
                              (swap! registry assoc name f))))))

     (defn new-tracking [rpc t c]
       (fn [evt]
         (rpc "new-tracking" nil (t) (c))
         (t "")
         (c "")))

     (defn table-ui [state rpc quil]
       (fn []
         [:div
          [:h1 "Untitled 2021 Downey Project"]
          [:h2 (first (for [[a b c] @state
                            :when (= a -1)
                            :when (= b "last-data")]
                        (str c)))]
          [:div
           [:span "Carrier" [:input#carrier10]]
           [:span "Tracking" [:input#tracking10]]
           [:button {:on-click (new-tracking
                                rpc
                                (fn
                                  ([]
                                   (.-value (.getElementById js/document "tracking10")))
                                  ([value]
                                   (aset (.getElementById js/document "tracking10") "value" value)))
                                (fn
                                  ([]
                                   (.-value (.getElementById js/document "carrier10")))
                                  ([value]
                                   (aset (.getElementById js/document "carrier10") "value" value))))}
            "Add Tracking"]
           [:button {:on-click (fn [evt] (rpc "observe" nil))} "Observe"]
           [:button {:on-click (fn [evt] (rpc "packages-refresh" nil))} "Refresh"]]
          [:table
           [:tr {:style {:background-color "#436318" :color "#ddd"}}
            [:th "id"]
            [:th "carrier"]
            [:th "tracking"]
            [:th "start place"]
            [:th "start at"]
            [:th "last place"]
            [:th "last at"]
            [:th "days ago"]
            [:th "KPH"]
            [:th "active"]
            [:th "status"]]
           (let [s @state]
             (doall
              (map-indexed
               (fn [i {:as m :keys [package/carrier
                                    package/tracking
                                    package_event/first_place
                                    first_date
                                    package_event/last_place
                                    package_event/last_date
                                    days_ago
                                    kph
                                    package/active
                                    package_event/last_status]}]
                 (list
                  ^{:key (:package/id m)}
                  [:tr
                   (if (not (zero? (mod i 2)))
                     {:style {:background-color "#436318" :color "#ddd"}}
                     {})
                   [:td {:style {:text-align "right"}
                         :on-click (fn [_]
                                     (swap! state (fn [s]
                                                    (if (contains? s [(:package/id m) "display_note" true])
                                                      (disj s [(:package/id m) "display_note" true])
                                                      (conj s [(:package/id m) "display_note" true])))))}
                    (:package/id m)]
                   [:td carrier]
                   [:td [:a {:href (carrier-url carrier tracking) :target "_blank"} (elide tracking)]]
                   [:td first_place]
                   [:td (.format (.DateTimeFormat js/Intl "en" #js {:timeStyle "short" :dateStyle "short"}) first_date)]
                   [:td [:span {:on-click (fn [_]
                                            (swap! state (fn [s]
                                                    (if (contains? s [(:package/id m) "display_forecast" true])
                                                      (disj s [(:package/id m) "display_forecast" true])
                                                      (conj s [(:package/id m) "display_forecast" true])))))}
                     last_place]]
                   [:td (.format (.DateTimeFormat js/Intl "en" #js {:timeStyle "short" :dateStyle "short"}) last_date)]
                   [:td {:style {:text-align "right"}} days_ago]
                   [:td {:style {:text-align "right"}} (long kph)]
                   [:td
                    {:style {:text-align "center"}}
                    (if-not (= active :unknown)
                      [:input
                       {:type "checkbox"
                        :checked (boolean active)
                        :on-change (fn [evt]
                                     (swap! state #(-> %
                                                       (disj [(:package/id m) :package/active active])
                                                       (conj [(:package/id m) :package/active :unknown])))
                                     (-> (rpc "set-active" nil (:package/id m) (not active))
                                         (.then (fn [_] (rpc "packages-refresh")))
                                         (.then (fn [_]
                                                  (swap! state #(-> %
                                                                    (disj [(:package/id m) :package/active :unknown])))))))}]
                      [:div {:class "loadingspinner" :style {:width "0.5em" :height "0.5em"}}])]
                   [:td
                    [:div {:style (if (contains? @state [(:package/id m) "display_status" true]) {:display "none"} {})
                           :on-click (fn [_] (swap! state conj [(:package/id m) "display_status" true]))}
                     (elide last_status)]
                    [:div {:style (if (contains? @state [(:package/id m) "display_status" true]) {} {:display "none"})
                           :on-click (fn [_] (swap! state disj [(:package/id m) "display_status" true]))}
                     last_status]]]
                  (when (contains? @state [(:package/id m) "display_note" true])
                    ^{:key (str "note-" (:package/id m))}
                    [:tr
                     [:td {:colspan "10"}
                      [quil (:package/id m)]]])
                  (when (contains? @state [(:package/id m) "display_forecast" true])
                    ^{:key (str "forecast-" (:package/id m))}
                    [:tr
                     [:td {:colspan "10"}
                      [:img {:src (str "/ddd/forecast.png?id=" (:package/id m))}]]])))
               (reverse
                (sort-by
                 (juxt #(boolean (get % :package/active))
                       #(get % :package_event/last_date))
                 (for [[a b c] s
                       :when (= b :package/id)
                       :let [m (into {} (for [[ap b c] s
                                              :when (= a ap)]
                                          [b c]))]]
                   m))))))]]))


     ;; make a component for the quil editor
     (defn make-quil [rpc id]
       (r/create-class
        {:display-name "Quil"
         :reagent-render (fn [] [:div])
         :component-did-mount (fn [this]
                                (.then
                                 (rpc "get-note" nil id)
                                 (fn [[note continue]]
                                   (when note
                                     (set! (.-innerHTML (rdom/dom-node this)) note)
                                     (new js/Quill (rdom/dom-node this) #js {"theme" "snow"})))))
         :component-will-unmount (fn [this]
                                   (rpc
                                    "set-note"
                                    nil
                                    id
                                    (-> (.find js/Quill (rdom/dom-node this))
                                        (aget "root")
                                        (aget "innerHTML"))))}))


     (defn ^:export main []
       (let [;; I should just give up and pull in datascript
             state (r/atom #{})
             loc (.-location js/window)
             url (str (case (.-protocol loc)
                        "http:" "ws"
                        "https:" "wss")
                      "://"
                      (.-host loc)
                      (.-pathname loc))
             rpc (make-rpc url)
             _ (swap! state conj [-1 "last-data" (js/Date.)])
             f (fn [] (table-ui state rpc (partial make-quil rpc)))]
         (set! (.-title js/document) "Untitled 2021 Downey Project")
         (doto (aget js/document "head")
           (.appendChild
            (doto (.createElement js/document "link")
              (.setAttribute "href" "https://downey.family/~kevin/tufte.min.css")
              (.setAttribute "rel" "stylesheet")))
           (.appendChild
            (doto (.createElement js/document "link")
              (.setAttribute "href" "https://cdn.quilljs.com/1.3.6/quill.snow.css")
              (.setAttribute "rel" "stylesheet")))
           (.appendChild
            (doto (.createElement js/document "script")
              (.setAttribute "src" "https://cdn.quilljs.com/1.3.6/quill.js")))
           (.appendChild
            (doto (.createElement js/document "style")
              (.appendChild
               (.createTextNode js/document
                                "

.loadingspinner {
        pointer-events: none;
        width: 2.5em;
        height: 2.5em;
        border: 0.4em solid transparent;
        border-color: #eee;
        border-top-color: #3E67EC;
        border-radius: 50%;
        animation: loadingspin 1s linear infinite;
}

@keyframes loadingspin {
        100% {
                        transform: rotate(360deg)
        }
}

")))))
         (rpc "log" nil "Hi, I am a client")
         ((fn f []
            (.then (rpc "packages-sub" nil)
                   (fn g [[x continue :as y]]
                     (if continue
                       (do
                         (m/info "here" {})
                         (swap! state (fn [s]
                                        (transduce
                                         (mapcat (fn [[k ops]]
                                                   (eduction
                                                    (map (fn [item] [k item]))
                                                    (seq ops))))
                                         (fn
                                           ([s]
                                            (into #{[-1 "last-data" (js/Date.)]}
                                                  (remove #(and (= -1 (nth % 0))
                                                                (= "last-data" (nth % 1))))
                                                  s))
                                           ([s [op triple]]
                                            (case op
                                              :assert (conj s triple)
                                              :retract (disj s triple))))
                                         s
                                         (seq x))))
                         (.then continue g))
                       (js/setTimeout f 1000))))))
         ((aget rpc "register") "log" (fn [msg reply]
                                        (m/info msg {})
                                        (reply nil)))
         ((aget rpc "register") "reload" (fn [msg reply]
                                           (js/setTimeout (fn [] (.reload (.-location js/window) true)) (nth msg 3))
                                           (reply nil)))
         (rdom/render [f] (.-body js/document))))))



(comment

  [113
   112
   118
   96
   104
   122
   97
   109
   111
   107
   120
   131
   125
   118
   117
   105
   129
   149
   127]
  
  )



(comment

  (extend-type clojure.core.async.impl.exec.threadpool$thread_pool_executor$reify__24386
    clojure.core.async.impl.protocols/Executor
    (exec [_ r]
      (clojure.core.async.impl.protocols/exec
       clojure.core.async.impl.exec.threadpool/thread-pool-executor
       r)))  

  )

Generated At 2025-05-15T15:26:41-0700 original