lib/tobox/fetcher.rb in tobox-0.5.0 vs lib/tobox/fetcher.rb in tobox-0.5.1

- old
+ new

@@ -36,31 +36,24 @@ end def fetch_events(&blk) num_events = 0 events_tr do - event_ids = EMPTY + event_id = nil - event_ids_tr do - event_ids = fetch_event_ids - mark_as_fetched(event_ids) + event_id_tr do + event_id = fetch_event_id + mark_as_fetched(event_id) if event_id end - evts = nil + if event_id + with_event(event_id) do |event| + num_events = 1 - unless event_ids.empty? - with_events(event_ids) do |events| - evts = events - num_events = events.count - - evts = events.filter_map do |ev| - prepare_event(ev, &blk) - end + prepare_event(event, &blk) end end - - return 0 if evts.nil? end num_events end @@ -68,59 +61,55 @@ def prepare_event(event) event[:metadata] = try_json_parse(event[:metadata]) handle_before_event(event) yield(to_message(event)) - event end - def fetch_event_ids + def fetch_event_id @pick_next_sql.for_update .skip_locked - .limit(1).select_map(:id) # lock starts here + .limit(1).select_map(:id).first # lock starts here end - def mark_as_fetched(event_ids) - @ds.where(id: event_ids).update(@mark_as_fetched_params) unless event_ids.empty? + def mark_as_fetched(event_id) + @ds.where(id: event_id).update(@mark_as_fetched_params) end def events_tr(&block) @db.transaction(savepoint: false, &block) end - def event_ids_tr + def event_id_tr yield end - def with_events(event_ids, &blk) - events, error = yield_events(event_ids, &blk) + def with_event(event_id, &blk) + event, error = yield_event(event_id, &blk) - events.each do |event| - if error - event.merge!(mark_as_error(event, error)) - handle_error_event(event, error) - else - handle_after_event(event) - end + if error + event.merge!(mark_as_error(event, error)) + handle_error_event(event, error) + else + handle_after_event(event) end end - def yield_events(event_ids) - events_ds = @ds.where(id: event_ids) - events = EMPTY - error = nil + def yield_event(event_id) + events_ds = @ds.where(id: event_id) + event = error = nil begin - events = events_ds.all + event = events_ds.first - yield events + yield event events_ds.delete rescue StandardError => e error = e end - [events, error] + [event, error] end def log_message(msg) "(worker: #{@label}) -> #{msg}" end