lib/tobox/fetcher.rb in tobox-0.1.0 vs lib/tobox/fetcher.rb in tobox-0.1.1
- old
+ new
@@ -2,11 +2,12 @@
require "json"
module Tobox
class Fetcher
- def initialize(configuration)
+ def initialize(label, configuration)
+ @label = label
@configuration = configuration
@logger = @configuration.default_logger
database_uri = @configuration[:database_uri]
@@ -54,11 +55,11 @@
if blk
num_events = events.size
events.each do |ev|
- ev[:metadata] = JSON.parse(ev[:metadata].to_s) if ev[:metadata]
+ ev[:metadata] = try_json_parse(ev[:metadata])
handle_before_event(ev)
yield(to_message(ev))
rescue StandardError => e
error = e
raise Sequel::Rollback
@@ -88,10 +89,14 @@
num_events
end
private
+ def log_message(msg)
+ "(worker: #{@label}) -> #{msg}"
+ end
+
def mark_as_error(event, error)
@ds.where(id: event[:id]).returning.update(
attempts: Sequel[@table][:attempts] + 1,
run_at: Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
seconds: event[:attempts] + (1**@exponential_retry_factor)),
@@ -103,34 +108,44 @@
def to_message(event)
{
id: event[:id],
type: event[:type],
- before: (JSON.parse(event[:data_before].to_s) if event[:data_before]),
- after: (JSON.parse(event[:data_after].to_s) if event[:data_after]),
+ before: try_json_parse(event[:data_before]),
+ after: try_json_parse(event[:data_after]),
at: event[:created_at]
}
end
+ def try_json_parse(data)
+ return unless data
+
+ data = JSON.parse(data.to_s) unless data.is_a?(Hash)
+
+ data
+ end
+
def handle_before_event(event)
- @logger.debug { "outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting..." }
+ @logger.debug do
+ log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) starting...")
+ end
@before_event_handlers.each do |hd|
hd.call(event)
end
end
def handle_after_event(event)
- @logger.debug { "outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) completed" }
+ @logger.debug { log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) completed") }
@after_event_handlers.each do |hd|
hd.call(event)
end
end
def handle_error_event(event, error)
@logger.error do
- "outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) failed with error\n" \
- "#{error.class}: #{error.message}\n" \
- "#{error.backtrace.join("\n")}"
+ log_message("outbox event (type: \"#{event[:type]}\", attempts: #{event[:attempts]}) failed with error\n" \
+ "#{error.class}: #{error.message}\n" \
+ "#{error.backtrace.join("\n")}")
end
@error_event_handlers.each do |hd|
hd.call(event, error)
end
end