# frozen_string_literal: true

require "json"

module Tobox
  # Pulls one pending row at a time from the configured outbox table, hands it
  # to the caller as a message hash and then either deletes the row (the block
  # returned normally) or records the failure and schedules a later retry (the
  # block raised).
  class Fetcher
    # @param label [Object] worker identifier; only interpolated into log lines.
    # @param configuration [Object] must respond to +default_logger+, +database+,
    #   +lifecycle_events+ and +[]+ (read keys: +:table+,
    #   +:exponential_retry_factor+, +:max_attempts+).
    def initialize(label, configuration)
      @label = label
      @configuration = configuration

      @logger = @configuration.default_logger

      @db = configuration.database

      @table = configuration[:table]
      @exponential_retry_factor = configuration[:exponential_retry_factor]

      max_attempts = configuration[:max_attempts]

      @ds = @db[@table]

      # a row is due when it was never scheduled (NULL run_at) or its run_at is in the past
      run_at_conds = [
        { Sequel[@table][:run_at] => nil },
        (Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP)
      ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) }

      @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts
                          .where(run_at_conds)
                          .order(Sequel.desc(:run_at, nulls: :first), :id)

      # applied to a row as soon as it is picked: bump the counter, clear the previous error
      @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil }

      @before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
      @after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
      @error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
    end

    # Picks at most one due event inside a single transaction and yields it, as
    # the hash built by #to_message, to the given block.
    #
    # @yieldparam message [Hash] +:id+, +:type+, +:before+, +:after+, +:at+.
    # @return [Integer] 1 when an event was handed to the block (even if the
    #   block then raised), 0 when there was nothing to pick.
    def fetch_events(&blk)
      num_events = 0
      events_tr do
        event_id = nil
        event_id_tr do
          event_id = fetch_event_id
          mark_as_fetched(event_id) if event_id
        end

        if event_id
          with_event(event_id) do |event|
            num_events = 1
            prepare_event(event, &blk)
          end
        end
      end
      num_events
    end

    private

    # Decodes the row's metadata in place, runs the before-event handlers and
    # yields the message representation of the row.
    def prepare_event(event)
      event[:metadata] = try_json_parse(event[:metadata])
      handle_before_event(event)
      yield(to_message(event))
    end

    # @return [Object, nil] id of the next due row, or nil when none matched.
    def fetch_event_id
      @pick_next_sql.for_update
                    .skip_locked
                    .limit(1).select_map(:id).first # lock starts here
    end

    # Applies the "fetched" bookkeeping (attempts + 1, last_error cleared).
    def mark_as_fetched(event_id)
      @ds.where(id: event_id).update(@mark_as_fetched_params)
    end

    # Runs the block inside a database transaction (+savepoint: false+).
    def events_tr(&block)
      @db.transaction(savepoint: false, &block)
    end

    # Wrapper around the "pick id + mark as fetched" step; this implementation
    # simply yields, so the step runs in the transaction opened by #events_tr.
    def event_id_tr
      yield
    end

    # Loads the row, yields it to the block and dispatches to the after/error
    # lifecycle handling depending on how the block finished.
    #
    # When no row could be loaded there is nothing to mark as failed or to pass
    # to the handlers: a lookup error is re-raised unchanged (instead of being
    # masked by a NoMethodError on nil), and a row that no longer exists is
    # skipped without calling the block.
    def with_event(event_id, &blk)
      event, error = yield_event(event_id, &blk)

      unless event
        raise error if error

        return
      end

      if error
        event.merge!(mark_as_error(event, error))
        handle_error_event(event, error)
      else
        handle_after_event(event)
      end
    end

    # Loads the row for +event_id+, yields it and deletes it once the block
    # returns. A StandardError raised by the lookup, the block or the delete is
    # captured and returned rather than raised. The block is not called (and
    # nothing is deleted) when the row is not found.
    #
    # @return [Array(Hash, StandardError)] +[event, error]+; either may be nil.
    def yield_event(event_id)
      events_ds = @ds.where(id: event_id)
      event = error = nil

      begin
        event = events_ds.first
        if event
          yield event
          events_ds.delete
        end
      rescue StandardError => e
        error = e
      end

      [event, error]
    end

    # Prefixes +msg+ with the worker label.
    def log_message(msg)
      "(worker: #{@label}) -> #{msg}"
    end

    # Schedules the next attempt (now + factor ** (attempts - 1) seconds) and
    # stores the error message and backtrace on the row.
    #
    # @return [Hash] the row as read back after the update.
    def mark_as_error(event, error)
      update_params = {
        run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
                                seconds: @exponential_retry_factor**(event[:attempts] - 1)),
        # run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
        #                         seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)),
        last_error: "#{error.message}\n#{error.backtrace.join("\n")}"
      }

      set_event_retry_attempts(event, update_params)
    end

    # Writes +update_params+ to the event's row and returns the updated row,
    # using RETURNING when the dataset supports it and a follow-up read otherwise.
    def set_event_retry_attempts(event, update_params)
      ds = @ds.where(id: event[:id])
      if @ds.supports_returning?(:update)
        ds.returning.update(update_params).first
      else
        ds.update(update_params)
        ds.first
      end
    end

    # Builds the hash handed to the #fetch_events block from a table row.
    def to_message(event)
      {
        id: event[:id],
        type: event[:type],
        before: try_json_parse(event[:data_before]),
        after: try_json_parse(event[:data_after]),
        at: event[:created_at]
      }
    end

    # @return [Object, nil] nil for nil/false input, the value itself when it is
    #   already hash-like (responds to +to_hash+), otherwise the result of
    #   parsing its string form as JSON.
    def try_json_parse(data)
      return unless data

      data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash)

      data
    end

    # Logs (debug) and calls every +:before_event+ handler with the row.
    def handle_before_event(event)
      @logger.debug do
        log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")
      end
      @before_event_handlers.each do |hd|
        hd.call(event)
      end
    end

    # Logs (debug) and calls every +:after_event+ handler with the row.
    def handle_after_event(event)
      @logger.debug { log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) completed") }
      @after_event_handlers.each do |hd|
        hd.call(event)
      end
    end

    # Logs (error, including class, message and backtrace) and calls every
    # +:error_event+ handler with the row and the error.
    def handle_error_event(event, error)
      @logger.error do
        log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) failed with error\n" \
                    "#{error.class}: #{error.message}\n" \
                    "#{error.backtrace.join("\n")}")
      end
      @error_event_handlers.each do |hd|
        hd.call(event, error)
      end
    end
  end
end