lib/tobox/fetcher.rb in tobox-0.5.1 vs lib/tobox/fetcher.rb in tobox-0.5.2
- old
+ new
@@ -26,91 +26,99 @@
@pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts
.where(run_at_conds)
.order(Sequel.desc(:run_at, nulls: :first), :id)
+ @batch_size = configuration[:batch_size]
+
@mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil }
@before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
@after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
@error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
end
def fetch_events(&blk)
num_events = 0
events_tr do
- event_id = nil
+ event_ids = nil
+ # @type var event_ids: Array[Integer]
event_id_tr do
- event_id = fetch_event_id
- mark_as_fetched(event_id) if event_id
+ event_ids = fetch_event_ids
+ mark_as_fetched(event_ids) unless event_ids.empty?
end
- if event_id
- with_event(event_id) do |event|
- num_events = 1
+ if event_ids && !event_ids.empty?
+ with_events(event_ids) do |events|
+ num_events = events.size
- prepare_event(event, &blk)
+ prepare_events(events, &blk)
end
end
end
num_events
end
private
- def prepare_event(event)
- event[:metadata] = try_json_parse(event[:metadata])
- handle_before_event(event)
- yield(to_message(event))
+ def prepare_events(events)
+ prepared_events = events.map do |event|
+ event[:metadata] = try_json_parse(event[:metadata]) if event[:metadata]
+ handle_before_event(event)
+ to_message(event)
+ end
+ yield(prepared_events)
end
- def fetch_event_id
+ def fetch_event_ids
@pick_next_sql.for_update
.skip_locked
- .limit(1).select_map(:id).first # lock starts here
+ .limit(@batch_size).select_map(:id) # lock starts here
end
- def mark_as_fetched(event_id)
- @ds.where(id: event_id).update(@mark_as_fetched_params)
+ def mark_as_fetched(event_ids)
+ @ds.where(id: event_ids).update(@mark_as_fetched_params)
end
def events_tr(&block)
@db.transaction(savepoint: false, &block)
end
def event_id_tr
yield
end
- def with_event(event_id, &blk)
- event, error = yield_event(event_id, &blk)
+ def with_events(event_ids, &blk)
+ events, error = yield_events(event_ids, &blk)
- if error
- event.merge!(mark_as_error(event, error))
- handle_error_event(event, error)
- else
- handle_after_event(event)
+ events.each do |event|
+ if error
+ event.merge!(mark_as_error(event, error))
+ handle_error_event(event, error)
+ else
+ handle_after_event(event)
+ end
end
end
- def yield_event(event_id)
- events_ds = @ds.where(id: event_id)
- event = error = nil
+ def yield_events(event_ids)
+ events_ds = @ds.where(id: event_ids)
+ events = error = nil
begin
- event = events_ds.first
+ events = events_ds.all
- yield event
+ yield events
- events_ds.delete
+ events_ds.delete unless events.empty?
rescue StandardError => e
error = e
end
- [event, error]
+ [events, error]
end
def log_message(msg)
"(worker: #{@label}) -> #{msg}"
end
@@ -139,18 +147,16 @@
def to_message(event)
{
id: event[:id],
type: event[:type],
- before: try_json_parse(event[:data_before]),
- after: try_json_parse(event[:data_after]),
+ before: (try_json_parse(event[:data_before]) if event[:data_before]),
+ after: (try_json_parse(event[:data_after]) if event[:data_after]),
at: event[:created_at]
}
end
def try_json_parse(data)
- return unless data
-
data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash)
data
end