# encoding: utf-8

require 'securerandom'

require 'zoomq/jeromq-0.3.0-20130721.175323-20.jar'

java_import org.zeromq.ZMQ
java_import org.zeromq.ZContext
java_import org.zeromq.ZMsg
java_import org.zeromq.ZFrame

require 'zoomq/zookeeper'

module ZooMQ
  # Raised by Client#handle when there is no server to send the request to,
  # or when no reply could be read from the socket.
  class ServerUnavailable < StandardError; end

  # Request/reply client that reads the list of servers for a service from
  # ZooKeeper and rotates requests across them over a single ROUTER socket.
  #
  # NOTE(review): `refresh` is invoked from the ZooKeeper watch block while
  # `handle` uses the same socket; if the watch fires on another thread the
  # socket is shared across threads — confirm against Zookeeper#watch.
  class Client
    # Pause after each new connect before the endpoint is used.
    # NOTE(review): presumably gives the connection time to be established
    # before the first routed send — confirm.
    CONNECT_SETTLE_SECONDS = 0.1

    # Servers returned by the most recent refresh.
    attr_reader :servers

    # service_name  - appended to zookeeper_uri to form the ZooKeeper path.
    # zookeeper_uri - base URI handed to Zookeeper.new.
    # log           - logger object; kept for later use, never called here.
    def initialize(service_name, zookeeper_uri, log)
      @log = log
      # Endpoints this socket has already connected to (server => true).
      @connected = {}
      # Initialise before `watch` so an early callback sees valid state.
      @servers = []
      @cycle = @servers.cycle

      @zk = Zookeeper.new("#{zookeeper_uri}/#{service_name}")
      @ctx = ZContext.new
      @socket = @ctx.create_socket(ZMQ::ROUTER)
      @socket.identity = SecureRandom.uuid.to_java_bytes

      watch
      refresh
    end

    # Registers a block with ZooKeeper that re-runs `refresh`.
    def watch
      @zk.watch { refresh }
    end

    # Reloads the server list from ZooKeeper, connects the socket to servers
    # it has not connected to before, and restarts the rotation.
    def refresh
      current = @zk.servers

      current.each do |server|
        # Connecting again to a known endpoint would open a redundant
        # connection and add a needless sleep on every refresh.
        next if @connected[server]

        @socket.connect("tcp://#{server}")
        @connected[server] = true
        sleep(CONNECT_SETTLE_SECONDS)
      end

      @servers = current
      @cycle = current.cycle
    end

    # Sends `request` to the next server in the rotation and returns the
    # parsed reply.
    #
    # The outgoing message carries request.class.to_s and request.to_s,
    # wrapped with the server address as the routing frame. The reply is
    # expected to carry a class name followed by a payload; the result is
    # whatever `<class>.parse(payload)` returns.
    #
    # Raises ServerUnavailable when no servers are known or no reply message
    # could be read. Raises NameError when the reply names an unknown class.
    #
    # NOTE(review): the receive has no timeout here, so this call blocks
    # until a reply arrives.
    def handle(request)
      if @servers.empty?
        raise ServerUnavailable.new("no servers are online, please try again later")
      end

      begin
        server = @cycle.next
      rescue StopIteration
        # The rotation can be swapped for an empty one by a refresh that
        # runs between the emptiness check and this call.
        raise ServerUnavailable.new("no servers are online, please try again later")
      end

      msg = ZMsg.new_string_msg(request.class.to_s, request.to_s)
      msg.wrap(ZFrame.new(server))
      msg.send(@socket)

      reply = ZMsg.recvMsg(@socket)
      if reply.nil?
        raise ServerUnavailable.new("no reply received from #{server}")
      end

      # Drop the routing envelope added by the ROUTER socket.
      reply.unwrap
      cls = resolve_class(String.from_java_bytes(reply.pop.data))
      cls.parse(String.from_java_bytes(reply.pop.data))
    end

    private

    # Resolves a "Foo::Bar"-style name to the constant it names using only
    # the standard library, so no String#constantize extension is needed.
    #
    # SECURITY: the name comes from the network reply; only connect to
    # trusted servers, or restrict the names accepted here.
    def resolve_class(name)
      name.split('::').reject { |part| part.empty? }.inject(Object) do |namespace, part|
        namespace.const_get(part)
      end
    end
  end
end