(defprotocol Source
  "A cursor over elements; implementations assume the elements are ordered
  ascending (per `compare`) by the join key."
  (key-value [source join-key]
    "Return the join-key value of the element at the current position.")
  (advance [source join-key bound]
    "Drop every leading element whose key equals bound; nil when exhausted.")
  (seek [source join-key bound]
    "Drop leading elements whose key is less than bound, i.e. move to the
    first element whose key is greater than or equal to bound; nil when
    exhausted.")
  (elements [source join-key bound]
    "Return the leading elements whose key equals bound."))

(extend-protocol Source
  clojure.lang.ISeq
  (key-value [source join-key]
    (join-key (first source)))
  (advance [source join-key bound]
    (seq (drop-while #(zero? (compare bound (join-key %))) source)))
  (seek [source join-key bound]
    (seq (drop-while #(neg? (compare (join-key %) bound)) source)))
  (elements [source join-key bound]
    (take-while #(zero? (compare bound (join-key %))) source)))

(defn cross
  "Cartesian product of the given collections as a lazy seq of lists, the
  i-th item of each list drawn from the i-th collection. (cross) yields a
  single empty tuple."
  ([] (list ()))
  ([a] (map list a))
  ([a b] (for [i a ii b] (list i ii)))
  ([a b & c]
   ;; compute the product of the tail once instead of once per (i, ii) pair
   (let [tails (apply cross c)]
     (for [i a ii b t tails] (list* i ii t)))))

;; TODO close the loop, make this a source
(defn equi-merge-inner-join
  "Lazily inner-join `sources` on equal join-key values (leapfrog style).

  `join-keys` and `sources` are parallel: (nth join-keys i) extracts the key
  from the elements of (nth sources i). Each source is coerced with `seq` and
  must be sorted ascending by its join key, because seek/elements walk it
  with drop-while/take-while. Keys are compared with `compare`.

  Returns an eduction of vectors, one per combination of elements sharing a
  key, where the i-th item comes from the i-th source. Matching keys are
  produced in ascending order. If any source is empty there are no results."
  [join-keys sources]
  (assert (= (count join-keys) (count sources)))
  (let [join-keys (vec join-keys)
        cursors (mapv seq sources)
        n (count cursors)
        prev-idx (fn [p] (mod (dec p) n))
        next-idx (fn [p] (mod (inc p) n))]
    (eduction
     cat
     (iteration
      ;; State is [cursors p end]. p is the source being examined. end is the
      ;; index of the source that last set the candidate key x'. Getting back
      ;; to end with every key equal to x' means all sources match.
      (fn step [[cursors p end]]
        (when (seq cursors)
          (loop [cursors cursors
                 p p
                 x' (key-value (nth cursors (prev-idx p)) (nth join-keys (prev-idx p)))
                 end end]
            (let [source (nth cursors p)
                  join-key (nth join-keys p)
                  x (key-value source join-key)]
              (if (zero? (compare x x'))
                (if (= p end)
                  ;; complete scan: every source sits on x', emit the tuples
                  (let [results (->> (range n)
                                     (map #(elements (nth cursors %) (nth join-keys %) x'))
                                     (apply cross)
                                     (map vec))]
                    (if-some [source (advance source join-key x')]
                      [(assoc cursors p source) (next-idx p) p results]
                      ;; this source is exhausted: emit the last batch, then stop
                      [nil results]))
                  (recur cursors (next-idx p) x' end))
                ;; mismatch: move this source up to x'; its new key becomes the
                ;; candidate and the scan restarts from this source
                (when-some [source (seek source join-key x')]
                  (recur (assoc cursors p source)
                         (next-idx p)
                         (key-value source join-key)
                         p)))))))
      :vf peek
      ;; No initial sort is needed, since the comparisons above handle any
      ;; starting order. Keeping input order keeps each source paired with its
      ;; join key and keeps tuple items in input order.
      :initk (when (every? some? cursors)
               [cursors 0 (dec n)])))))

(comment
  (doall (seq (equi-merge-inner-join [first first first]
                                     ['([1 :a] [2 :a] [3 :a] [4 :a])
                                      '([1 :b] [1 :b2] [2 :b] [3 :c])
                                      '([1 :c] [3 :c] [5 :c] [6 :c])])))
  ;; ([[1 :a] [1 :b] [1 :c]] [[1 :a] [1 :b2] [1 :c]] [[3 :a] [3 :c] [3 :c]])
  )