Sha256: cbf3723291b3e092272e7a3c9933a5cfdd59d6cc5372708c9ec76b58c200de7e
Contents?: true
Size: 1.62 KB
Versions: 2
Compression:
Stored size: 1.62 KB
Contents
# frozen_string_literal: true

module PubSubModelSync
  # Hands a payload to every configured subscriber whose :from_klass and
  # :from_action settings match the payload's klass and action.
  #
  # NOTE(review): `log` and `config` are not defined in this class; presumably
  # they are inherited from PubSubModelSync::Base - confirm there.
  class MessageProcessor < PubSubModelSync::Base
    attr_accessor :payload

    # @param payload [Payload, Object] the payload to deliver; anything that is
    #   not a Payload is treated as raw data and wrapped in a new Payload
    # @param klass [Object, nil] only used when wrapping raw data
    # @param action [Object, nil] only used when wrapping raw data
    # @deprecated Calling new(data, klass, action) is deprecated; pass a Payload.
    def initialize(payload, klass = nil, action = nil)
      @payload = payload
      unless payload.is_a?(Payload)
        # Legacy call style: warn, then wrap the raw data in a Payload.
        log('Deprecated: Use Payload instead of new(data, klass, action)')
        @payload = PubSubModelSync::Payload.new(payload, { klass: klass, action: action })
      end
    end

    # Runs every matching subscriber against the payload, one after another.
    # @return [Array] the subscribers that matched the payload
    def process
      filter_subscribers.each { |subscriber| run_subscriber(subscriber) }
    end

    private

    # Processes the payload with a single subscriber, then fires the success
    # callback and logs. Any StandardError raised along the way (including by
    # the before/success callbacks) is routed to #print_subscriber_error
    # instead of propagating, so the remaining subscribers still run.
    # @param subscriber [Object] an entry of config.subscribers
    def run_subscriber(subscriber)
      if processable?(subscriber)
        subscriber.process!(payload)
        config.on_success_processing.call(payload, subscriber)
        log "processed message with: #{payload}"
      end
    rescue StandardError => e
      print_subscriber_error(e)
    end

    # Asks the on_before_processing callback whether this subscriber may run.
    # The callback vetoes processing by returning :cancel; the veto is logged
    # only when config.debug is truthy.
    # @param subscriber [Object] an entry of config.subscribers
    # @return [Boolean] false when the callback returned :cancel, true otherwise
    def processable?(subscriber)
      verdict = config.on_before_processing.call(payload, subscriber)
      return true unless verdict == :cancel

      log("process message cancelled: #{payload}") if config.debug
      false
    end

    # Reports a processing failure to the on_error_processing callback and logs
    # it at :error level unless the callback answers :skip_log.
    # @param error [StandardError] the exception raised while processing
    def print_subscriber_error(error)
      # Captured before the callback runs, matching the original evaluation order.
      details = [payload, error.message, error.backtrace]
      outcome = config.on_error_processing.call(error, payload)
      if outcome != :skip_log
        log("Error processing message: #{details}", :error)
      end
    end

    # Selects the subscribers registered for this payload. Both sides of each
    # comparison are stringified before being compared.
    # @return [Array] matching entries of config.subscribers
    def filter_subscribers
      config.subscribers.select do |candidate|
        same_klass = candidate.settings[:from_klass].to_s == payload.klass.to_s
        same_klass && candidate.settings[:from_action].to_s == payload.action.to_s
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
pub_sub_model_sync-0.5.1.1 | lib/pub_sub_model_sync/message_processor.rb |
pub_sub_model_sync-0.5.1 | lib/pub_sub_model_sync/message_processor.rb |