Sha256: 367690ca41011797ef8ca995d96ac7c6ad41ddd1cb01d63208e12316689f94de
Contents?: true
Size: 1.67 KB
Versions: 1
Compression:
Stored size: 1.67 KB
Contents
module Aggro module ZeroMQTransport # Public: Handles subscribing to messages on a given endpoint. class Subscriber class SubscriberAlreadyRunning < RuntimeError; end def initialize(endpoint, callable = nil, &block) fail ArgumentError unless callable || block_given? @callable = block_given? ? block : callable @endpoint = endpoint @mutex = Mutex.new end def add_subscription(topic) start unless @mutex.synchronize { @running } @mutex.synchronize { sub_socket.setsockopt ZMQ::SUBSCRIBE, topic } self end def start @mutex.synchronize do return self if @running sub_socket start_on_thread sleep 0.01 until @running end self end def stop @mutex.synchronize do return self unless @running @running = false end self end private def handle_message message = '' sub_socket.recv_string message @callable.call message if message.present? end def sub_socket @sub_socket ||= begin socket = ZeroMQTransport.context.socket(ZMQ::SUB) socket.connect @endpoint socket end end def start_on_thread Concurrent::SingleThreadExecutor.new.post do poller = ZeroMQ::Poller.new poller.register_readable sub_socket @running = true (handle_message while poller.poll(1) > 0) while @running poller.deregister_readable sub_socket sub_socket.close @sub_socket = nil end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
aggro-0.0.4 | lib/aggro/zeromq_transport/subscriber.rb |