Sha256: 367690ca41011797ef8ca995d96ac7c6ad41ddd1cb01d63208e12316689f94de

Contents?: true

Size: 1.67 KB

Versions: 1

Compression:

Stored size: 1.67 KB

Contents

module Aggro
  module ZeroMQTransport
    # Public: Handles subscribing to messages on a given endpoint.
    class Subscriber
      class SubscriberAlreadyRunning < RuntimeError; end

      def initialize(endpoint, callable = nil, &block)
        fail ArgumentError unless callable || block_given?

        @callable = block_given? ? block : callable
        @endpoint = endpoint
        @mutex = Mutex.new
      end

      def add_subscription(topic)
        start unless @mutex.synchronize { @running }

        @mutex.synchronize { sub_socket.setsockopt ZMQ::SUBSCRIBE, topic }

        self
      end

      def start
        @mutex.synchronize do
          return self if @running

          sub_socket
          start_on_thread

          sleep 0.01 until @running
        end

        self
      end

      def stop
        @mutex.synchronize do
          return self unless @running

          @running = false
        end

        self
      end

      private

      def handle_message
        message = ''
        sub_socket.recv_string message

        @callable.call message if message.present?
      end

      def sub_socket
        @sub_socket ||= begin
          socket = ZeroMQTransport.context.socket(ZMQ::SUB)
          socket.connect @endpoint

          socket
        end
      end

      def start_on_thread
        Concurrent::SingleThreadExecutor.new.post do
          poller = ZeroMQ::Poller.new
          poller.register_readable sub_socket

          @running = true

          (handle_message while poller.poll(1) > 0) while @running

          poller.deregister_readable sub_socket

          sub_socket.close
          @sub_socket = nil
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
aggro-0.0.4 lib/aggro/zeromq_transport/subscriber.rb