Sha256: e80991d7f6ea0723f1244cf6084889cb89399406b53692b3bed02b86e8488b0c
Contents?: true
Size: 1.29 KB
Versions: 2
Compression:
Stored size: 1.29 KB
Contents
# encoding: utf-8 require 'zoomq/jeromq-0.3.0-20130721.175323-20.jar' java_import org.zeromq.ZMQ java_import org.zeromq.ZContext java_import org.zeromq.ZMsg java_import org.zeromq.ZFrame require 'zoomq/zookeeper' module ZooMQ class ServerUnavailable < StandardError; end class Client attr_reader :servers def initialize(service_name, zookeeper_uri, log) @zk = Zookeeper.new("#{zookeeper_uri}/#{service_name}") @ctx = ZContext.new @socket = @ctx.create_socket(ZMQ::ROUTER) @socket.identity = SecureRandom.uuid.to_java_bytes watch refresh end def watch @zk.watch { refresh } end def refresh @servers = @zk.servers @servers.each do |server| @socket.connect("tcp://#{server}") sleep(0.1) end @cycle = @servers.cycle end def handle(request) if @servers.empty? raise ServerUnavailable.new("no servers are online, please try again later") end server = @cycle.next msg = ZMsg.new_string_msg(request.class.to_s, request.to_s) msg.wrap(ZFrame.new(server)) msg.send(@socket) msg = ZMsg.recvMsg(@socket) msg.unwrap cls = String.from_java_bytes(msg.pop.data).constantize cls.parse(String.from_java_bytes(msg.pop.data)) end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
zoomq-0.1.1 | lib/zoomq/client.rb |
zoomq-0.1.0 | lib/zoomq/client.rb |