lib/rubyrep/proxy_connection.rb in rubyrep-1.0.1 vs lib/rubyrep/proxy_connection.rb in rubyrep-1.0.2
- old
+ new
@@ -7,10 +7,91 @@
require 'active_record/connection_adapters/abstract_adapter'
module RR
+ # Enables the fetching of (potential large) result sets in chunks.
+ class ResultFetcher
+
+ # The current database ProxyConnection
+ attr_accessor :connection
+
+ # hash of select options as described under ProxyConnection#select_cursor
+ attr_accessor :options
+
+ # column_name => value hash of the last returned row
+ attr_accessor :last_row
+
+ # The current row set: an array of column_name => value hashes
+ attr_accessor :rows
+
+ # Index to the current row in rows
+ attr_accessor :current_row_index
+
+ # Creates a new fetcher.
+ # * +connection+: the current ProxyConnection
+ # * +options+: hash of select options as described under ProxyConnection#select_cursor
+ def initialize(connection, options)
+ self.connection = connection
+ self.options = options.clone
+ end
+
+ # Returns +true+ if there are more rows to read.
+ def next?
+ unless self.rows
+ # Try to load some records
+
+ if options[:query] and last_row != nil
+ # A query was directly specified and all it's rows were returned
+ # ==> Finished.
+ return false
+ end
+
+ if options[:query]
+ # If a query has been directly specified, just directly execute it
+ query = options[:query]
+ else
+ # Otherwise build the query
+ if last_row
+ # There was a previous batch.
+ # Next batch will start after the last returned row
+ options.merge! :from => last_row, :exclude_starting_row => true
+ end
+
+ query = connection.table_select_query(options[:table], options)
+
+ if options[:row_buffer_size]
+ # Set the batch size
+ query += " limit #{options[:row_buffer_size]}"
+ end
+ end
+
+ self.rows = connection.select_all query
+ self.current_row_index = 0
+ end
+ self.current_row_index < self.rows.size
+ end
+
+ # Returns the row as a column => value hash and moves the cursor to the next row.
+ def next_row
+ raise("no more rows available") unless next?
+ self.last_row = self.rows[self.current_row_index]
+ self.current_row_index += 1
+
+ if self.current_row_index == self.rows.size
+ self.rows = nil
+ end
+
+ self.last_row
+ end
+
+ # Frees up all ressources
+ def clear
+ self.rows = nil
+ end
+ end
+
# This class represents a remote activerecord database connection.
# Normally created by DatabaseProxy
class ProxyConnection
extend Forwardable
@@ -100,12 +181,10 @@
# * :+table+: name of the table from which to read data
# * further options as taken by #table_select_query to build the query
# * :+row_buffer_size+:
# Integer controlling how many rows a read into memory at one time.
def select_cursor(options)
- row_buffer_size = options[:row_buffer_size] || DEFAULT_ROW_BUFFER_SIZE
- query = options[:query] || table_select_query(options[:table], options)
- cursor = connection.select_cursor query, row_buffer_size
+ cursor = ResultFetcher.new(self, options)
if options[:type_cast]
cursor = TypeCastingCursor.new(self, options[:table], cursor)
end
cursor
end