Sha256: efbd1a0ef0991261b88eb05dc5ee3a847908a295e95d08286bd31d65f97ee56b

Contents?: true

Size: 886 Bytes

Versions: 1

Compression:

Stored size: 886 Bytes

Contents

module OmfCommon
  # A pubsub topic: an identifier paired with the communicator object used to
  # subscribe to it and to register message handlers on it.
  #
  # NOTE(review): the concrete type of +comm+ is not visible in this file. It
  # only needs to respond to +subscribe(id, &block)+ and
  # +pubsub_event(guard, &block)+ — confirm against the communicator class.
  class Topic
    attr_accessor :id, :comm

    # @param id [Object] topic identifier; compared with +event.node+ in {#on_message}
    # @param comm [Object] communicator that performs subscription and event registration
    def initialize(id, comm)
      self.id = id
      self.comm = comm
    end

    # Subscribes to this topic by delegating to +comm.subscribe+ with the
    # topic id and the given block.
    #
    # @return [Object] whatever +comm.subscribe+ returns
    def subscribe(&block)
      comm.subscribe(id, &block)
    end

    # Registers +message_block+ to be called with the parsed message of every
    # event that targets this topic and passes the optional guard.
    #
    # An event is accepted only when it has items, is not delayed, its node
    # equals this topic's id, its first item has a payload that
    # +Message.parse+ turns into a truthy value, and (when a guard is
    # supplied) the guard returns a truthy value for that parsed message.
    #
    # @param message_guard_proc [#call, nil] optional filter invoked with the
    #   parsed message; any object that does not respond to +call+ is ignored
    # @yield [message] the result of +Message.parse+ on the first item's payload
    # @return [Object] whatever +comm.pubsub_event+ returns
    # @raise [ArgumentError] if no block is given
    def on_message(message_guard_proc = nil, &message_block)
      # Fail at registration time rather than with NoMethodError on nil when
      # the first matching event arrives.
      raise ArgumentError, 'on_message requires a block' unless message_block

      guard = message_guard_proc if valid_guard?(message_guard_proc)

      event_block = proc do |event|
        message_block.call(Message.parse(event.items.first.payload))
      end

      guard_block = proc do |event|
        next false unless event.items? && !event.delayed?
        # Cheap identity check first, so payloads for other nodes are never parsed.
        next false unless event.node == id

        payload = event.items.first.payload
        omf_message = payload && Message.parse(payload)
        next false unless omf_message

        guard ? guard.call(omf_message) : true
      end

      comm.pubsub_event(guard_block, &event_block)
    end

    private

    # @param guard_proc [Object, nil] candidate guard
    # @return [Boolean] true when the candidate can be invoked with +call+
    def valid_guard?(guard_proc)
      guard_proc.respond_to?(:call)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
omf_common-6.0.0.pre.8 lib/omf_common/topic.rb