# frozen_string_literal: true require "json" module Tobox class Fetcher def initialize(label, configuration) @label = label @configuration = configuration @logger = @configuration.default_logger @db = configuration.database @table = configuration[:table] @exponential_retry_factor = configuration[:exponential_retry_factor] max_attempts = configuration[:max_attempts] @ds = @db[@table] @visibility_column = configuration[:visibility_column] @attempts_column = configuration[:attempts_column] @pick_next_sql = @ds if @attempts_column # filter out exhausted attempts @pick_next_sql = @pick_next_sql.where(Sequel[@table][@attempts_column] < max_attempts) end if configuration.visibility_type_bool? @pick_next_sql = @pick_next_sql.where(@visibility_column => false).order(:id) else visibility_conds = [ { Sequel[@table][@visibility_column] => nil }, (Sequel.expr(Sequel[@table][@visibility_column]) < Sequel::CURRENT_TIMESTAMP) ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) } @pick_next_sql = @pick_next_sql.where(visibility_conds) .order(Sequel.desc(@visibility_column, nulls: :first), :id) end @batch_size = configuration[:batch_size] @before_event_handlers = Array(@configuration.lifecycle_events[:before_event]) @after_event_handlers = Array(@configuration.lifecycle_events[:after_event]) @error_event_handlers = Array(@configuration.lifecycle_events[:error_event]) end def fetch_events(&blk) num_events = 0 events_tr do events = nil # @type var events: Array[event]? event_id_tr do events = do_fetch_events end if events && !events.empty? with_events(events) do |events| num_events = events.size prepare_events(events, &blk) end end end num_events end private def prepare_events(events) prepared_events = events.map do |event| event[:metadata] = try_json_parse(event[:metadata]) if event[:metadata] handle_before_event(event) to_message(event) end yield(prepared_events) end def fetch_event_ids @pick_next_sql.for_update .skip_locked .limit(@batch_size).select(:id) # lock starts here end def do_fetch_events @ds.where(id: fetch_event_ids).all end def events_tr(&block) @db.transaction(savepoint: false, &block) end def event_id_tr yield end def with_events(events, &blk) yield_events(events, &blk) events.each do |event| event_error = event[:error] if event_error event.merge!(mark_as_error(event, event_error)) handle_error_event(event, event_error) else handle_after_event(event) end end end def yield_events(events) unless events.empty? errors_by_id = catch(:tobox_batch_errors) do yield events nil end # some events from batch errored if errors_by_id failed = events.values_at(*errors_by_id.keys) successful = events - failed # fill in with batch error failed.each do |ev| ev[:error] = errors_by_id[events.index(ev)] end # delete successful @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty? else @ds.where(id: events.map { |ev| ev[:id] }).delete end end rescue StandardError => e events.each do |event| event[:error] = e end end def log_message(msg, event) tags = { type: event[:type], attempts: event[@attempts_column] }.compact "(worker: #{@label}) -> outbox event " \ "(#{tags.map { |*pair| pair.join(": ") }.join(", ")}) #{msg}" end def mark_as_error(event, error) # @type var update_params: Hash[Symbol, untyped] update_params = { last_error: error.full_message(highlight: false) } update_params[@attempts_column] = Sequel[@table][@attempts_column] + 1 if @attempts_column update_params[@visibility_column] = if @configuration.visibility_type_bool? false else calculate_event_retry_interval(event[@attempts_column]) end set_event_retry_attempts(event, update_params) end def set_event_retry_attempts(event, update_params) ds = @ds.where(id: event[:id]) if @ds.supports_returning?(:update) ds.returning.update(update_params).first else ds.update(update_params) ds.first end end def calculate_event_retry_interval(attempts) # Sequel.date_add(Sequel::CURRENT_TIMESTAMP, # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4) Sequel.date_add(Sequel::CURRENT_TIMESTAMP, seconds: @exponential_retry_factor**attempts) end def to_message(event) { id: event[:id], type: event[:type], before: (try_json_parse(event[:data_before]) if event[:data_before]), after: (try_json_parse(event[:data_after]) if event[:data_after]), at: event[:created_at] } end def try_json_parse(data) data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash) data end def handle_before_event(event) @logger.debug do log_message("starting...", event) end @before_event_handlers.each do |hd| hd.call(event) end end def handle_after_event(event) @logger.debug { log_message("completed", event) } @after_event_handlers.each do |hd| hd.call(event) end end def handle_error_event(event, error) @logger.error do log_message("failed with error\n" \ "#{error.class}: #{error.message}\n" \ "#{error.backtrace.join("\n")}", event) end @error_event_handlers.each do |hd| hd.call(event, error) end end end end