lib/tobox/fetcher.rb in tobox-0.4.5 vs lib/tobox/fetcher.rb in tobox-0.5.0
- old
+ new
@@ -11,18 +11,14 @@
@logger = @configuration.default_logger
@db = configuration.database
@table = configuration[:table]
- @group_column = configuration[:group_column]
@exponential_retry_factor = configuration[:exponential_retry_factor]
max_attempts = configuration[:max_attempts]
- @inbox_table = configuration[:inbox_table]
- @inbox_column = configuration[:inbox_column]
-
@ds = @db[@table]
run_at_conds = [
{ Sequel[@table][:run_at] => nil },
(Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP)
@@ -30,108 +26,130 @@
@pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts
.where(run_at_conds)
.order(Sequel.desc(:run_at, nulls: :first), :id)
+ @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil }
+
@before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
@after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
@error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
end
def fetch_events(&blk)
num_events = 0
- @db.transaction(savepoint: false) do
- if @group_column
- group = @pick_next_sql.for_update
- .skip_locked
- .limit(1)
- .select(@group_column)
+ events_tr do
+ event_ids = EMPTY
- # get total from a group, to compare to the number of future locked rows.
- total_from_group = @ds.where(@group_column => group).count
-
- event_ids = @ds.where(@group_column => group)
- .order(Sequel.desc(:run_at, nulls: :first), :id)
- .for_update.skip_locked.select_map(:id)
-
- if event_ids.size != total_from_group
- # this happens if concurrent workers locked different rows from the same group,
- # or when new rows from a given group have been inserted after the lock has been
- # acquired
- event_ids = []
- end
-
- # lock all, process 1
- event_ids = event_ids[0, 1]
- else
- event_ids = @pick_next_sql.for_update
- .skip_locked
- .limit(1).select_map(:id) # lock starts here
+ event_ids_tr do
+ event_ids = fetch_event_ids
+ mark_as_fetched(event_ids)
end
- events = nil
- error = nil
+ evts = nil
+
unless event_ids.empty?
- @db.transaction(savepoint: true) do
- events = @ds.where(id: event_ids).returning.delete
+ with_events(event_ids) do |events|
+ evts = events
+ num_events = events.count
- if blk
- num_events = events.size
-
- events.map! do |ev|
- try_insert_inbox(ev) do
- ev[:metadata] = try_json_parse(ev[:metadata])
- handle_before_event(ev)
- yield(to_message(ev))
- ev
- end
- rescue StandardError => e
- error = e
- raise Sequel::Rollback
- end.compact!
- else
- events.map!(&method(:to_message))
+ evts = events.filter_map do |ev|
+ prepare_event(ev, &blk)
end
end
end
- return blk ? 0 : [] if events.nil?
-
- return events unless blk
-
- if events
- events.each do |event|
- if error
- event.merge!(mark_as_error(event, error))
- handle_error_event(event, error)
- else
- handle_after_event(event)
- end
- end
- end
+ return 0 if evts.nil?
end
num_events
end
private
+ def prepare_event(event)
+ event[:metadata] = try_json_parse(event[:metadata])
+ handle_before_event(event)
+ yield(to_message(event))
+ event
+ end
+
+ def fetch_event_ids
+ @pick_next_sql.for_update
+ .skip_locked
+ .limit(1).select_map(:id) # lock starts here
+ end
+
+ def mark_as_fetched(event_ids)
+ @ds.where(id: event_ids).update(@mark_as_fetched_params) unless event_ids.empty?
+ end
+
+ def events_tr(&block)
+ @db.transaction(savepoint: false, &block)
+ end
+
+ def event_ids_tr
+ yield
+ end
+
+ def with_events(event_ids, &blk)
+ events, error = yield_events(event_ids, &blk)
+
+ events.each do |event|
+ if error
+ event.merge!(mark_as_error(event, error))
+ handle_error_event(event, error)
+ else
+ handle_after_event(event)
+ end
+ end
+ end
+
+ def yield_events(event_ids)
+ events_ds = @ds.where(id: event_ids)
+ events = EMPTY
+ error = nil
+
+ begin
+ events = events_ds.all
+
+ yield events
+
+ events_ds.delete
+ rescue StandardError => e
+ error = e
+ end
+
+ [events, error]
+ end
+
def log_message(msg)
"(worker: #{@label}) -> #{msg}"
end
def mark_as_error(event, error)
- @ds.where(id: event[:id]).returning.update(
- attempts: Sequel[@table][:attempts] + 1,
+ update_params = {
run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
- seconds: event[:attempts] + (1**@exponential_retry_factor)),
+ seconds: @exponential_retry_factor**(event[:attempts] - 1)),
# run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
# seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)),
last_error: "#{error.message}\n#{error.backtrace.join("\n")}"
- ).first
+ }
+
+ set_event_retry_attempts(event, update_params)
end
+ def set_event_retry_attempts(event, update_params)
+ ds = @ds.where(id: event[:id])
+ if @ds.supports_returning?(:update)
+ ds.returning.update(update_params).first
+ else
+ ds.update(update_params)
+ ds.first
+ end
+ end
+
def to_message(event)
{
id: event[:id],
type: event[:type],
before: try_json_parse(event[:data_before]),
@@ -144,19 +162,9 @@
return unless data
data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash)
data
- end
-
- def try_insert_inbox(event)
- return yield unless @inbox_table && @inbox_column
-
- ret = @db[@inbox_table].insert_conflict.insert(@inbox_column => event[@inbox_column])
-
- return unless ret
-
- yield
end
def handle_before_event(event)
@logger.debug do
log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")