lib/zoomq/client.rb in zoomq-0.1.1 vs lib/zoomq/client.rb in zoomq-0.2.0

- old
+ new

@@ -1,27 +1,30 @@ # encoding: utf-8 -require 'zoomq/jeromq-0.3.0-20130721.175323-20.jar' -java_import org.zeromq.ZMQ -java_import org.zeromq.ZContext -java_import org.zeromq.ZMsg -java_import org.zeromq.ZFrame +require 'zmachine' require 'zoomq/zookeeper' +require 'zoomq/client/connection' +java_import org.zeromq.ZFrame +java_import java.util.concurrent.atomic.AtomicInteger + module ZooMQ - class ServerUnavailable < StandardError; end + class ServerUnavailableError < StandardError; end + class TimeoutError < StandardError; end class Client attr_reader :servers - def initialize(service_name, zookeeper_uri, log) - @zk = Zookeeper.new("#{zookeeper_uri}/#{service_name}") - @ctx = ZContext.new - @socket = @ctx.create_socket(ZMQ::ROUTER) - @socket.identity = SecureRandom.uuid.to_java_bytes + def initialize + @requests = {} + @request_id = AtomicInteger.new + @zk = Zookeeper.new(service_name) + @channel = ZMachine.connect(nil, ZMQ::ROUTER, Connection) do |connection| + connection.client = self + end watch refresh end def watch @@ -29,30 +32,39 @@ end def refresh @servers = @zk.servers @servers.each do |server| - @socket.connect("tcp://#{server}") + $log.debug("#{service_name}:connect", server: server) + @channel.connect("tcp://#{server}") sleep(0.1) end @cycle = @servers.cycle end - def handle(request) + def request(obj) if @servers.empty? - raise ServerUnavailable.new("no servers are online, please try again later") + raise ServerUnavailableError.new("no servers are online, please try again later") end server = @cycle.next - msg = ZMsg.new_string_msg(request.class.to_s, request.to_s) + deferred = ZMachine::DefaultDeferrable.new + deferred.callback { |result| yield result } if block_given? + + request_id = @request_id.increment_and_get + msg = ZMsg.new_string_msg(obj.class.to_s, obj.to_s) + msg.wrap(ZFrame.new(request_id.to_s)) msg.wrap(ZFrame.new(server)) - msg.send(@socket) + $log.debug("#{service_name}:request", id: request_id, obj: obj.inspect) + @channel.send_data(msg) - msg = ZMsg.recvMsg(@socket) - msg.unwrap - cls = String.from_java_bytes(msg.pop.data).constantize - cls.parse(String.from_java_bytes(msg.pop.data)) + @requests[request_id] = deferred + end + + def response(request_id, obj) + $log.debug("#{service_name}:response", id: request_id, obj: obj.inspect) + @requests[request_id].succeed(obj) end end end