Sha256: e80991d7f6ea0723f1244cf6084889cb89399406b53692b3bed02b86e8488b0c

Contents?: true

Size: 1.29 KB

Versions: 2

Compression:

Stored size: 1.29 KB

Contents

# encoding: utf-8

require 'securerandom'

require 'zoomq/jeromq-0.3.0-20130721.175323-20.jar'
java_import org.zeromq.ZMQ
java_import org.zeromq.ZContext
java_import org.zeromq.ZMsg
java_import org.zeromq.ZFrame

require 'zoomq/zookeeper'

module ZooMQ
  # Raised by Client#handle when a request cannot be serviced: either no
  # servers are currently known, or no reply message was received.
  class ServerUnavailable < StandardError; end

  # Request/reply client that reads its server list from ZooKeeper and talks
  # to those servers over a single ZeroMQ ROUTER socket, rotating through the
  # known servers one request at a time.
  class Client
    # Pause (in seconds) after each new connect.
    # NOTE(review): presumably this gives the connection time to establish
    # before the first message is routed to it -- confirm.
    CONNECT_SETTLE_SECONDS = 0.1

    # The server addresses most recently read from ZooKeeper.
    attr_reader :servers

    # Creates the ZooKeeper handle and the ROUTER socket, registers the
    # watch callback and performs the initial server discovery.
    #
    # @param service_name [String] appended to zookeeper_uri to form the path
    #   handed to Zookeeper.new
    # @param zookeeper_uri [String] base ZooKeeper URI
    # @param log [Object] accepted for interface compatibility; this class
    #   does not currently use it
    def initialize(service_name, zookeeper_uri, log)
      @zk = Zookeeper.new("#{zookeeper_uri}/#{service_name}")
      @ctx = ZContext.new
      @socket = @ctx.create_socket(ZMQ::ROUTER)
      @socket.identity = SecureRandom.uuid.to_java_bytes
      # Endpoints already connected on @socket; set before `watch` so a
      # callback that fires immediately finds it initialised.
      @connected = {}
      watch
      refresh
    end

    # Registers `refresh` as the block passed to the ZooKeeper wrapper's
    # `watch`.
    def watch
      @zk.watch { refresh }
    end

    # Re-reads the server list from ZooKeeper, connects the socket to any
    # server it is not yet connected to, and restarts the rotation.
    #
    # Endpoints that were connected by an earlier refresh are skipped, so a
    # membership change only pays the connect + settle delay for the servers
    # that are actually new. Servers that disappear are not disconnected
    # (same as before); they simply drop out of the rotation.
    #
    # @return [Enumerator] the new rotation over the server list
    def refresh
      servers = @zk.servers
      @connected ||= {}

      servers.each do |server|
        next if @connected.key?(server)

        @socket.connect("tcp://#{server}")
        @connected[server] = true
        sleep(CONNECT_SETTLE_SECONDS)
      end

      # Publish the list and its rotation together, and only after the
      # connects, so `handle` never pairs a new list with a stale rotation.
      @servers = servers
      @cycle = servers.cycle
    end

    # Sends `request` to the next server in the rotation and returns the
    # parsed reply.
    #
    # The outgoing message carries two string frames (the request's class
    # name and `request.to_s`) wrapped with the chosen server's address as
    # the leading frame.
    # NOTE(review): presumably each server uses its address as its socket
    # identity so the ROUTER socket can route on that frame -- confirm.
    #
    # The receive call has no timeout here, so this method blocks until a
    # message arrives on the socket.
    #
    # @param request [Object] its class name and `to_s` form are sent
    # @return [Object] whatever `parse` returns on the class named in the reply
    # @raise [ServerUnavailable] if no servers are known or no reply message
    #   was received
    def handle(request)
      if @servers.empty?
        raise ServerUnavailable, "no servers are online, please try again later"
      end

      server = @cycle.next

      outgoing = ZMsg.new_string_msg(request.class.to_s, request.to_s)
      outgoing.wrap(ZFrame.new(server))
      outgoing.send(@socket)

      reply = ZMsg.recvMsg(@socket)
      # JeroMQ documents recvMsg as returning null when the receive is
      # interrupted; surface that as a domain error rather than NoMethodError.
      raise ServerUnavailable, "no reply received from #{server}" if reply.nil?

      reply.unwrap
      # SECURITY NOTE(review): the class name comes from the remote peer and
      # is resolved with `constantize`, so a peer can pick any loaded constant
      # that responds to `parse`. Consider restricting to an allow-list.
      # `constantize` is not core Ruby (ActiveSupport provides it) -- assumed
      # to be loaded elsewhere.
      cls = String.from_java_bytes(reply.pop.data).constantize
      cls.parse(String.from_java_bytes(reply.pop.data))
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
zoomq-0.1.1 lib/zoomq/client.rb
zoomq-0.1.0 lib/zoomq/client.rb