Sha256: 33e58398caa8925b209ef128a64da757d384fc118f2ea3c58d907756871a2519

Contents?: true

Size: 1.91 KB

Versions: 3

Compression:

Stored size: 1.91 KB

Contents

# frozen_string_literal: true

module PubSubModelSync
  class MessageProcessor < PubSubModelSync::Base
    attr_accessor :payload

    # @param payload (Payload): payload to be delivered
    # @Deprecated: def initialize(data, klass, action)
    def initialize(payload, klass = nil, action = nil)
      @payload = payload
      return if @payload.is_a?(Payload)

      # support for deprecated
      log('Deprecated: Use Payload instead of new(data, klass, action)')
      @payload = PubSubModelSync::Payload.new(payload, { klass: klass, action: action })
    end

    def process!
      filter_subscribers.each(&method(:run_subscriber))
    end

    def process
      process!
    rescue => e
      notify_error(e)
    end

    private

    def run_subscriber(subscriber)
      processor = PubSubModelSync::RunSubscriber.new(subscriber, payload)
      return unless processable?(subscriber)

      errors = [ActiveRecord::ConnectionTimeoutError, 'deadlock detected', 'could not serialize access']
      retry_error(errors, qty: 5) do
        processor.call
        res = config.on_success_processing.call(payload, { subscriber: subscriber })
        log "processed message with: #{payload.inspect}" if res != :skip_log
      end
    end

    def processable?(subscriber)
      cancel = config.on_before_processing.call(payload, { subscriber: subscriber }) == :cancel
      log("process message cancelled: #{payload}") if cancel && config.debug
      !cancel
    end

    # @param error (Error)
    def notify_error(error)
      info = [payload, error.message, error.backtrace]
      res = config.on_error_processing.call(error, { payload: payload })
      log("Error processing message: #{info}", :error) if res != :skip_log
    end

    def filter_subscribers
      config.subscribers.select do |subscriber|
        subscriber.from_klass == payload.klass && subscriber.action == payload.action && payload.mode == subscriber.mode
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
pub_sub_model_sync-1.0.beta2 lib/pub_sub_model_sync/message_processor.rb
pub_sub_model_sync-1.0.beta1 lib/pub_sub_model_sync/message_processor.rb
pub_sub_model_sync-1.0.beta lib/pub_sub_model_sync/message_processor.rb