lib/tobox/fetcher.rb in tobox-0.5.1 vs lib/tobox/fetcher.rb in tobox-0.5.2

- old
+ new

@@ -26,91 +26,99 @@ @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts .where(run_at_conds) .order(Sequel.desc(:run_at, nulls: :first), :id) + @batch_size = configuration[:batch_size] + @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil } @before_event_handlers = Array(@configuration.lifecycle_events[:before_event]) @after_event_handlers = Array(@configuration.lifecycle_events[:after_event]) @error_event_handlers = Array(@configuration.lifecycle_events[:error_event]) end def fetch_events(&blk) num_events = 0 events_tr do - event_id = nil + event_ids = nil + # @type var event_ids: Array[Integer] event_id_tr do - event_id = fetch_event_id - mark_as_fetched(event_id) if event_id + event_ids = fetch_event_ids + mark_as_fetched(event_ids) unless event_ids.empty? end - if event_id - with_event(event_id) do |event| - num_events = 1 + if event_ids && !event_ids.empty? + with_events(event_ids) do |events| + num_events = events.size - prepare_event(event, &blk) + prepare_events(events, &blk) end end end num_events end private - def prepare_event(event) - event[:metadata] = try_json_parse(event[:metadata]) - handle_before_event(event) - yield(to_message(event)) + def prepare_events(events) + prepared_events = events.map do |event| + event[:metadata] = try_json_parse(event[:metadata]) if event[:metadata] + handle_before_event(event) + to_message(event) + end + yield(prepared_events) end - def fetch_event_id + def fetch_event_ids @pick_next_sql.for_update .skip_locked - .limit(1).select_map(:id).first # lock starts here + .limit(@batch_size).select_map(:id) # lock starts here end - def mark_as_fetched(event_id) - @ds.where(id: event_id).update(@mark_as_fetched_params) + def mark_as_fetched(event_ids) + @ds.where(id: event_ids).update(@mark_as_fetched_params) end def events_tr(&block) @db.transaction(savepoint: false, &block) end def event_id_tr yield end - def with_event(event_id, &blk) - event, error = yield_event(event_id, &blk) + def with_events(event_ids, &blk) + events, error = yield_events(event_ids, &blk) - if error - event.merge!(mark_as_error(event, error)) - handle_error_event(event, error) - else - handle_after_event(event) + events.each do |event| + if error + event.merge!(mark_as_error(event, error)) + handle_error_event(event, error) + else + handle_after_event(event) + end end end - def yield_event(event_id) - events_ds = @ds.where(id: event_id) - event = error = nil + def yield_events(event_ids) + events_ds = @ds.where(id: event_ids) + events = error = nil begin - event = events_ds.first + events = events_ds.all - yield event + yield events - events_ds.delete + events_ds.delete unless events.empty? rescue StandardError => e error = e end - [event, error] + [events, error] end def log_message(msg) "(worker: #{@label}) -> #{msg}" end @@ -139,18 +147,16 @@ def to_message(event) { id: event[:id], type: event[:type], - before: try_json_parse(event[:data_before]), - after: try_json_parse(event[:data_after]), + before: (try_json_parse(event[:data_before]) if event[:data_before]), + after: (try_json_parse(event[:data_after]) if event[:data_after]), at: event[:created_at] } end def try_json_parse(data) - return unless data - data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash) data end