lib/zoomq/client.rb in zoomq-0.1.1 vs lib/zoomq/client.rb in zoomq-0.2.0
- old
+ new
@@ -1,27 +1,30 @@
# encoding: utf-8
-require 'zoomq/jeromq-0.3.0-20130721.175323-20.jar'
-java_import org.zeromq.ZMQ
-java_import org.zeromq.ZContext
-java_import org.zeromq.ZMsg
-java_import org.zeromq.ZFrame
+require 'zmachine'
require 'zoomq/zookeeper'
+require 'zoomq/client/connection'
+java_import org.zeromq.ZFrame
+java_import java.util.concurrent.atomic.AtomicInteger
+
module ZooMQ
- class ServerUnavailable < StandardError; end
+ class ServerUnavailableError < StandardError; end
+ class TimeoutError < StandardError; end
class Client
attr_reader :servers
- def initialize(service_name, zookeeper_uri, log)
- @zk = Zookeeper.new("#{zookeeper_uri}/#{service_name}")
- @ctx = ZContext.new
- @socket = @ctx.create_socket(ZMQ::ROUTER)
- @socket.identity = SecureRandom.uuid.to_java_bytes
+ def initialize
+ @requests = {}
+ @request_id = AtomicInteger.new
+ @zk = Zookeeper.new(service_name)
+ @channel = ZMachine.connect(nil, ZMQ::ROUTER, Connection) do |connection|
+ connection.client = self
+ end
watch
refresh
end
def watch
@@ -29,30 +32,39 @@
end
def refresh
@servers = @zk.servers
@servers.each do |server|
- @socket.connect("tcp://#{server}")
+ $log.debug("#{service_name}:connect", server: server)
+ @channel.connect("tcp://#{server}")
sleep(0.1)
end
@cycle = @servers.cycle
end
- def handle(request)
+ def request(obj)
if @servers.empty?
- raise ServerUnavailable.new("no servers are online, please try again later")
+ raise ServerUnavailableError.new("no servers are online, please try again later")
end
server = @cycle.next
- msg = ZMsg.new_string_msg(request.class.to_s, request.to_s)
+ deferred = ZMachine::DefaultDeferrable.new
+ deferred.callback { |result| yield result } if block_given?
+
+ request_id = @request_id.increment_and_get
+ msg = ZMsg.new_string_msg(obj.class.to_s, obj.to_s)
+ msg.wrap(ZFrame.new(request_id.to_s))
msg.wrap(ZFrame.new(server))
- msg.send(@socket)
+ $log.debug("#{service_name}:request", id: request_id, obj: obj.inspect)
+ @channel.send_data(msg)
- msg = ZMsg.recvMsg(@socket)
- msg.unwrap
- cls = String.from_java_bytes(msg.pop.data).constantize
- cls.parse(String.from_java_bytes(msg.pop.data))
+ @requests[request_id] = deferred
+ end
+
+ def response(request_id, obj)
+ $log.debug("#{service_name}:response", id: request_id, obj: obj.inspect)
+ @requests[request_id].succeed(obj)
end
end
end