Sha256: 7f66b5f8e6ab973230c833508b77cdb91b1f6225fac279166a88786ec4378609
Contents?: true
Size: 1.74 KB
Versions: 2
Compression:
Stored size: 1.74 KB
Contents
require 'aggro/nanomsg_transport/subscribe' module Aggro module NanomsgTransport # Public: Handles subscribing to messages on a given endpoint. class Subscriber class SubscriberAlreadyRunning < RuntimeError; end def initialize(endpoint, callable = nil, &block) fail ArgumentError unless callable || block_given? @callable = block_given? ? block : callable @endpoint = endpoint @mutex = Mutex.new @selector = NIO::Selector.new ObjectSpace.define_finalizer self, method(:stop) end def add_subscription(topic) start unless @running @mutex.synchronize do sub_socket.add_subscription(topic) end self end def start @mutex.synchronize do return self if @running @running = true start_on_thread sleep 0.01 while @selector.empty? end self end def stop @mutex.synchronize do return self unless @running @running = false @selector.wakeup sleep 0.01 until @selector.empty? end self end private def handle_message message = sub_socket.recv_msg @callable.call(message) if message end def sub_socket @sub_socket ||= Subscribe.new(@endpoint) end def start_on_thread Concurrent::SingleThreadExecutor.new.post do io = IO.new(sub_socket.recv_fd, 'rb', autoclose: false) @selector.register io, :r @selector.select { handle_message } while @running @selector.deregister io io.close sub_socket.terminate @sub_socket = nil end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
aggro-0.0.3 | lib/aggro/nanomsg_transport/subscriber.rb |
aggro-0.0.2 | lib/aggro/nanomsg_transport/subscriber.rb |