Sha256: 385560fa7cd5954fb8500e1670d73a2fa02fe35b5c480d0dbb3aec1869f607fc
Contents?: true
Size: 1.93 KB
Versions: 3
Compression:
Stored size: 1.93 KB
Contents
# frozen_string_literal: true require 'cyclone_lariat/repo/messages' require 'cyclone_lariat/core' require 'luna_park/errors' require 'cyclone_lariat/messages/builder' require 'json' module CycloneLariat class Middleware attr_reader :config def initialize(errors_notifier: nil, message_notifier: nil, repo: Repo::Messages, **options) @config = CycloneLariat::Options.wrap(options).merge!(CycloneLariat.config) @events_repo = repo.new(**@config.to_h) @message_notifier = message_notifier @errors_notifier = errors_notifier end def call(_worker_instance, queue, _sqs_msg, body, &block) msg = receive_message!(body) message_notifier&.info 'Receive message', message: msg, queue: queue return if msg.is_a? String catch_standard_error(queue, msg) do event = Messages::Builder.new(raw_message: msg).call store_in_dataset(event) do catch_business_error(event, &block) end end end private attr_reader :errors_notifier, :message_notifier, :events_repo def receive_message!(body) body[:Message] ? JSON.parse(body[:Message], symbolize_names: true) : body rescue JSON::ParserError => e errors_notifier&.error(e, message: body[:Message]) body[:Message] end def store_in_dataset(event) return yield if events_repo.disabled? existed = events_repo.find(uuid: event.uuid) return true if existed&.processed? events_repo.create(event) unless existed yield events_repo.processed! uuid: event.uuid, error: event.client_error end def catch_business_error(event) yield rescue LunaPark::Errors::Business => e errors_notifier&.error(e, event: event) event.client_error = e end def catch_standard_error(queue, msg) yield rescue Exception => e errors_notifier&.error(e, queue: queue, message: msg) raise e end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
cyclone_lariat-1.0.0.rc5 | lib/cyclone_lariat/middleware.rb |
cyclone_lariat-1.0.0.rc4 | lib/cyclone_lariat/middleware.rb |
cyclone_lariat-1.0.0.rc3 | lib/cyclone_lariat/middleware.rb |