Sha256: f36440c61a9385f9627c6b66b57649685b0d526a09e022906108bb33398f6c8a

Contents?: true

Size: 1.78 KB

Versions: 1

Compression:

Stored size: 1.78 KB

Contents

# frozen_string_literal: true

require_relative 'messages_repo'
require 'luna_park/errors'
require 'json'

module CycloneLariat
  class Middleware
    def initialize(dataset: nil, errors_notifier: nil, message_notifier: nil, repo: MessagesRepo)
      @events_repo      = repo.new(dataset) if dataset
      @message_notifier = message_notifier
      @errors_notifier  = errors_notifier
    end

    def call(_worker_instance, queue, _sqs_msg, body, &block)
      log_received_message queue, body

      catch_standard_error(queue, body) do
        return true unless check(body[:Message])

        event = Event.wrap(JSON.parse(body[:Message]))

        catch_business_error(event) do
          store_in_dataset(event, &block)
        end
      end
    end

    private

    attr_reader :errors_notifier, :message_notifier, :events_repo

    def log_received_message(queue, body)
      message_notifier&.info 'Receive message', queue: queue, aws_message_id: body[:MessageId], message: body[:Message]
    end

    def store_in_dataset(event)
      return yield if events_repo.nil?
      return true  if events_repo.exists?(uuid: event.uuid)

      events_repo.create(event)
      yield
      events_repo.processed! uuid: event.uuid, error: event.client_error
    end

    def catch_business_error(event)
      yield
    rescue LunaPark::Errors::Business => e
      errors_notifier&.error(e, event: event)
      event.client_error = e
    end

    def catch_standard_error(queue, body)
      yield
    rescue StandardError => e
      errors_notifier&.error(e, queue: queue, aws_message_id: body[:MessageId], message: body[:Message])
      raise e
    end

    def check(msg)
      if msg.nil? || msg.empty?
        errors_notifier&.error(Errors::EmptyMessage.new)
        false
      else
        true
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
cyclone_lariat-0.3.0 lib/cyclone_lariat/middleware.rb