Sha256: 48a23ffaa9ec0808217501851d0513f651859f13ded9812caf0ffeb0c5278149

Contents?: true

Size: 1.8 KB

Versions: 3

Compression:

Stored size: 1.8 KB

Contents

# frozen_string_literal: true

module PubSubModelSync
  # Routes a payload to every configured subscriber whose :from_klass and
  # :from_action settings match the payload's klass and action.
  #
  # NOTE(review): `config`, `log` and `retry_error` are not defined in this
  # file; presumably they are inherited from PubSubModelSync::Base — confirm.
  class MessageProcessor < PubSubModelSync::Base
    attr_accessor :payload

    # Builds a processor for the given payload.
    #
    # The legacy call shape +new(data, klass, action)+ is still accepted: when
    # the first argument is not a Payload, a deprecation message is logged and
    # the arguments are wrapped into a new Payload.
    #
    # @param payload [Payload, Object] payload to be delivered (or raw data in the legacy form)
    # @param klass [Object, nil] legacy only: forwarded as the :klass attribute
    # @param action [Object, nil] legacy only: forwarded as the :action attribute
    def initialize(payload, klass = nil, action = nil)
      @payload = payload
      unless payload.is_a?(Payload)
        # support for deprecated signature
        log('Deprecated: Use Payload instead of new(data, klass, action)')
        @payload = PubSubModelSync::Payload.new(payload, { klass: klass, action: action })
      end
    end

    # Runs every matching subscriber; any error raised propagates to the caller.
    def process!
      filter_subscribers.each { |subscriber| run_subscriber(subscriber) }
    end

    # Same as #process!, but a StandardError is handed to #notify_error
    # instead of being raised.
    def process
      process!
    rescue StandardError => error
      notify_error(error)
    end

    private

    # Delivers the payload to one subscriber unless the before-processing
    # callback cancelled it, then fires the success callback and logs the
    # result unless that callback answered :skip_log.
    #
    # The success callback and the log call sit inside the retry_error block
    # together with the subscriber call.
    # NOTE(review): presumably retry_error re-runs the block on
    # ActiveRecord::ConnectionTimeoutError, bounded by qty — confirm in Base.
    #
    # @param subscriber [Object] subscriber responding to #process!
    def run_subscriber(subscriber)
      return unless processable?(subscriber)

      retry_error(ActiveRecord::ConnectionTimeoutError, qty: 2) do
        subscriber.process!(payload)
        outcome = config.on_success_processing.call(payload, { subscriber: subscriber })
        log("processed message with: #{payload.inspect}") unless outcome == :skip_log
      end
    end

    # Asks the before-processing callback whether the subscriber may run.
    # A callback result of :cancel vetoes processing; the cancellation is
    # logged only when config.debug is truthy.
    #
    # @param subscriber [Object]
    # @return [Boolean] false when the callback returned :cancel, true otherwise
    def processable?(subscriber)
      verdict = config.on_before_processing.call(payload, { subscriber: subscriber })
      return true unless verdict == :cancel

      log("process message cancelled: #{payload}") if config.debug
      false
    end

    # Reports a processing failure through the error callback and logs it at
    # :error level unless the callback answered :skip_log.
    #
    # @param error [StandardError]
    def notify_error(error)
      # Collected before the callback runs, matching the original ordering.
      details = [payload, error.message, error.backtrace]
      outcome = config.on_error_processing.call(error, { payload: payload })
      return if outcome == :skip_log

      log("Error processing message: #{details}", :error)
    end

    # Selects the subscribers registered for this payload's klass and action.
    # Both sides are compared as strings, so symbol and string settings match alike.
    #
    # @return [Array] matching entries of config.subscribers
    def filter_subscribers
      config.subscribers.select do |subscriber|
        settings = subscriber.settings
        settings[:from_klass].to_s == payload.klass.to_s &&
          settings[:from_action].to_s == payload.action.to_s
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
pub_sub_model_sync-0.5.10 lib/pub_sub_model_sync/message_processor.rb
pub_sub_model_sync-0.5.9.1 lib/pub_sub_model_sync/message_processor.rb
pub_sub_model_sync-0.5.9 lib/pub_sub_model_sync/message_processor.rb