Sha256: 0a43e178323ac2a4f8b85439d8f5b5491dac9eee558455d3b924412f12213e38

Contents?: true

Size: 998 Bytes

Versions: 1

Compression:

Stored size: 998 Bytes

Contents

require 'miu'
require 'miu/socket'
require 'miu/packet'

module Miu
  class Subscriber < Socket
    attr_reader :subscribe

    def initialize(options = {})
      options[:port] ||= Miu.default_pub_port
      super socket_type, options

      subscribe options[:subscribe] || ''
      yield self if block_given?
    end

    def subscribe(value = nil)
      if value
        unsubscribe if @subscribe
        @subscribe = value.to_s
        @socket.setsockopt ZMQ::SUBSCRIBE, @subscribe
      else
        @subscribe
      end
    end

    def unsubscribe
      if @subscribe
        @socket.setsockopt ZMQ::UNSUBSCRIBE, @subscribe
        @subscribe = nil
      end
      nil
    end

    def recv
      parts = []
      @socket.recv_strings parts
      Packet.load parts
    end

    def each
      if block_given?
        loop do
          packet = recv rescue nil
          yield packet if packet
        end
      end
    end

    private

    def socket_type
      ZMQ::SUB
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
miu-0.1.0 lib/miu/subscriber.rb