# encoding: utf-8 require 'zmachine' require 'zoomq/zookeeper' require 'zoomq/client/connection' java_import org.zeromq.ZFrame java_import java.util.concurrent.atomic.AtomicInteger module ZooMQ class ServerUnavailableError < StandardError; end class TimeoutError < StandardError; end class Client attr_reader :servers def initialize @requests = {} @request_id = AtomicInteger.new @zk = Zookeeper.new(service_name) @channel = ZMachine.connect(nil, ZMQ::ROUTER, Connection) do |connection| connection.client = self end watch refresh end def watch @zk.watch { refresh } end def refresh @servers = @zk.servers @servers.each do |server| $log.debug("#{service_name}:connect", server: server) @channel.connect("tcp://#{server}") sleep(0.1) end @cycle = @servers.cycle end def request(obj) if @servers.empty? raise ServerUnavailableError.new("no servers are online, please try again later") end server = @cycle.next deferred = ZMachine::DefaultDeferrable.new deferred.callback { |result| yield result } if block_given? request_id = @request_id.increment_and_get msg = ZMsg.new_string_msg(obj.class.to_s, obj.to_s) msg.wrap(ZFrame.new(request_id.to_s)) msg.wrap(ZFrame.new(server)) $log.debug("#{service_name}:request", id: request_id, obj: obj.inspect) @channel.send_data(msg) @requests[request_id] = deferred end def response(request_id, obj) $log.debug("#{service_name}:response", id: request_id, obj: obj.inspect) @requests[request_id].succeed(obj) end end end