lib/cyclone_lariat/middleware.rb in cyclone_lariat-1.0.0.rc7 vs lib/cyclone_lariat/middleware.rb in cyclone_lariat-1.0.0.rc8
- old
+ new
@@ -8,15 +8,16 @@
module CycloneLariat
class Middleware
attr_reader :config
- def initialize(errors_notifier: nil, message_notifier: nil, repo: Repo::InboxMessages, **options)
+ def initialize(errors_notifier: nil, message_notifier: nil, before_save: nil, repo: Repo::InboxMessages, **options)
@config = CycloneLariat::Options.wrap(options).merge!(CycloneLariat.config)
@events_repo = repo.new(**@config.to_h)
@message_notifier = message_notifier
@errors_notifier = errors_notifier
+ @before_save = before_save
end
def call(_worker_instance, queue, _sqs_msg, body, &block)
msg = receive_message!(body)
@@ -32,11 +33,11 @@
end
end
private
- attr_reader :errors_notifier, :message_notifier, :events_repo
+ attr_reader :errors_notifier, :message_notifier, :events_repo, :before_save
def receive_message!(body)
body[:Message] ? JSON.parse(body[:Message], symbolize_names: true) : body
rescue JSON::ParserError => e
errors_notifier&.error(e, message: body[:Message])
@@ -46,13 +47,19 @@
def store_in_dataset(event)
return yield if events_repo.disabled?
existed = events_repo.find(uuid: event.uuid)
return true if existed&.processed?
+ return yield if existed
- events_repo.create(event) unless existed
+ event.clone.tap do |e|
+ before_save.call(e) if before_save
+ events_repo.create(e)
+ end
+
yield
- events_repo.processed! uuid: event.uuid, error: event.client_error
+
+ events_repo.processed!(uuid: event.uuid, error: event.client_error)
end
def catch_business_error(event)
yield
rescue LunaPark::Errors::Business => e