Skip to content

Commit 953cd1a

Browse files
committed
Issue #18: Add support for streaming query rows
1 parent fd12005 commit 953cd1a

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

src/postgres/async.clj

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@
6161
(execute! db sql (fn [rs err]
6262
(f (:rows rs) err)))))
6363

64+
(defn query-rows!
65+
"Executes an sql query with parameters and returns a channel where 0-n rows are emitted."
66+
[^QueryExecutor db [sql & params]]
67+
(let [c (chan)]
68+
(-> (.queryRows db sql (into-array params))
69+
(.subscribe (pg/row-observer c)))
70+
c))
71+
6472
(defn insert!
6573
"Executes an sql insert and returns update count and returned rows.
6674
Spec format is

src/postgres/async/impl.clj

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,40 @@
1717
(apply f (concat args [callback]))
1818
channel))
1919

20-
(defn column->value [^Object value]
20+
(defn- column->value [^Object value]
2121
(if (and value (-> value .getClass .isArray))
2222
(vec (map column->value value))
2323
value))
2424

25+
;; TODO: make columns public in the Java driver
26+
(defn- get-columns [^PgRow row]
27+
(-> (doto (.getDeclaredField PgRow "columns")
28+
(.setAccessible true))
29+
(.get row)
30+
(keys)))
31+
32+
(defn- row->map [^PgRow row ^Object rowmap ^String col]
33+
(assoc rowmap
34+
(keyword (.toLowerCase col))
35+
(column->value (.get row col))))
36+
2537
(defn result->map [^ResultSet result]
26-
(let [columns (.getColumns result)
27-
row->map (fn [^PgRow row rowmap ^String col]
28-
(assoc rowmap (keyword (.toLowerCase col))
29-
(column->value (.get row col))))]
38+
(let [columns (.getColumns result)]
3039
{:updated (.updatedRows result)
3140
:rows (vec (map (fn [row]
32-
(reduce (partial row->map row) {} columns))
33-
result))}))
41+
(reduce (partial row->map row) {} columns))
42+
result))}))
43+
44+
(defn ^rx.Observer row-observer [channel]
45+
(reify rx.Observer
46+
(onNext [_ row]
47+
(put! channel (reduce (partial row->map row)
48+
{} (get-columns row))))
49+
(onError [_ err]
50+
(put! channel err)
51+
(close! channel))
52+
(onCompleted [_]
53+
(close! channel))))
3454

3555
(defn- list-columns [data]
3656
(if (map? data)
@@ -68,3 +88,5 @@
6888
" WHERE " (first where)
6989
(when returning
7090
(str " RETURNING " returning))))
91+
92+

test/postgres/async_test.clj

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
(ns postgres.async-test
22
(:require [clojure.test :refer :all]
33
[clojure.core.async :refer [<!! go]]
4-
[postgres.async :refer :all]))
4+
[postgres.async :refer :all])
5+
(:import [com.github.pgasync SqlException]))
56

67
(def ^:dynamic *db*)
78
(def table "clj_pg_test")
@@ -51,6 +52,20 @@
5152
(let [rs (wait (query! *db* ["select '{{1,2},{3,4},{5,NULL}}'::INT[][] as a"]))]
5253
(is (= [[1 2] [3 4] [5 nil]] (get-in rs [0 :a]))))))
5354

55+
(deftest query-for-rows
56+
57+
(testing "each row is emitted to channel"
58+
(let [c (query-rows! *db* ["select generate_series(1, $1::int4) as s", 3])]
59+
(is (= {:s 1} (<!! c)))
60+
(is (= {:s 2} (<!! c)))
61+
(is (= {:s 3} (<!! c)))
62+
(is (nil? (<!! c)))))
63+
64+
(testing "a single error is emitted on query failure")
65+
(let [c (query-rows! *db* ["selectx"])]
66+
(is (instance? SqlException (<!! c)))
67+
(is (nil? (<!! c)))))
68+
5469
(deftest inserts
5570

5671
(testing "insert returns row count"

0 commit comments

Comments
 (0)