FEATURE: add support for streaming results

This implements query_each and query_each_hash. These methods lazily stream rows from the database and help keep memory usage down when iterating over enormous result sets.

diff --git a/CHANGELOG b/CHANGELOG
index a8e6767..15a28c8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,7 @@
+2020-06-25 - 0.3
+
+- Added support for query_each and query_each_hash, which lazily query rows and enable selecting large result sets by streaming
+
 2020-04-07 - 0.2.5
 
 - Added support for custom type maps with Postgres connections
diff --git a/README.md b/README.md
index a6a4675..7a1db3f 100644
--- a/README.md
+++ b/README.md
@@ -160,6 +160,33 @@ mini_sql_cnn = MiniSql::Connection.get(pg_cnn, type_map: pg_cnn.type_map_for_res
 
 Note the type mapper for Rails may miss some of the mapping MiniSql ships with such as `IPAddr`, MiniSql is also careful to use the very efficient TimestampUtc decoders where available.
 
+## Streaming support
+
+In some exceptional cases you may want to stream results directly from the database. This lets you select hundreds of thousands of rows with limited memory impact.
+
+Two interfaces exist for this:
+
+`query_each` : which can be used to get materialized objects  
+`query_each_hash` : which can be used to iterate through Hash objects
+
+Usage:
+
+`‍``ruby
+mini_sql_cnn.query_each("SELECT * FROM tons_of_cows limit :limit", limit: 1_000_000) do |row|
+  puts row.cow_name
+  puts row.cow_age
+end
+
+mini_sql_cnn.query_each_hash("SELECT * FROM one_million_cows") do |row|
+  puts row["cow_name"]
+  puts row["cow_age"]
+end
+`‍``
+
+Note: in Postgres, streaming is slower than the non-streaming options because of how the pg gem implements it; each row gets a full result object and needs additional bookkeeping. Only use it if you need to optimize memory usage.
+
+Streaming support is currently only implemented in the Postgres backend; PRs adding it to other backends are welcome.
+
 ## I want more features!
 
 MiniSql is designed to be very minimal. Even though the query builder and type materializer give you a lot of mileage, it is not intended to be a fully fledged ORM. If you are looking for an ORM I recommend investigating ActiveRecord or Sequel which provide significantly more features.
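The README examples pass a block. Because Ruby's `Object#to_enum` can wrap any block-based method in an Enumerator, the streaming methods can also feed `Enumerable` helpers such as `each_slice`, for example to process rows in batches. The sketch below uses a hypothetical `FakeStreamingConnection` (not part of MiniSql) with the same block-based `query_each_hash` shape so it runs without Postgres; applying the pattern to a real MiniSql connection is an untested assumption. The example consumes the stream fully; stopping early (e.g. with `first`) would leave the block before the query finishes, which the loop in this diff does not guard against.

```ruby
# Hypothetical stand-in for a MiniSql connection: it mimics the block-based
# query_each_hash interface from the README but reads from an in-memory array,
# so the sketch runs without Postgres.
class FakeStreamingConnection
  def initialize(rows)
    @rows = rows
  end

  def query_each_hash(_sql, *_params)
    raise StandardError, "Please supply a block when calling query_each_hash" unless block_given?
    @rows.each { |row| yield row }
    nil
  end
end

cnn = FakeStreamingConnection.new(
  (1..5).map { |i| { "cow_name" => "cow#{i}", "cow_age" => i } }
)

# to_enum turns the block-based method into an Enumerator, so standard
# Enumerable helpers such as each_slice can batch the streamed rows.
rows = cnn.to_enum(:query_each_hash, "SELECT * FROM one_million_cows")

batch_sizes = []
total_age = 0
rows.each_slice(2) do |batch|
  batch_sizes << batch.size
  total_age += batch.sum { |row| row["cow_age"] }
end

p batch_sizes # => [2, 2, 1]
p total_age   # => 15
```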
diff --git a/lib/mini_sql/connection.rb b/lib/mini_sql/connection.rb
index 6dd78c3..ea467b9 100644
--- a/lib/mini_sql/connection.rb
+++ b/lib/mini_sql/connection.rb
@@ -31,11 +31,23 @@ module MiniSql
       raise NotImplementedError, "must be implemented by child connection"
     end
 
-    def exec(sql, *params)
+    def query_hash(sql, *params)
       raise NotImplementedError, "must be implemented by child connection"
     end
 
-    def query_hash(sql, *params)
+    def query_decorator(decorator, sql, *params)
+      raise NotImplementedError, "must be implemented by child connection"
+    end
+
+    def query_each(sql, *params)
+      raise NotImplementedError, "must be implemented by child connection"
+    end
+
+    def query_each_hash(sql, *params)
+      raise NotImplementedError, "must be implemented by child connection"
+    end
+
+    def exec(sql, *params)
       raise NotImplementedError, "must be implemented by child connection"
     end
 
diff --git a/lib/mini_sql/postgres/connection.rb b/lib/mini_sql/postgres/connection.rb
index affcc4a..ff86e82 100644
--- a/lib/mini_sql/postgres/connection.rb
+++ b/lib/mini_sql/postgres/connection.rb
@@ -83,7 +83,7 @@ module MiniSql
         result = run(sql, params)
         result.type_map = type_map
         result.values
-       ensure
+      ensure
          result.clear if result
       end
 
@@ -95,6 +95,72 @@ module MiniSql
         result.clear if result
       end
 
+      def query_each(sql, *params)
+        raise StandardError, "Please supply a block when calling query_each" if !block_given?
+        if params && params.length > 0
+          sql = param_encoder.encode(sql, *params)
+        end
+
+        raw_connection.send_query(sql)
+        raw_connection.set_single_row_mode
+        materializer = nil # defined outside the loop block so `||=` below memoizes across rows
+        loop do
+          result = raw_connection.get_result
+          break if !result
+
+          result.check
+
+          if result.ntuples == 0
+            # skip, this happens at the end when we get totals
+          else
+            materializer ||= @deserializer_cache.materializer(result)
+            result.type_map = type_map
+            i = 0
+            # technically we should only get 1 row here
+            # but protect against future batching changes
+            while i < result.ntuples
+              yield materializer.materialize(result, i)
+              i += 1
+            end
+          end
+
+          result.clear
+        end
+      end
+
+      def query_each_hash(sql, *params)
+        raise StandardError, "Please supply a block when calling query_each_hash" if !block_given?
+        if params && params.length > 0
+          sql = param_encoder.encode(sql, *params)
+        end
+
+        raw_connection.send_query(sql)
+        raw_connection.set_single_row_mode
+
+        loop do
+          result = raw_connection.get_result
+          break if !result
+
+          result.check
+
+          if result.ntuples == 0
+            # skip, this happens at the end when we get totals
+          else
+            result.type_map = type_map
+            i = 0
+
+            # technically we should only get 1 row here
+            # but protect against future batching changes
+            while i < result.ntuples
+              yield result[i]
+              i += 1
+            end
+          end
+
+          result.clear
+        end
+      end
+
       def query_decorator(decorator, sql, *params)
         result = run(sql, params)
         result.type_map = type_map
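The loops in `query_each` and `query_each_hash` follow libpq's single-row mode: after `send_query` and `set_single_row_mode`, `get_result` returns one result per row, then a final zero-row result, then `nil`. The pure-Ruby sketch below replays that contract with hypothetical `FakeResult` and `FakeRawConnection` stand-ins (not part of pg or MiniSql) so it runs without a database. It also adds a method-level `ensure` that drains unread results when the caller leaves the block early; that clause is an assumption of this sketch, not something the diff does, motivated by libpq's rule that `PQgetResult` must be called until it returns NULL before the connection can send another query.

```ruby
# Hypothetical stand-in for PG::Result: holds the rows of one result.
class FakeResult
  def initialize(rows)
    @rows = rows
  end

  def ntuples
    @rows.length
  end

  def [](index)
    @rows[index]
  end

  # A real PG::Result#check raises if the server reported an error.
  def check
    self
  end

  def clear
    @rows = []
  end
end

# Hypothetical stand-in for PG::Connection in single-row mode: one result per
# row, then a zero-row result, then nil once the query is fully consumed.
class FakeRawConnection
  attr_reader :pending

  def initialize(rows)
    @pending = rows.map { |row| FakeResult.new([row]) } + [FakeResult.new([])]
  end

  def get_result
    @pending.shift
  end
end

# Sketch of the query_each_hash loop. The method-level ensure is an assumption
# of this sketch (the diff has none): it drains results left unread when the
# caller exits the block early.
def each_streamed_row(raw)
  loop do
    result = raw.get_result
    break unless result

    begin
      result.check
      i = 0
      while i < result.ntuples # the final zero-row result yields nothing
        yield result[i]
        i += 1
      end
    ensure
      result.clear
    end
  end
ensure
  while (leftover = raw.get_result)
    leftover.clear
  end
end

raw = FakeRawConnection.new([{ "a" => 1 }, { "a" => 2 }, { "a" => 3 }])
seen = []
each_streamed_row(raw) do |row|
  seen << row["a"]
  break if seen.length == 2 # leave the stream early
end

p seen               # => [1, 2]
p raw.pending.empty? # => true
```

Draining reads the remaining rows off the wire, so on a very large query an early exit still pays for transferring the rest of the result.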
diff --git a/lib/mini_sql/postgres/deserializer_cache.rb b/lib/mini_sql/postgres/deserializer_cache.rb
index 518c581..5179822 100644
--- a/lib/mini_sql/postgres/deserializer_cache.rb
+++ b/lib/mini_sql/postgres/deserializer_cache.rb
@@ -11,6 +11,20 @@ module MiniSql
         @max_size = max_size || DEFAULT_MAX_SIZE
       end
 
+      def materializer(result)
+        key = result.fields
+
+        materializer = @cache.delete(key)
+        if materializer
+          @cache[key] = materializer
+        else
+          materializer = @cache[key] = new_row_matrializer(result)
+          @cache.shift if @cache.length > @max_size
+        end
+
+        materializer
+      end
+
       def materialize(result, decorator_module = nil)
         return [] if result.ntuples == 0
 
@@ -42,6 +56,15 @@ module MiniSql
       def new_row_matrializer(result)
         fields = result.fields
 
+        i = 0
+        while i < fields.length
+          # special handling for unnamed columns
+          if fields[i] == "?column?"
+            fields[i] = "column#{i}"
+          end
+          i += 1
+        end
+
         Class.new do
           attr_accessor(*fields)
 
diff --git a/lib/mini_sql/version.rb b/lib/mini_sql/version.rb
index f9f5666..5721e8b 100644
--- a/lib/mini_sql/version.rb
+++ b/lib/mini_sql/version.rb
@@ -1,4 +1,4 @@
 # frozen_string_literal: true
 module MiniSql
-  VERSION = "0.2.5"
+  VERSION = "0.3"
 end
diff --git a/test/mini_sql/postgres/connection_test.rb b/test/mini_sql/postgres/connection_test.rb
index d7ad749..974f1aa 100644
--- a/test/mini_sql/postgres/connection_test.rb
+++ b/test/mini_sql/postgres/connection_test.rb
@@ -63,4 +63,81 @@ class MiniSql::Postgres::TestConnection < MiniTest::Test
     assert(delta < 5)
   end
 
+  def test_query_each_hash
+    query = "select 1 a, 2 b union all select 3,4 union all select 5,6"
+    rows = []
+    @connection.query_each_hash(query) do |row|
+      rows << row
+    end
+
+    assert_equal(3, rows.length)
+
+    assert_equal(1, rows[0]["a"])
+    assert_equal(2, rows[0]["b"])
+
+    assert_equal(3, rows[1]["a"])
+    assert_equal(4, rows[1]["b"])
+
+    assert_equal(5, rows[2]["a"])
+    assert_equal(6, rows[2]["b"])
+
+    row = nil
+    @connection.query_each_hash("select :a a", a: 1) do |r|
+      row = r
+    end
+
+    assert_equal(1, row["a"])
+  end
+
+  def test_query_each

[... diff too long, it was truncated ...]

GitHub sha: b96bca38

This is so cool :+1: