lib/tobox/fetcher.rb in tobox-0.1.6 vs lib/tobox/fetcher.rb in tobox-0.2.0
- old
+ new
@@ -17,10 +17,11 @@
@db.extension :date_arithmetic
@db.loggers << @logger unless @configuration[:environment] == "production"
@table = configuration[:table]
+ @group_column = configuration[:group_column]
@exponential_retry_factor = configuration[:exponential_retry_factor]
max_attempts = configuration[:max_attempts]
@ds = @db[@table]
@@ -31,22 +32,45 @@
].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) }
@pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts
.where(run_at_conds)
.order(Sequel.desc(:run_at, nulls: :first), :id)
- .for_update
- .skip_locked
- .limit(1)
@before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
@after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
@error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
end
def fetch_events(&blk)
num_events = 0
- @db.transaction do
- event_ids = @pick_next_sql.select_map(:id) # lock starts here
+ @db.transaction(savepoint: false) do
+ if @group_column
+ group = @pick_next_sql.for_update
+ .skip_locked
+ .limit(1)
+ .select(@group_column)
+
+ # get total from a group, to compare to the number of future locked rows.
+ total_from_group = @ds.where(@group_column => group).count
+
+ event_ids = @ds.where(@group_column => group)
+ .order(Sequel.desc(:run_at, nulls: :first), :id)
+ .for_update.skip_locked.select_map(:id)
+
+ if event_ids.size != total_from_group
+ # this happens if concurrent workers locked different rows from the same group,
+ # or when new rows from a given group have been inserted after the lock has been
+ # acquired
+ event_ids = []
+ end
+
+ # lock all, process 1
+ event_ids = event_ids[0, 1]
+ else
+ event_ids = @pick_next_sql.for_update
+ .skip_locked
+ .limit(1).select_map(:id) # lock starts here
+ end
events = nil
error = nil
unless event_ids.empty?
@db.transaction(savepoint: true) do