Sha256: 33e58398caa8925b209ef128a64da757d384fc118f2ea3c58d907756871a2519
Contents?: true
Size: 1.91 KB
Versions: 3
Compression:
Stored size: 1.91 KB
Contents
# frozen_string_literal: true module PubSubModelSync class MessageProcessor < PubSubModelSync::Base attr_accessor :payload # @param payload (Payload): payload to be delivered # @Deprecated: def initialize(data, klass, action) def initialize(payload, klass = nil, action = nil) @payload = payload return if @payload.is_a?(Payload) # support for deprecated log('Deprecated: Use Payload instead of new(data, klass, action)') @payload = PubSubModelSync::Payload.new(payload, { klass: klass, action: action }) end def process! filter_subscribers.each(&method(:run_subscriber)) end def process process! rescue => e notify_error(e) end private def run_subscriber(subscriber) processor = PubSubModelSync::RunSubscriber.new(subscriber, payload) return unless processable?(subscriber) errors = [ActiveRecord::ConnectionTimeoutError, 'deadlock detected', 'could not serialize access'] retry_error(errors, qty: 5) do processor.call res = config.on_success_processing.call(payload, { subscriber: subscriber }) log "processed message with: #{payload.inspect}" if res != :skip_log end end def processable?(subscriber) cancel = config.on_before_processing.call(payload, { subscriber: subscriber }) == :cancel log("process message cancelled: #{payload}") if cancel && config.debug !cancel end # @param error (Error) def notify_error(error) info = [payload, error.message, error.backtrace] res = config.on_error_processing.call(error, { payload: payload }) log("Error processing message: #{info}", :error) if res != :skip_log end def filter_subscribers config.subscribers.select do |subscriber| subscriber.from_klass == payload.klass && subscriber.action == payload.action && payload.mode == subscriber.mode end end end end
Version data entries
3 entries across 3 versions & 1 rubygems