lib/tobox/fetcher.rb in tobox-0.6.1 vs lib/tobox/fetcher.rb in tobox-0.7.0

- old
+ new

@@ -17,19 +17,32 @@ max_attempts = configuration[:max_attempts] @ds = @db[@table] - run_at_conds = [ - { Sequel[@table][:run_at] => nil }, - (Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP) - ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) } + @visibility_column = configuration[:visibility_column] + @attempts_column = configuration[:attempts_column] - @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts - .where(run_at_conds) - .order(Sequel.desc(:run_at, nulls: :first), :id) + @pick_next_sql = @ds + if @attempts_column + # filter out exhausted attempts + @pick_next_sql = @pick_next_sql.where(Sequel[@table][@attempts_column] < max_attempts) + end + + if configuration.visibility_type_bool? + @pick_next_sql = @pick_next_sql.where(@visibility_column => false).order(:id) + else + visibility_conds = [ + { Sequel[@table][@visibility_column] => nil }, + (Sequel.expr(Sequel[@table][@visibility_column]) < Sequel::CURRENT_TIMESTAMP) + ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) } + + @pick_next_sql = @pick_next_sql.where(visibility_conds) + .order(Sequel.desc(@visibility_column, nulls: :first), :id) + end + @batch_size = configuration[:batch_size] @before_event_handlers = Array(@configuration.lifecycle_events[:before_event]) @after_event_handlers = Array(@configuration.lifecycle_events[:after_event]) @error_event_handlers = Array(@configuration.lifecycle_events[:error_event]) @@ -127,21 +140,31 @@ events.each do |event| event[:error] = e end end - def log_message(msg) - "(worker: #{@label}) -> #{msg}" + def log_message(msg, event) + tags = { type: event[:type], attempts: event[@attempts_column] }.compact + + "(worker: #{@label}) -> outbox event " \ + "(#{tags.map { |*pair| pair.join(": ") }.join(", ")}) #{msg}" end def mark_as_error(event, error) + # @type var update_params: Hash[Symbol, untyped] update_params = { - run_at: calculate_event_retry_interval(event[:attempts]), - attempts: Sequel[@table][:attempts] + 1, last_error: error.full_message(highlight: false) } + update_params[@attempts_column] = Sequel[@table][@attempts_column] + 1 if @attempts_column + + update_params[@visibility_column] = if @configuration.visibility_type_bool? + false + else + calculate_event_retry_interval(event[@attempts_column]) + end + set_event_retry_attempts(event, update_params) end def set_event_retry_attempts(event, update_params) ds = @ds.where(id: event[:id]) @@ -175,28 +198,28 @@ data end def handle_before_event(event) @logger.debug do - log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...") + log_message("starting...", event) end @before_event_handlers.each do |hd| hd.call(event) end end def handle_after_event(event) - @logger.debug { log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) completed") } + @logger.debug { log_message("completed", event) } @after_event_handlers.each do |hd| hd.call(event) end end def handle_error_event(event, error) @logger.error do - log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) failed with error\n" \ + log_message("failed with error\n" \ "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") + "#{error.backtrace.join("\n")}", event) end @error_event_handlers.each do |hd| hd.call(event, error) end end