lib/tobox/fetcher.rb in tobox-0.4.5 vs lib/tobox/fetcher.rb in tobox-0.5.0

- old
+ new

@@ -11,18 +11,14 @@ @logger = @configuration.default_logger @db = configuration.database @table = configuration[:table] - @group_column = configuration[:group_column] @exponential_retry_factor = configuration[:exponential_retry_factor] max_attempts = configuration[:max_attempts] - @inbox_table = configuration[:inbox_table] - @inbox_column = configuration[:inbox_column] - @ds = @db[@table] run_at_conds = [ { Sequel[@table][:run_at] => nil }, (Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP) @@ -30,108 +26,130 @@ @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts .where(run_at_conds) .order(Sequel.desc(:run_at, nulls: :first), :id) + @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil } + @before_event_handlers = Array(@configuration.lifecycle_events[:before_event]) @after_event_handlers = Array(@configuration.lifecycle_events[:after_event]) @error_event_handlers = Array(@configuration.lifecycle_events[:error_event]) end def fetch_events(&blk) num_events = 0 - @db.transaction(savepoint: false) do - if @group_column - group = @pick_next_sql.for_update - .skip_locked - .limit(1) - .select(@group_column) + events_tr do + event_ids = EMPTY - # get total from a group, to compare to the number of future locked rows. - total_from_group = @ds.where(@group_column => group).count - - event_ids = @ds.where(@group_column => group) - .order(Sequel.desc(:run_at, nulls: :first), :id) - .for_update.skip_locked.select_map(:id) - - if event_ids.size != total_from_group - # this happens if concurrent workers locked different rows from the same group, - # or when new rows from a given group have been inserted after the lock has been - # acquired - event_ids = [] - end - - # lock all, process 1 - event_ids = event_ids[0, 1] - else - event_ids = @pick_next_sql.for_update - .skip_locked - .limit(1).select_map(:id) # lock starts here + event_ids_tr do + event_ids = fetch_event_ids + mark_as_fetched(event_ids) end - events = nil - error = nil + evts = nil + unless event_ids.empty? - @db.transaction(savepoint: true) do - events = @ds.where(id: event_ids).returning.delete + with_events(event_ids) do |events| + evts = events + num_events = events.count - if blk - num_events = events.size - - events.map! do |ev| - try_insert_inbox(ev) do - ev[:metadata] = try_json_parse(ev[:metadata]) - handle_before_event(ev) - yield(to_message(ev)) - ev - end - rescue StandardError => e - error = e - raise Sequel::Rollback - end.compact! - else - events.map!(&method(:to_message)) + evts = events.filter_map do |ev| + prepare_event(ev, &blk) end end end - return blk ? 0 : [] if events.nil? - - return events unless blk - - if events - events.each do |event| - if error - event.merge!(mark_as_error(event, error)) - handle_error_event(event, error) - else - handle_after_event(event) - end - end - end + return 0 if evts.nil? end num_events end private + def prepare_event(event) + event[:metadata] = try_json_parse(event[:metadata]) + handle_before_event(event) + yield(to_message(event)) + event + end + + def fetch_event_ids + @pick_next_sql.for_update + .skip_locked + .limit(1).select_map(:id) # lock starts here + end + + def mark_as_fetched(event_ids) + @ds.where(id: event_ids).update(@mark_as_fetched_params) unless event_ids.empty? + end + + def events_tr(&block) + @db.transaction(savepoint: false, &block) + end + + def event_ids_tr + yield + end + + def with_events(event_ids, &blk) + events, error = yield_events(event_ids, &blk) + + events.each do |event| + if error + event.merge!(mark_as_error(event, error)) + handle_error_event(event, error) + else + handle_after_event(event) + end + end + end + + def yield_events(event_ids) + events_ds = @ds.where(id: event_ids) + events = EMPTY + error = nil + + begin + events = events_ds.all + + yield events + + events_ds.delete + rescue StandardError => e + error = e + end + + [events, error] + end + def log_message(msg) "(worker: #{@label}) -> #{msg}" end def mark_as_error(event, error) - @ds.where(id: event[:id]).returning.update( - attempts: Sequel[@table][:attempts] + 1, + update_params = { run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP, - seconds: event[:attempts] + (1**@exponential_retry_factor)), + seconds: @exponential_retry_factor**(event[:attempts] - 1)), # run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP, # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)), last_error: "#{error.message}\n#{error.backtrace.join("\n")}" - ).first + } + + set_event_retry_attempts(event, update_params) end + def set_event_retry_attempts(event, update_params) + ds = @ds.where(id: event[:id]) + if @ds.supports_returning?(:update) + ds.returning.update(update_params).first + else + ds.update(update_params) + ds.first + end + end + def to_message(event) { id: event[:id], type: event[:type], before: try_json_parse(event[:data_before]), @@ -144,19 +162,9 @@ return unless data data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash) data - end - - def try_insert_inbox(event) - return yield unless @inbox_table && @inbox_column - - ret = @db[@inbox_table].insert_conflict.insert(@inbox_column => event[@inbox_column]) - - return unless ret - - yield end def handle_before_event(event) @logger.debug do log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")