# frozen_string_literal: true

require "json"

module Tobox
  # Picks pending rows from the configured outbox table, hands them to a caller-supplied
  # block as messages, and afterwards either deletes the handled rows or records the failure
  # (error text and next +run_at+) on them.
  class Fetcher
    # @param label [Object] worker identifier, interpolated into every log message
    # @param configuration [Object] must respond to +default_logger+, +database+,
    #   +lifecycle_events+ and +[]+ (read for +:table+, +:exponential_retry_factor+,
    #   +:max_attempts+ and +:batch_size+)
    def initialize(label, configuration)
      @label = label
      @configuration = configuration

      @logger = @configuration.default_logger

      @db = configuration.database
      @table = configuration[:table]
      @exponential_retry_factor = configuration[:exponential_retry_factor]

      max_attempts = configuration[:max_attempts]

      @ds = @db[@table]

      # run_at IS NULL OR run_at < CURRENT_TIMESTAMP
      run_at_conds = [
        { Sequel[@table][:run_at] => nil },
        (Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP)
      ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) }

      @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts
                          .where(run_at_conds)
                          .order(Sequel.desc(:run_at, nulls: :first), :id)

      @batch_size = configuration[:batch_size]

      # applied to every picked row: bumps the attempt counter and clears the previous error
      @mark_as_fetched_params = { attempts: Sequel[@table][:attempts] + 1, last_error: nil }

      @before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
      @after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
      @error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
    end

    # Picks up to +batch_size+ events inside one database transaction, marks them as fetched
    # and yields the corresponding messages (see #to_message) to the given block as one array.
    #
    # @yieldparam messages [Array<Hash>] the prepared messages of the batch
    # @return [Integer] number of events handed to the block (0 when nothing was picked)
    def fetch_events(&blk)
      num_events = 0
      events_tr do
        event_ids = nil # @type var event_ids: Array[Integer]
        event_id_tr do
          event_ids = fetch_event_ids
          mark_as_fetched(event_ids) unless event_ids.empty?
        end

        if event_ids && !event_ids.empty?
          with_events(event_ids) do |events|
            num_events = events.size

            prepare_events(events, &blk)
          end
        end
      end

      num_events
    end

    private

    # Parses each event's metadata (when present), runs the before-event handlers and yields
    # the whole batch, converted to messages, in a single call.
    def prepare_events(events)
      prepared_events = events.map do |event|
        event[:metadata] = try_json_parse(event[:metadata]) if event[:metadata]
        handle_before_event(event)
        to_message(event)
      end
      yield(prepared_events)
    end

    # Selects the ids of the next batch using FOR UPDATE SKIP LOCKED.
    def fetch_event_ids
      @pick_next_sql.for_update
                    .skip_locked
                    .limit(@batch_size).select_map(:id) # lock starts here
    end

    # Increments +attempts+ and resets +last_error+ on the given rows.
    def mark_as_fetched(event_ids)
      @ds.where(id: event_ids).update(@mark_as_fetched_params)
    end

    # Runs the block inside a database transaction (no savepoint).
    def events_tr(&block)
      @db.transaction(savepoint: false, &block)
    end

    # Wraps the "pick ids and mark them as fetched" step; here it only yields.
    # NOTE(review): looks like an override hook for alternative strategies - confirm.
    def event_id_tr
      yield
    end

    # Processes the events via #yield_events and then, per event, either records the failure
    # and notifies the error handlers, or notifies the after-event handlers.
    # An error rescued for the whole batch takes precedence over a per-event error.
    def with_events(event_ids, &blk)
      events, error = yield_events(event_ids, &blk)

      events.each do |event|
        event_error = error || event[:error]

        if event_error
          event.merge!(mark_as_error(event, event_error))
          handle_error_event(event, event_error)
        else
          handle_after_event(event)
        end
      end
    end

    # Loads the rows for +event_ids+ and, unless there are none, yields them.
    #
    # The block may +throw(:tobox_batch_errors, errors)+ where +errors+ maps the position of
    # an event in the yielded array to its error: those events get the error stored under
    # +:error+ and only the remaining ones are deleted. When nothing is thrown, all rows are
    # deleted.
    #
    # @return [Array(Array<Hash>, StandardError)] the events and the error rescued while
    #   processing them (nil when there was none)
    # @raise [StandardError] whatever loading the rows raises
    def yield_events(event_ids)
      events_ds = @ds.where(id: event_ids)
      # Loaded outside of the rescue below: when the fetch itself fails there is no event to
      # flag, so the error propagates instead of being returned next to a nil event list
      # (which the caller would then fail to iterate).
      events = events_ds.all
      error = nil

      begin
        unless events.empty?
          errors_by_id = catch(:tobox_batch_errors) do
            yield events
            nil
          end

          # some events from batch errored
          if errors_by_id
            failed_positions = errors_by_id.keys
            failed = events.values_at(*failed_positions)
            successful = events - failed

            # fill in with batch error, addressed by the same position key it was reported under
            failed_positions.each { |pos| events[pos][:error] = errors_by_id[pos] }

            # delete successful
            @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty?
          else
            events_ds.delete
          end
        end
      rescue StandardError => e
        error = e
      end

      [events, error]
    end

    # Prefixes a log line with the worker label.
    def log_message(msg)
      "(worker: #{@label}) -> #{msg}"
    end

    # Stores the error's full message on the event row and sets +run_at+ to the database's
    # current timestamp plus <tt>exponential_retry_factor ** (attempts - 1)</tt> seconds.
    #
    # @return [Hash] the updated row (see #set_event_retry_attempts)
    def mark_as_error(event, error)
      update_params = {
        run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
                                seconds: @exponential_retry_factor**(event[:attempts] - 1)),
        # run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
        #                         seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)),
        last_error: error.full_message(highlight: false)
      }

      set_event_retry_attempts(event, update_params)
    end

    # Applies +update_params+ to the event's row and returns the row afterwards, through
    # UPDATE ... RETURNING when the dataset supports it, otherwise by reading it back.
    def set_event_retry_attempts(event, update_params)
      ds = @ds.where(id: event[:id])
      if @ds.supports_returning?(:update)
        ds.returning.update(update_params).first
      else
        ds.update(update_params)
        ds.first
      end
    end

    # Builds the message handed to the caller's block out of an event row.
    def to_message(event)
      {
        id: event[:id],
        type: event[:type],
        before: (try_json_parse(event[:data_before]) if event[:data_before]),
        after: (try_json_parse(event[:data_after]) if event[:data_after]),
        at: event[:created_at]
      }
    end

    # Returns +data+ as is when it responds to +to_hash+, otherwise parses its string form
    # as JSON.
    def try_json_parse(data)
      data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash)
      data
    end

    # Logs (debug) that the event is starting and calls every before-event handler with it.
    def handle_before_event(event)
      @logger.debug do
        log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")
      end
      @before_event_handlers.each do |hd|
        hd.call(event)
      end
    end

    # Logs (debug) that the event completed and calls every after-event handler with it.
    def handle_after_event(event)
      @logger.debug { log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) completed") }
      @after_event_handlers.each do |hd|
        hd.call(event)
      end
    end

    # Logs (error) the failure and calls every error-event handler with the event and error.
    def handle_error_event(event, error)
      @logger.error do
        # Array(): the backtrace is nil for an exception object which was never raised
        log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) failed with error\n" \
                    "#{error.class}: #{error.message}\n" \
                    "#{Array(error.backtrace).join("\n")}")
      end
      @error_event_handlers.each do |hd|
        hd.call(event, error)
      end
    end
  end
end