lib/tobox/fetcher.rb in tobox-0.6.0 vs lib/tobox/fetcher.rb in tobox-0.6.1
- old
+ new
@@ -28,30 +28,27 @@
.where(run_at_conds)
.order(Sequel.desc(:run_at, nulls: :first), :id)
@batch_size = configuration[:batch_size]
- @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil }
-
@before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
@after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
@error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
end
def fetch_events(&blk)
num_events = 0
events_tr do
- event_ids = nil
- # @type var event_ids: Array[Integer]
+ events = nil
+ # @type var events: Array[event]?
event_id_tr do
- event_ids = fetch_event_ids
- mark_as_fetched(event_ids) unless event_ids.empty?
+ events = do_fetch_events
end
- if event_ids && !event_ids.empty?
- with_events(event_ids) do |events|
+ if events && !events.empty?
+ with_events(events) do |events|
num_events = events.size
prepare_events(events, &blk)
end
end
@@ -72,85 +69,76 @@
end
def fetch_event_ids
@pick_next_sql.for_update
.skip_locked
- .limit(@batch_size).select_map(:id) # lock starts here
+ .limit(@batch_size).select(:id) # lock starts here
end
- def mark_as_fetched(event_ids)
- @ds.where(id: event_ids).update(@mark_as_fetched_params)
+ def do_fetch_events
+ @ds.where(id: fetch_event_ids).all
end
def events_tr(&block)
@db.transaction(savepoint: false, &block)
end
def event_id_tr
yield
end
- def with_events(event_ids, &blk)
- events, error = yield_events(event_ids, &blk)
+ def with_events(events, &blk)
+ yield_events(events, &blk)
events.each do |event|
- event_error = error || event[:error]
+ event_error = event[:error]
if event_error
event.merge!(mark_as_error(event, event_error))
handle_error_event(event, event_error)
else
handle_after_event(event)
end
end
end
- def yield_events(event_ids)
- events_ds = @ds.where(id: event_ids)
- events = error = nil
+ def yield_events(events)
+ unless events.empty?
+ errors_by_id = catch(:tobox_batch_errors) do
+ yield events
+ nil
+ end
- begin
- events = events_ds.all
+ # some events from batch errored
+ if errors_by_id
+ failed = events.values_at(*errors_by_id.keys)
+ successful = events - failed
- unless events.empty?
- errors_by_id = catch(:tobox_batch_errors) do
- yield events
- nil
+ # fill in with batch error
+ failed.each do |ev|
+ ev[:error] = errors_by_id[events.index(ev)]
end
- # some events from batch errored
- if errors_by_id
- failed = events.values_at(*errors_by_id.keys)
- successful = events - failed
-
- # fill in with batch error
- failed.each do |ev|
- ev[:error] = errors_by_id[events.index(ev)]
- end
-
- # delete successful
- @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty?
- else
- events_ds.delete
- end
+ # delete successful
+ @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty?
+ else
+ @ds.where(id: events.map { |ev| ev[:id] }).delete
end
- rescue StandardError => e
- error = e
end
-
- [events, error]
+ rescue StandardError => e
+ events.each do |event|
+ event[:error] = e
+ end
end
def log_message(msg)
"(worker: #{@label}) -> #{msg}"
end
def mark_as_error(event, error)
update_params = {
- run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
- seconds: @exponential_retry_factor**(event[:attempts] - 1)),
- # run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
- # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)),
+ run_at: calculate_event_retry_interval(event[:attempts]),
+ attempts: Sequel[@table][:attempts] + 1,
last_error: error.full_message(highlight: false)
}
set_event_retry_attempts(event, update_params)
end
@@ -161,9 +149,15 @@
ds.returning.update(update_params).first
else
ds.update(update_params)
ds.first
end
+ end
+
+ def calculate_event_retry_interval(attempts)
+ # Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
+ # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)
+ Sequel.date_add(Sequel::CURRENT_TIMESTAMP, seconds: @exponential_retry_factor**attempts)
end
def to_message(event)
{
id: event[:id],