lib/sequel/extensions/batches.rb in sequel-batches-0.1.3 vs lib/sequel/extensions/batches.rb in sequel-batches-0.2.0
- old
+ new
@@ -3,66 +3,55 @@
module Sequel
module Extensions
module Batches
MissingPKError = Class.new(StandardError)
+ NullPKError = Class.new(StandardError)
+ InvalidPKError = Class.new(StandardError)
- def in_batches(pk: nil, of: 1000, start: {}, finish: {})
- pk ||= self.db.schema(first_source)
- .select{|r| r[1][:primary_key]}
- .map(&:first) or raise MissingPKError
- qualified_pk = pk.map { |c| Sequel[first_source][c] }
+ def in_batches(pk: nil, of: 1000, start: nil, finish: nil)
+ pk ||= db.schema(first_source).select { |x| x[1][:primary_key] }.map(&:first)
+ raise MissingPKError if pk.empty?
- pk_expr = (-> (pk:) do
- pk.map do |col|
- colname = col.is_a?(Symbol) ? col : col.column
- Sequel.as(
- Sequel.pg_array(
- [
- Sequel.function(:min, col),
- Sequel.function(:max, col)
- ]
- ), :"#{colname}"
- )
- end
- end)
+ qualified_pk = pk.map { |x| Sequel[first_source][x] }
- entire_min_max = self.order(*pk).select(*pk_expr.call(pk: qualified_pk)).first
- min_max = {}
+ check_pk = lambda do |input_pk|
+ raise InvalidPKError if input_pk.keys != pk
+ input_pk
+ end
- range_expr = (-> (col, range) do
- Sequel.&(
- Sequel.expr(Sequel[first_source][col]) >= range[0],
- Sequel.expr(Sequel[first_source][col]) <= range[1],
- )
- end)
+ conditions = lambda do |pk, sign:|
+ raise NullPKError if pk.values.any?(&:nil?)
+ row_expr = Sequel.function(:row, *pk.values)
+ Sequel.function(:row, *qualified_pk).public_send(sign, row_expr)
+ end
+ base_ds = order(*qualified_pk)
+ base_ds = base_ds.where(conditions.call(check_pk.call(start), sign: :>=)) if start
+ base_ds = base_ds.where(conditions.call(check_pk.call(finish), sign: :<=)) if finish
+
+ pk_ds = db.from(base_ds).select(*pk).order(*pk)
+ actual_start = pk_ds.first
+ actual_finish = pk_ds.last
+
+ return unless actual_start && actual_finish
+
+ base_ds = base_ds.where(conditions.call(actual_start, sign: :>=))
+ base_ds = base_ds.where(conditions.call(actual_finish, sign: :<=))
+
+ current_instance = nil
+
loop do
- pk.each do |col|
- entire_min_max[col][0] = start[col] || entire_min_max[col][0]
- entire_min_max[col][1] = finish[col] || entire_min_max[col][1]
+ if current_instance
+ working_ds = base_ds.where(conditions.call(current_instance.to_h, sign: :>))
+ else
+ working_ds = base_ds
end
- ds = self.order(*qualified_pk).limit(of).where(
- Sequel.&(*pk.map { |col| range_expr.call(col, entire_min_max[col]) })
- )
- if min_max.present?
- pk_combinations = pk.each_with_index.map { |x, i| pk[0..-i] }
- ds = ds.where(Sequel.|(*pk_combinations.each_with_index.map do |pks, i|
- Sequel.&(*pks.each_with_index.map do |col, j|
- if j == i
- Sequel[first_source][col] > min_max[col].last
- else
- Sequel[first_source][col] >= min_max[col].last
- end
- end)
- end))
- end
+ current_instance = db.from(working_ds.limit(of)).select(*pk).order(*pk).last or break
+ working_ds = working_ds.where(conditions.call(current_instance.to_h, sign: :<=))
- min_max = self.db.from(ds).select(*pk_expr.call(pk: pk)).first
-
- break if min_max.values.flatten.any?(&:blank?)
- yield self.where(Sequel.&(*pk.map { |col| range_expr.call(col, min_max[col]) }))
+ yield working_ds
end
end
private