Sha256: e3459b9d958b85fda21ef411218ea7043a14555c39ce86d0638bee9edb8c2312
Contents?: true
Size: 1.02 KB
Versions: 3
Compression:
Stored size: 1.02 KB
Contents
module Aggro
  # Public: Subscribes to topics at a given endpoint.
  #
  # Raw payloads handed over by the transport are run through
  # MessageParser.parse. Parsed messages that are Message::Events are
  # forwarded to the callback as (id, events); anything else is ignored.
  class Subscriber
    # Internal: Name of the private method given to the transport as the
    # handler for raw payloads.
    RAW_HANDLER = :handle_raw

    # Public: Create a subscriber for an endpoint.
    #
    # endpoint - Endpoint passed through to Aggro.transport.subscriber.
    # callable - Optional object invoked via #call(id, events). Takes
    #            precedence over the block when both are supplied.
    # block    - Optional block used as the callback when no callable
    #            is given.
    #
    # Raises ArgumentError if neither a callable nor a block is given.
    def initialize(endpoint, callable = nil, &block)
      @callback = callable || block
      raise ArgumentError, 'a callable or a block is required' unless @callback

      @transport_sub = Aggro.transport.subscriber endpoint, method(RAW_HANDLER)
      @subscribed_topics = Set.new
    end

    # Public: Start the underlying transport subscriber.
    def bind
      @transport_sub.start
    end

    # Public: Dispatch a parsed message to the callback.
    #
    # message - A parsed message. Only Message::Events instances are
    #           delivered; every other message is dropped.
    #
    # Returns the callback's result, or nil when the message is ignored.
    def handle_message(message)
      return unless message.is_a? Message::Events

      @callback.call message.id, message.events
    end

    # Public: Stop the underlying transport subscriber.
    def stop
      @transport_sub.stop
    end

    # Public: Subscribe to a topic, once.
    #
    # topic - Topic name appended to Message::Events::TYPE_CODE to form
    #         the transport subscription prefix.
    #
    # Returns the result of the transport's add_subscription, or nil when
    # the topic is already subscribed.
    def subscribe_to_topic(topic)
      return if @subscribed_topics.include? topic

      # Record the topic only after the transport accepted it, so that a
      # failed attempt can be retried instead of being silently skipped.
      result = @transport_sub.add_subscription message_prefix_for_topic(topic)
      @subscribed_topics << topic
      result
    end

    private

    # Internal: Parse a raw transport payload and dispatch the result.
    def handle_raw(raw)
      handle_message MessageParser.parse raw
    end

    # Internal: Build the transport subscription prefix for a topic.
    def message_prefix_for_topic(topic)
      Message::Events::TYPE_CODE + topic
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
aggro-0.0.4 | lib/aggro/subscriber.rb |
aggro-0.0.3 | lib/aggro/subscriber.rb |
aggro-0.0.2 | lib/aggro/subscriber.rb |