lib/tobox/fetcher.rb in tobox-0.6.0 vs lib/tobox/fetcher.rb in tobox-0.6.1

- old
+ new

@@ -28,30 +28,27 @@ .where(run_at_conds) .order(Sequel.desc(:run_at, nulls: :first), :id) @batch_size = configuration[:batch_size] - @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil } - @before_event_handlers = Array(@configuration.lifecycle_events[:before_event]) @after_event_handlers = Array(@configuration.lifecycle_events[:after_event]) @error_event_handlers = Array(@configuration.lifecycle_events[:error_event]) end def fetch_events(&blk) num_events = 0 events_tr do - event_ids = nil - # @type var event_ids: Array[Integer] + events = nil + # @type var events: Array[event]? event_id_tr do - event_ids = fetch_event_ids - mark_as_fetched(event_ids) unless event_ids.empty? + events = do_fetch_events end - if event_ids && !event_ids.empty? - with_events(event_ids) do |events| + if events && !events.empty? + with_events(events) do |events| num_events = events.size prepare_events(events, &blk) end end @@ -72,85 +69,76 @@ end def fetch_event_ids @pick_next_sql.for_update .skip_locked - .limit(@batch_size).select_map(:id) # lock starts here + .limit(@batch_size).select(:id) # lock starts here end - def mark_as_fetched(event_ids) - @ds.where(id: event_ids).update(@mark_as_fetched_params) + def do_fetch_events + @ds.where(id: fetch_event_ids).all end def events_tr(&block) @db.transaction(savepoint: false, &block) end def event_id_tr yield end - def with_events(event_ids, &blk) - events, error = yield_events(event_ids, &blk) + def with_events(events, &blk) + yield_events(events, &blk) events.each do |event| - event_error = error || event[:error] + event_error = event[:error] if event_error event.merge!(mark_as_error(event, event_error)) handle_error_event(event, event_error) else handle_after_event(event) end end end - def yield_events(event_ids) - events_ds = @ds.where(id: event_ids) - events = error = nil + def yield_events(events) + unless events.empty? + errors_by_id = catch(:tobox_batch_errors) do + yield events + nil + end - begin - events = events_ds.all + # some events from batch errored + if errors_by_id + failed = events.values_at(*errors_by_id.keys) + successful = events - failed - unless events.empty? - errors_by_id = catch(:tobox_batch_errors) do - yield events - nil + # fill in with batch error + failed.each do |ev| + ev[:error] = errors_by_id[events.index(ev)] end - # some events from batch errored - if errors_by_id - failed = events.values_at(*errors_by_id.keys) - successful = events - failed - - # fill in with batch error - failed.each do |ev| - ev[:error] = errors_by_id[events.index(ev)] - end - - # delete successful - @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty? - else - events_ds.delete - end + # delete successful + @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty? + else + @ds.where(id: events.map { |ev| ev[:id] }).delete end - rescue StandardError => e - error = e end - - [events, error] + rescue StandardError => e + events.each do |event| + event[:error] = e + end end def log_message(msg) "(worker: #{@label}) -> #{msg}" end def mark_as_error(event, error) update_params = { - run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP, - seconds: @exponential_retry_factor**(event[:attempts] - 1)), - # run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP, - # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)), + run_at: calculate_event_retry_interval(event[:attempts]), + attempts: Sequel[@table][:attempts] + 1, last_error: error.full_message(highlight: false) } set_event_retry_attempts(event, update_params) end @@ -161,9 +149,15 @@ ds.returning.update(update_params).first else ds.update(update_params) ds.first end + end + + def calculate_event_retry_interval(attempts) + # Sequel.date_add(Sequel::CURRENT_TIMESTAMP, + # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4) + Sequel.date_add(Sequel::CURRENT_TIMESTAMP, seconds: @exponential_retry_factor**attempts) end def to_message(event) { id: event[:id],