Sha256: 5067f29881c66cd1d6ca275750a24a66d59754897cef604cfaf76efb0825f178

Contents?: true

Size: 1.76 KB

Versions: 1

Compression:

Stored size: 1.76 KB

Contents

require 'ffi-rzmq'
require 'zmqmachine'
require 'uri'
require 'dripdrop/message'

class DripDrop
  class CollectorPub
    attr_reader :context, :socket, :address
    
    def initialize(address)
      @address = address
      @context = ZMQ::Context.new(1)
      @socket  = @context.socket(ZMQ::PUB)
      @socket.bind(@address.to_s)
    end
    
    def send_message(message)
      @socket.send(message)
    end
  end
   
  class CollectorSub
    attr_reader :context, :socket, :address, :collector
    
    def initialize(context, collector, address, publisher=nil)
      @context   = context
      @collector = collector
      @address   = address
      @publisher = publisher
    end
    
    def on_attach(socket)
      socket.connect(@address)
      socket.subscribe ''
    end
    
    def on_readable(socket, messages)
      messages.each do |message|
        @collector.on_recv(DripDrop::Message.parse(message.copy_out_string))
      end
    end
  end
  
  class Collector
    attr_reader :sub_reactor, :sub_addr, :pub_addr
    def initialize(sub_addr='tcp://127.0.0.1:2900',pub_addr=nil)
      sub_addr_uri = URI.parse(sub_addr)
      host, port   = sub_addr_uri.host, sub_addr_uri.port.to_i
      scheme       = sub_addr_uri.scheme.to_sym
      @sub_addr    = ZM::Address.new(host, port, scheme)
      @sub_reactor = ZM::Reactor.new(:sub_reactor)
      
      if @pub_addr
        @pub_addr  = URI.parse(pub_addr)
        @publisher = CollectorPub.new(@pub_addr)
      end
    end

    def run
      puts "Run"
      @sub_reactor.run do |context|
        context.sub_socket CollectorSub.new(context,self,@sub_addr,@publisher)
      end
      @sub_reactor
    end

    def publish(message)
      @publisher.send_string(message.encoded)
    end
     
    def on_recv(message); end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
dripdrop-0.0.2 lib/dripdrop/collector.rb