lib/miu/socket.rb in miu-0.1.0 vs lib/miu/socket.rb in miu-0.2.0

- old
+ new

@@ -1,53 +1,125 @@ require 'miu' require 'ffi-rzmq' +require 'forwardable' module Miu class Socket - attr_reader :host, :port - attr_reader :context, :socket + class << self + def socket_type(type) + class_eval <<-EOS + def socket_type; :#{type.to_s.upcase}; end + EOS + end - def initialize(socket_type, options = {}) - @host = options[:host] || '127.0.0.1' - @port = options[:port] - @context = Miu.context - @socket = @context.socket socket_type + def build_address(*args) + host = args.shift + port = args.shift + port ? "tcp://#{host}:#{port}" : host + end end - def bind - rc = @socket.bind "tcp://#{@host}:#{@port}" - error_check rc - self + attr_reader :socket + attr_reader :linger + + def initialize + @socket = Miu.context.socket ::ZMQ.const_get(socket_type) + @linger = 0 end - def connect - rc = @socket.connect "tcp://#{@host}:#{@port}" - error_check rc - self + def bind(address) + error_wrapper { @socket.bind address } end - def forward(to) - parts = [] - loop do - message = ZMQ::Message.new - @socket.recvmsg message - parts << message.copy_out_string - more = @socket.more_parts? - to.socket.sendmsg message, (more ? ZMQ::SNDMORE : 0) - return parts unless more - end + def connect(address) + error_wrapper { @socket.connect address } end + def linger=(value) + @linger = value || -1 + error_wrapper { @socket.setsockopt(::ZMQ::LINGER, value) } + end + def close @socket.close end protected - def error_check(rc, source = nil) - unless ZMQ::Util.resultcode_ok? rc - raise ZMQ::ZeroMQError.new source, rc, ZMQ::Util.errno, ZMQ::Util.error_string + def error_wrapper(source = nil, &block) + error = nil + + begin + rc = block.call + error = "#{::ZMQ::Util.error_string} (#{::ZMQ::Util.errno})" unless ::ZMQ::Util.resultcode_ok?(rc) + rescue => e + error = e.to_s end + + raise IOError, error if error + true + end + end + + module ReadableSocket + extend Forwardable + def_delegator :@socket, :more_parts? + + def bind(address) + self.linger = @linger + super address + end + + def connect(address) + self.linger = @linger + super address + end + + def read(buffer = '') + error_wrapper { @socket.recv_string buffer } + buffer + end + end + + module WritableSocket + def write(*args) + error_wrapper { @socket.send_strings args.flatten } + args + end + + alias_method :<<, :write + end + + class PubSocket < Socket + include WritableSocket + socket_type :pub + end + + class SubSocket < Socket + include ReadableSocket + socket_type :sub + + def subscribe(topic) + error_wrapper { @socket.setsockopt(::ZMQ::SUBSCRIBE, topic) } + end + + def unsubscribe(topic) + error_wrapper { @socket.setsockopt(::ZMQ::UNSUBSCRIBE, topic) } + end + end + + class XPubSocket < PubSocket + socket_type :xpub + end + + class XSubSocket < SubSocket + socket_type :xsub + + def subscribe(topic) + true + end + + def unsubscribe(topic) true end end end