Sha256: f36440c61a9385f9627c6b66b57649685b0d526a09e022906108bb33398f6c8a
Contents?: true
Size: 1.78 KB
Versions: 1
Compression:
Stored size: 1.78 KB
Contents
# frozen_string_literal: true

require_relative 'messages_repo'
require 'luna_park/errors'
require 'json'

module CycloneLariat
  # Queue-worker middleware: logs every received message, optionally records
  # the wrapped event in a repository, and reports errors to a notifier.
  #
  # NOTE(review): `Event` and `Errors::EmptyMessage` are referenced here but
  # are not required by this file; they are assumed to be loaded elsewhere.
  class Middleware
    # @param dataset [Object, nil] handed to `repo.new`; when nil no repository
    #   is built and events are not persisted
    # @param errors_notifier [#error, nil] receives errors; optional
    # @param message_notifier [#info, nil] receives a record for every incoming
    #   message; optional
    # @param repo [Class] repository class instantiated with `dataset`
    def initialize(dataset: nil, errors_notifier: nil, message_notifier: nil, repo: MessagesRepo)
      @events_repo      = repo.new(dataset) if dataset
      @message_notifier = message_notifier
      @errors_notifier  = errors_notifier
    end

    # Middleware entry point.
    #
    # @param _worker_instance [Object] unused
    # @param queue [Object] queue identifier, only passed to the notifiers
    # @param _sqs_msg [Object] unused
    # @param body [Hash] message body; `:Message` (a JSON string) and
    #   `:MessageId` are read from it
    # @yield runs the wrapped worker for the received event
    # @return [Object] true when the message is empty (the worker is skipped),
    #   otherwise the result of #store_in_dataset
    # @raise [StandardError] any non-business error, after it has been reported
    def call(_worker_instance, queue, _sqs_msg, body, &block)
      log_received_message queue, body

      catch_standard_error(queue, body) do
        return true unless check(body[:Message])

        event = Event.wrap(JSON.parse(body[:Message]))

        # The business-error rescue must sit INSIDE the block yielded by
        # store_in_dataset. With the opposite nesting a business error unwinds
        # through store_in_dataset, skipping `processed!`, so the event is
        # never marked processed and its client_error is never saved.
        store_in_dataset(event) do
          catch_business_error(event, &block)
        end
      end
    end

    private

    attr_reader :errors_notifier, :message_notifier, :events_repo

    # Sends an info record about the received message to the message notifier,
    # if one is configured.
    def log_received_message(queue, body)
      message_notifier&.info 'Receive message', queue: queue, aws_message_id: body[:MessageId], message: body[:Message]
    end

    # Runs the given block around repository bookkeeping.
    #
    # Without a repository the block is simply executed. With one, an event
    # whose uuid already exists is skipped (returns true); otherwise the event
    # is created, the block is run, and `processed!` is called with the event's
    # uuid and its `client_error`.
    def store_in_dataset(event)
      return yield if events_repo.nil?
      return true if events_repo.exists?(uuid: event.uuid)

      events_repo.create(event)
      yield
      events_repo.processed! uuid: event.uuid, error: event.client_error
    end

    # Runs the block; a LunaPark business error is reported, stored on the
    # event as `client_error`, and NOT re-raised.
    def catch_business_error(event)
      yield
    rescue LunaPark::Errors::Business => e
      errors_notifier&.error(e, event: event)
      event.client_error = e
    end

    # Runs the block; any StandardError is reported together with the message
    # context and then re-raised to the caller.
    def catch_standard_error(queue, body)
      yield
    rescue StandardError => e
      errors_notifier&.error(e, queue: queue, aws_message_id: body[:MessageId], message: body[:Message])
      raise
    end

    # @return [Boolean] true when `msg` is present; otherwise reports an
    #   `Errors::EmptyMessage` to the errors notifier and returns false
    def check(msg)
      return true unless msg.nil? || msg.empty?

      errors_notifier&.error(Errors::EmptyMessage.new)
      false
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
cyclone_lariat-0.3.0 | lib/cyclone_lariat/middleware.rb |