lib/tobox/fetcher.rb in tobox-0.5.0 vs lib/tobox/fetcher.rb in tobox-0.5.1
- old
+ new
@@ -36,31 +36,24 @@
end
def fetch_events(&blk)
num_events = 0
events_tr do
- event_ids = EMPTY
+ event_id = nil
- event_ids_tr do
- event_ids = fetch_event_ids
- mark_as_fetched(event_ids)
+ event_id_tr do
+ event_id = fetch_event_id
+ mark_as_fetched(event_id) if event_id
end
- evts = nil
+ if event_id
+ with_event(event_id) do |event|
+ num_events = 1
- unless event_ids.empty?
- with_events(event_ids) do |events|
- evts = events
- num_events = events.count
-
- evts = events.filter_map do |ev|
- prepare_event(ev, &blk)
- end
+ prepare_event(event, &blk)
end
end
-
- return 0 if evts.nil?
end
num_events
end
@@ -68,59 +61,55 @@
def prepare_event(event)
event[:metadata] = try_json_parse(event[:metadata])
handle_before_event(event)
yield(to_message(event))
- event
end
- def fetch_event_ids
+ def fetch_event_id
@pick_next_sql.for_update
.skip_locked
- .limit(1).select_map(:id) # lock starts here
+ .limit(1).select_map(:id).first # lock starts here
end
- def mark_as_fetched(event_ids)
- @ds.where(id: event_ids).update(@mark_as_fetched_params) unless event_ids.empty?
+ def mark_as_fetched(event_id)
+ @ds.where(id: event_id).update(@mark_as_fetched_params)
end
def events_tr(&block)
@db.transaction(savepoint: false, &block)
end
- def event_ids_tr
+ def event_id_tr
yield
end
- def with_events(event_ids, &blk)
- events, error = yield_events(event_ids, &blk)
+ def with_event(event_id, &blk)
+ event, error = yield_event(event_id, &blk)
- events.each do |event|
- if error
- event.merge!(mark_as_error(event, error))
- handle_error_event(event, error)
- else
- handle_after_event(event)
- end
+ if error
+ event.merge!(mark_as_error(event, error))
+ handle_error_event(event, error)
+ else
+ handle_after_event(event)
end
end
- def yield_events(event_ids)
- events_ds = @ds.where(id: event_ids)
- events = EMPTY
- error = nil
+ def yield_event(event_id)
+ events_ds = @ds.where(id: event_id)
+ event = error = nil
begin
- events = events_ds.all
+ event = events_ds.first
- yield events
+ yield event
events_ds.delete
rescue StandardError => e
error = e
end
- [events, error]
+ [event, error]
end
def log_message(msg)
"(worker: #{@label}) -> #{msg}"
end