lib/sequel/extensions/batches.rb in sequel-batches-0.1.3 vs lib/sequel/extensions/batches.rb in sequel-batches-0.2.0

- old
+ new

@@ -3,66 +3,55 @@
 module Sequel
   module Extensions
     module Batches
       MissingPKError = Class.new(StandardError)
+      NullPKError = Class.new(StandardError)
+      InvalidPKError = Class.new(StandardError)

-      def in_batches(pk: nil, of: 1000, start: {}, finish: {})
-        pk ||= self.db.schema(first_source)
-          .select{|r| r[1][:primary_key]}
-          .map(&:first) or raise MissingPKError
-        qualified_pk = pk.map { |c| Sequel[first_source][c] }
+      def in_batches(pk: nil, of: 1000, start: nil, finish: nil)
+        pk ||= db.schema(first_source).select { |x| x[1][:primary_key] }.map(&:first)
+        raise MissingPKError if pk.empty?

-        pk_expr = (-> (pk:) do
-          pk.map do |col|
-            colname = col.is_a?(Symbol) ? col : col.column
-            Sequel.as(
-              Sequel.pg_array(
-                [
-                  Sequel.function(:min, col),
-                  Sequel.function(:max, col)
-                ]
-              ), :"#{colname}"
-            )
-          end
-        end)
+        qualified_pk = pk.map { |x| Sequel[first_source][x] }

-        entire_min_max = self.order(*pk).select(*pk_expr.call(pk: qualified_pk)).first
-        min_max = {}
+        check_pk = lambda do |input_pk|
+          raise InvalidPKError if input_pk.keys != pk
+          input_pk
+        end

-        range_expr = (-> (col, range) do
-          Sequel.&(
-            Sequel.expr(Sequel[first_source][col]) >= range[0],
-            Sequel.expr(Sequel[first_source][col]) <= range[1],
-          )
-        end)
+        conditions = lambda do |pk, sign:|
+          raise NullPKError if pk.values.any?(&:nil?)
+          row_expr = Sequel.function(:row, *pk.values)
+          Sequel.function(:row, *qualified_pk).public_send(sign, row_expr)
+        end

+        base_ds = order(*qualified_pk)
+        base_ds = base_ds.where(conditions.call(check_pk.call(start), sign: :>=)) if start
+        base_ds = base_ds.where(conditions.call(check_pk.call(finish), sign: :<=)) if finish
+
+        pk_ds = db.from(base_ds).select(*pk).order(*pk)
+        actual_start = pk_ds.first
+        actual_finish = pk_ds.last
+
+        return unless actual_start && actual_finish
+
+        base_ds = base_ds.where(conditions.call(actual_start, sign: :>=))
+        base_ds = base_ds.where(conditions.call(actual_finish, sign: :<=))
+
+        current_instance = nil
+
         loop do
-          pk.each do |col|
-            entire_min_max[col][0] = start[col] || entire_min_max[col][0]
-            entire_min_max[col][1] = finish[col] || entire_min_max[col][1]
+          if current_instance
+            working_ds = base_ds.where(conditions.call(current_instance.to_h, sign: :>))
+          else
+            working_ds = base_ds
           end

-          ds = self.order(*qualified_pk).limit(of).where(
-            Sequel.&(*pk.map { |col| range_expr.call(col, entire_min_max[col]) })
-          )
-          if min_max.present?
-            pk_combinations = pk.each_with_index.map { |x, i| pk[0..-i] }
-            ds = ds.where(Sequel.|(*pk_combinations.each_with_index.map do |pks, i|
-              Sequel.&(*pks.each_with_index.map do |col, j|
-                if j == i
-                  Sequel[first_source][col] > min_max[col].last
-                else
-                  Sequel[first_source][col] >= min_max[col].last
-                end
-              end)
-            end))
-          end
+          current_instance = db.from(working_ds.limit(of)).select(*pk).order(*pk).last or break
+          working_ds = working_ds.where(conditions.call(current_instance.to_h, sign: :<=))

-          min_max = self.db.from(ds).select(*pk_expr.call(pk: pk)).first
-
-          break if min_max.values.flatten.any?(&:blank?)
-          yield self.where(Sequel.&(*pk.map { |col| range_expr.call(col, min_max[col]) }))
+          yield working_ds
         end
       end

       private