# frozen_string_literal: true ################################################################################ # PostgreSQLCursor: library class provides postgresql cursor for large result # set processing. Requires ActiveRecord, but can be adapted to other DBI/ORM libraries. # If you don't use AR, this assumes #connection and #instantiate methods are available. # # options - Hash to control operation and loop breaks # connection: instance - ActiveRecord connection to use # fraction: 0.1..1.0 - The cursor_tuple_fraction (default 1.0) # block_size: 1..n - The number of rows to fetch per db block fetch # while: value - Exits loop when block does not return this value. # until: value - Exits loop when block returns this value. # with_hold: boolean - Allows the query to remain open across commit points. # cursor_name: string - Allows you to name your cursor. # # Exmaples: # PostgreSQLCursor::Cursor.new("select ...").each { |hash| ... } # ActiveRecordModel.where(...).each_row { |hash| ... } # ActiveRecordModel.each_row_by_sql("select ...") { |hash| ... } # ActiveRecordModel.each_instance_by_sql("select ...") { |model| ... } # module PostgreSQLCursor class Cursor include Enumerable attr_reader :sql, :options, :connection, :count, :result # Public: Start a new PostgreSQL cursor query # sql - The SQL statement with interpolated values # options - hash of processing controls # while: value - Exits loop when block does not return this value. # until: value - Exits loop when block returns this value. # fraction: 0.1..1.0 - The cursor_tuple_fraction (default 1.0) # block_size: 1..n - The number of rows to fetch per db block fetch # Defaults to 1000 # with_hold - Allows the query to remain open across commit points. # # Examples # # PostgreSQLCursor::Cursor.new("select ....") # # Returns the cursor object when called with new. def initialize(sql, options = {}) @sql = sql @options = options @connection = @options.fetch(:connection) { ::ActiveRecord::Base.connection } @count = 0 @iterate = options[:instances] ? :each_instance : :each_row @batched = false end # Specify the type to instantiate, or reset to return a Hash. # # Explicitly check for type class to prevent calling equality # operator on active record relation, which will load it. def iterate_type(type = nil) if type.nil? || (type.instance_of?(Class) && type == Hash) @iterate = :each_row elsif type.instance_of?(Class) && type == Array @iterate = :each_array else @iterate = :each_instance @type = type end self end def iterate_batched(batched = true) @batched = batched self end # ActiveRecord call #size when rendering a collection # Define it and return some dummy value def size -1 end # Public: Yields each row of the result set to the passed block # # Yields the row to the block. The row is a hash with symbolized keys. # {colname: value, ....} # # Returns the count of rows processed def each(&block) if @iterate == :each_row @batched ? each_row_batch(&block) : each_row(&block) elsif @iterate == :each_array @batched ? each_array_batch(&block) : each_array(&block) else @batched ? each_instance_batch(@type, &block) : each_instance(@type, &block) end end def each_row(&block) each_tuple do |row| row = row.symbolize_keys if @options[:symbolize_keys] block.call(row) end end def each_array(&block) old_iterate = @iterate @iterate = :each_array begin rv = each_tuple do |row| block.call(row) end ensure @iterate = old_iterate end rv end def each_instance(klass = nil, &block) klass ||= @type each_tuple do |row| if ::ActiveRecord::VERSION::MAJOR < 4 model = klass.send(:instantiate, row) else @column_types ||= column_types model = klass.send(:instantiate, row, @column_types) end block.call(model) end end def each_row_batch(&block) each_batch do |batch| batch.map!(&:symbolize_keys) if @options[:symbolize_keys] block.call(batch) end end def each_array_batch(&block) old_iterate = @iterate @iterate = :each_array begin rv = each_batch do |batch| block.call(batch) end ensure @iterate = old_iterate end rv end def each_instance_batch(klass = nil, &block) klass ||= @type each_batch do |batch| models = batch.map do |row| if ::ActiveRecord::VERSION::MAJOR < 4 klass.send(:instantiate, row) else @column_types ||= column_types klass.send(:instantiate, row, @column_types) end end block.call(models) end end # Returns an array of columns plucked from the result rows. # Experimental function, as this could still use too much memory # and negate the purpose of this libarary. # Should this return a lazy enumerator instead? def pluck(*cols) options = cols.last.is_a?(Hash) ? cols.pop : {} @options.merge!(options) @options[:symbolize_keys] = true iterate_type(options[:class]) if options[:class] cols = cols.map { |c| c.to_sym } result = [] each do |row| row = row.symbolize_keys if row.is_a?(Hash) result << cols.map { |c| row[c] } end result.flatten! if cols.size == 1 result end def each_tuple(&block) # :nodoc: has_do_until = @options.has_key?(:until) has_do_while = @options.has_key?(:while) @count = 0 @column_types = nil with_optional_transaction do open while (row = fetch) break if row.size == 0 @count += 1 rc = block.call(row) break if has_do_until && rc == @options[:until] break if has_do_while && rc != @options[:while] end rescue => e raise e ensure close if @block && connection.active? end @count end def each_batch(&block) # :nodoc: has_do_until = @options.key?(:until) has_do_while = @options.key?(:while) @count = 0 @column_types = nil with_optional_transaction do open while (batch = fetch_block) break if batch.empty? @count += 1 rc = block.call(batch) break if has_do_until && rc == @options[:until] break if has_do_while && rc != @options[:while] end ensure close if @block && connection.active? end @count end def cast_types(row) row end def column_types return nil if ::ActiveRecord::VERSION::MAJOR < 4 return @column_types if @column_types types = {} fields = @result.fields fields.each_with_index do |fname, i| ftype = @result.ftype(i) fmod = @result.fmod(i) # From @netrusov 2023-01-18. This is the same call used in the PostgreSQL Adapter types[fname] = @connection.send(:get_oid_type, ftype, fmod, fname) # # From @simi 2023-01-18 (Works as well, used old calling method) # types[fname] = @connection.get_type_map.fetch(ftype) end @column_types = types end # Public: Opens (actually, "declares") the cursor. Call this before fetching def open set_cursor_tuple_fraction @cursor = @options[:cursor_name] || ("cursor_" + SecureRandom.uuid.delete("-")) hold = @options[:with_hold] ? "with hold " : "" @result = @connection.execute("declare #{@cursor} no scroll cursor #{hold}for #{@sql}") @block = [] end # Public: Returns the next row from the cursor, or empty hash if end of results # # Returns a row as a hash of {'colname'=>value,...} def fetch(options = {}) open unless @block fetch_block if @block.size == 0 row = @block.shift row = row.symbolize_keys if row && options[:symbolize_keys] row end # Private: Fetches the next block of rows into @block def fetch_block(block_size = nil) block_size ||= @block_size ||= @options.fetch(:block_size, 1000) @result = @connection.execute("fetch #{block_size} from #{@cursor}") @block = if @iterate == :each_array @result.each_row.collect { |row| row } else @result.collect { |row| row } end end # Public: Closes the cursor def close @connection.execute("close #{@cursor}") end # Private: Open transaction unless with_hold option, specified def with_optional_transaction if @options[:with_hold] yield else @connection.transaction { yield } end end # Private: Sets the PostgreSQL cursor_tuple_fraction value = 1.0 to assume all rows will be fetched # This is a value between 0.1 and 1.0 (PostgreSQL defaults to 0.1, this library defaults to 1.0) # used to determine the expected fraction (percent) of result rows returned the the caller. # This value determines the access path by the query planner. def set_cursor_tuple_fraction(frac = 1.0) @cursor_tuple_fraction ||= @options.fetch(:fraction, 1.0) return @cursor_tuple_fraction if frac == @cursor_tuple_fraction @cursor_tuple_fraction = frac @result = @connection.execute("set cursor_tuple_fraction to #{frac}") frac end end end