lib/tobox/fetcher.rb in tobox-0.1.6 vs lib/tobox/fetcher.rb in tobox-0.2.0

- old
+ new

@@ -17,10 +17,11 @@ @db.extension :date_arithmetic @db.loggers << @logger unless @configuration[:environment] == "production" @table = configuration[:table] + @group_column = configuration[:group_column] @exponential_retry_factor = configuration[:exponential_retry_factor] max_attempts = configuration[:max_attempts] @ds = @db[@table] @@ -31,22 +32,45 @@ ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) } @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts .where(run_at_conds) .order(Sequel.desc(:run_at, nulls: :first), :id) - .for_update - .skip_locked - .limit(1) @before_event_handlers = Array(@configuration.lifecycle_events[:before_event]) @after_event_handlers = Array(@configuration.lifecycle_events[:after_event]) @error_event_handlers = Array(@configuration.lifecycle_events[:error_event]) end def fetch_events(&blk) num_events = 0 - @db.transaction do - event_ids = @pick_next_sql.select_map(:id) # lock starts here + @db.transaction(savepoint: false) do + if @group_column + group = @pick_next_sql.for_update + .skip_locked + .limit(1) + .select(@group_column) + + # get total from a group, to compare to the number of future locked rows. + total_from_group = @ds.where(@group_column => group).count + + event_ids = @ds.where(@group_column => group) + .order(Sequel.desc(:run_at, nulls: :first), :id) + .for_update.skip_locked.select_map(:id) + + if event_ids.size != total_from_group + # this happens if concurrent workers locked different rows from the same group, + # or when new rows from a given group have been inserted after the lock has been + # acquired + event_ids = [] + end + + # lock all, process 1 + event_ids = event_ids[0, 1] + else + event_ids = @pick_next_sql.for_update + .skip_locked + .limit(1).select_map(:id) # lock starts here + end events = nil error = nil unless event_ids.empty? @db.transaction(savepoint: true) do