lib/rubyrep/proxy_connection.rb in rubyrep-1.0.1 vs lib/rubyrep/proxy_connection.rb in rubyrep-1.0.2

- old
+ new

@@ -7,10 +7,91 @@ require 'active_record/connection_adapters/abstract_adapter' module RR + # Enables the fetching of (potential large) result sets in chunks. + class ResultFetcher + + # The current database ProxyConnection + attr_accessor :connection + + # hash of select options as described under ProxyConnection#select_cursor + attr_accessor :options + + # column_name => value hash of the last returned row + attr_accessor :last_row + + # The current row set: an array of column_name => value hashes + attr_accessor :rows + + # Index to the current row in rows + attr_accessor :current_row_index + + # Creates a new fetcher. + # * +connection+: the current ProxyConnection + # * +options+: hash of select options as described under ProxyConnection#select_cursor + def initialize(connection, options) + self.connection = connection + self.options = options.clone + end + + # Returns +true+ if there are more rows to read. + def next? + unless self.rows + # Try to load some records + + if options[:query] and last_row != nil + # A query was directly specified and all it's rows were returned + # ==> Finished. + return false + end + + if options[:query] + # If a query has been directly specified, just directly execute it + query = options[:query] + else + # Otherwise build the query + if last_row + # There was a previous batch. + # Next batch will start after the last returned row + options.merge! :from => last_row, :exclude_starting_row => true + end + + query = connection.table_select_query(options[:table], options) + + if options[:row_buffer_size] + # Set the batch size + query += " limit #{options[:row_buffer_size]}" + end + end + + self.rows = connection.select_all query + self.current_row_index = 0 + end + self.current_row_index < self.rows.size + end + + # Returns the row as a column => value hash and moves the cursor to the next row. + def next_row + raise("no more rows available") unless next? + self.last_row = self.rows[self.current_row_index] + self.current_row_index += 1 + + if self.current_row_index == self.rows.size + self.rows = nil + end + + self.last_row + end + + # Frees up all ressources + def clear + self.rows = nil + end + end + # This class represents a remote activerecord database connection. # Normally created by DatabaseProxy class ProxyConnection extend Forwardable @@ -100,12 +181,10 @@ # * :+table+: name of the table from which to read data # * further options as taken by #table_select_query to build the query # * :+row_buffer_size+: # Integer controlling how many rows a read into memory at one time. def select_cursor(options) - row_buffer_size = options[:row_buffer_size] || DEFAULT_ROW_BUFFER_SIZE - query = options[:query] || table_select_query(options[:table], options) - cursor = connection.select_cursor query, row_buffer_size + cursor = ResultFetcher.new(self, options) if options[:type_cast] cursor = TypeCastingCursor.new(self, options[:table], cursor) end cursor end