lib/tobox/fetcher.rb in tobox-0.6.1 vs lib/tobox/fetcher.rb in tobox-0.7.0
- old
+ new
@@ -17,19 +17,32 @@
max_attempts = configuration[:max_attempts]
@ds = @db[@table]
- run_at_conds = [
- { Sequel[@table][:run_at] => nil },
- (Sequel.expr(Sequel[@table][:run_at]) < Sequel::CURRENT_TIMESTAMP)
- ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) }
+ @visibility_column = configuration[:visibility_column]
+ @attempts_column = configuration[:attempts_column]
- @pick_next_sql = @ds.where(Sequel[@table][:attempts] < max_attempts) # filter out exhausted attempts
- .where(run_at_conds)
- .order(Sequel.desc(:run_at, nulls: :first), :id)
+ @pick_next_sql = @ds
+ if @attempts_column
+ # filter out exhausted attempts
+ @pick_next_sql = @pick_next_sql.where(Sequel[@table][@attempts_column] < max_attempts)
+ end
+
+ if configuration.visibility_type_bool?
+ @pick_next_sql = @pick_next_sql.where(@visibility_column => false).order(:id)
+ else
+ visibility_conds = [
+ { Sequel[@table][@visibility_column] => nil },
+ (Sequel.expr(Sequel[@table][@visibility_column]) < Sequel::CURRENT_TIMESTAMP)
+ ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) }
+
+ @pick_next_sql = @pick_next_sql.where(visibility_conds)
+ .order(Sequel.desc(@visibility_column, nulls: :first), :id)
+ end
+
@batch_size = configuration[:batch_size]
@before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
@after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
@error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
@@ -127,21 +140,31 @@
events.each do |event|
event[:error] = e
end
end
- def log_message(msg)
- "(worker: #{@label}) -> #{msg}"
+ def log_message(msg, event)
+ tags = { type: event[:type], attempts: event[@attempts_column] }.compact
+
+ "(worker: #{@label}) -> outbox event " \
+ "(#{tags.map { |*pair| pair.join(": ") }.join(", ")}) #{msg}"
end
def mark_as_error(event, error)
+ # @type var update_params: Hash[Symbol, untyped]
update_params = {
- run_at: calculate_event_retry_interval(event[:attempts]),
- attempts: Sequel[@table][:attempts] + 1,
last_error: error.full_message(highlight: false)
}
+ update_params[@attempts_column] = Sequel[@table][@attempts_column] + 1 if @attempts_column
+
+ update_params[@visibility_column] = if @configuration.visibility_type_bool?
+ false
+ else
+ calculate_event_retry_interval(event[@attempts_column])
+ end
+
set_event_retry_attempts(event, update_params)
end
def set_event_retry_attempts(event, update_params)
ds = @ds.where(id: event[:id])
@@ -175,28 +198,28 @@
data
end
def handle_before_event(event)
@logger.debug do
- log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")
+ log_message("starting...", event)
end
@before_event_handlers.each do |hd|
hd.call(event)
end
end
def handle_after_event(event)
- @logger.debug { log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) completed") }
+ @logger.debug { log_message("completed", event) }
@after_event_handlers.each do |hd|
hd.call(event)
end
end
def handle_error_event(event, error)
@logger.error do
- log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) failed with error\n" \
+ log_message("failed with error\n" \
"#{error.class}: #{error.message}\n" \
- "#{error.backtrace.join("\n")}")
+ "#{error.backtrace.join("\n")}", event)
end
@error_event_handlers.each do |hd|
hd.call(event, error)
end
end