lib/cyclone_lariat/middleware.rb in cyclone_lariat-1.0.0.rc7 vs lib/cyclone_lariat/middleware.rb in cyclone_lariat-1.0.0.rc8

- old
+ new

@@ -8,15 +8,16 @@ module CycloneLariat class Middleware attr_reader :config - def initialize(errors_notifier: nil, message_notifier: nil, repo: Repo::InboxMessages, **options) + def initialize(errors_notifier: nil, message_notifier: nil, before_save: nil, repo: Repo::InboxMessages, **options) @config = CycloneLariat::Options.wrap(options).merge!(CycloneLariat.config) @events_repo = repo.new(**@config.to_h) @message_notifier = message_notifier @errors_notifier = errors_notifier + @before_save = before_save end def call(_worker_instance, queue, _sqs_msg, body, &block) msg = receive_message!(body) @@ -32,11 +33,11 @@ end end private - attr_reader :errors_notifier, :message_notifier, :events_repo + attr_reader :errors_notifier, :message_notifier, :events_repo, :before_save def receive_message!(body) body[:Message] ? JSON.parse(body[:Message], symbolize_names: true) : body rescue JSON::ParserError => e errors_notifier&.error(e, message: body[:Message]) @@ -46,13 +47,19 @@ def store_in_dataset(event) return yield if events_repo.disabled? existed = events_repo.find(uuid: event.uuid) return true if existed&.processed? + return yield if existed - events_repo.create(event) unless existed + event.clone.tap do |e| + before_save.call(e) if before_save + events_repo.create(e) + end + yield - events_repo.processed! uuid: event.uuid, error: event.client_error + + events_repo.processed!(uuid: event.uuid, error: event.client_error) end def catch_business_error(event) yield rescue LunaPark::Errors::Business => e