(defprotocol Source
  "A forward-only cursor over elements that are matched on a join key.

  `join-key` is a function applied to an element to obtain its key; keys are
  compared with `compare`. `advance` and `seek` return the source at its new
  position, or nil when no elements remain."
  (key-value [source join-key]
    "Return `join-key` applied to the element at the current position.")
  (advance [source join-key bound]
    "Skip the elements at the current position whose key equals `bound`.
    Returns the source at its new position, or nil when it is exhausted.")
  (seek [source join-key bound]
    "Move forward to the first element whose key is greater than or equal to `bound`.
    A source already positioned at such an element is left where it is.
    Returns the source at its new position, or nil when it is exhausted.")
  (elements [source join-key bound]
    "Return the run of elements, starting at the current position, whose key
    equals `bound`. Does not move the source."))

;; Seq-backed Source: the current position is the head of the seq, and moving
;; forward means dropping elements from the front. `advance` and `seek` wrap the
;; remainder in `seq`, so an exhausted source comes back as nil.
(extend-protocol Source
  clojure.lang.ISeq
  (key-value [source join-key]
    (-> source first join-key))
  (advance [source join-key bound]
    ;; drop the leading run of elements whose key compares equal to bound
    (letfn [(at-bound? [element]
              (zero? (compare bound (join-key element))))]
      (seq (drop-while at-bound? source))))
  (seek [source join-key bound]
    ;; drop every leading element whose key compares below bound
    (letfn [(below-bound? [element]
              (neg? (compare (join-key element) bound)))]
      (seq (drop-while below-bound? source))))
  (elements [source join-key bound]
    ;; the leading run of elements whose key compares equal to bound
    (letfn [(at-bound? [element]
              (zero? (compare bound (join-key element))))]
      (take-while at-bound? source))))

(defn cross
  "Cartesian product of one or more collections.

  Returns a lazy seq with one entry per combination; each entry is a seq
  holding one element from every argument, in argument order. The rightmost
  collection varies fastest. If any argument is empty the result is empty."
  ([a] (map list a))
  ([a b]
   (for [i a
         ii b]
     (list i ii)))
  ([a b & c]
   ;; The product of the remaining collections does not depend on i or ii.
   ;; Build it once here: a binding expression inside `for` would be
   ;; re-evaluated for every (i, ii) pair.
   (let [tails (apply cross c)]
     (for [i a
           ii b
           tail tails]
       (list* i ii tail)))))

;; TODO close the loop, make this a source
(defn equi-merge-inner-join
  "Inner-joins `sources` on equal join-key values using a forward-only merge.

  `join-keys` is a sequential collection of key functions, one per source:
  `(nth join-keys i)` is applied to the elements of `(nth sources i)`. Both
  collections must have the same count (asserted). Every source must satisfy
  the `Source` protocol. The merge only ever moves sources forward and
  compares keys with `compare`, so each source is expected to already be
  ordered by its join key.

  Returns an eduction of vectors. For every key value found in all sources
  there is one vector per combination of matching elements, and element i of
  each vector comes from source i."
  [join-keys sources]
  (assert (= (count join-keys)
             (count sources)))
  (let [n (count sources)]
    (eduction
     cat
     (iteration
      ;; Step state is [sources p end]:
      ;;   sources - vector of the sources at their current positions
      ;;   p       - index of the source to examine next
      ;;   end     - index at which a full round of equal keys is complete
      (fn [[sources p end]]
        (when (seq sources)
          (loop [sources sources
                 p p
                 ;; x' is the key every source has to reach; it starts as the
                 ;; key of the source just before p.
                 x' (let [prev (mod (dec p) n)]
                      (key-value (nth sources prev)
                                 (nth join-keys prev)))
                 end end]
            (let [source (nth sources p)
                  join-key (nth join-keys p)
                  x (key-value source join-key)]
              (if (zero? (compare x x'))
                (if (= p end)             ; complete scan, emit tuples
                  (let [results (->> (for [i (range n)]
                                       (elements (nth sources i)
                                                 (nth join-keys i)
                                                 x'))
                                     (apply cross)
                                     (map vec))]
                    (if-some [source (advance source join-key x')]
                      [(assoc sources p source) (mod (inc p) n) p results]
                      ;; One source is exhausted: emit this batch, and the nil
                      ;; in the sources slot makes the next step stop.
                      [nil results]))
                  (recur sources (mod (inc p) n) x' end))
                ;; Keys differ: move this source up to x'. Its new key becomes
                ;; the target and a full round has to be completed again. When
                ;; x is already past x', seek leaves the source in place and
                ;; the target is simply raised to x.
                (when-some [source (seek source join-key x')]
                  (recur (assoc sources p source)
                         (mod (inc p) n)
                         (key-value source join-key)
                         p)))))))
      :vf peek
      ;; Sources stay in the caller's order so that (nth join-keys i) always
      ;; belongs to (nth sources i) and tuple columns follow the input order.
      ;; Sorting the sources by first key, without permuting join-keys the same
      ;; way, would pair sources with the wrong key function.
      :initk [(vec sources)
              0
              (dec n)]))))

;; REPL smoke test: join three list sources on the first slot of each element
;; and force the whole result.
(let [by-first first]
  (-> (equi-merge-inner-join
       [by-first by-first by-first]
       [(list [1 :a] [2 :a] [3 :a] [4 :a])
        (list [1 :b] [1 :b2] [2 :b] [3 :c])
        (list [1 :c] [3 :c] [5 :c] [6 :c])])
      seq
      doall))

;; ([[1 :a] [1 :b] [1 :c]] [[1 :a] [1 :b2] [1 :c]] [[3 :a] [3 :c] [3 :c]])

;; Generated At 2025-05-29T11:41:48-0700 original