lib/miu/socket.rb in miu-0.1.0 vs lib/miu/socket.rb in miu-0.2.0
- old
+ new
@@ -1,53 +1,125 @@
require 'miu'
require 'ffi-rzmq'
+require 'forwardable'
module Miu
class Socket
- attr_reader :host, :port
- attr_reader :context, :socket
+ class << self
+ def socket_type(type)
+ class_eval <<-EOS
+ def socket_type; :#{type.to_s.upcase}; end
+ EOS
+ end
- def initialize(socket_type, options = {})
- @host = options[:host] || '127.0.0.1'
- @port = options[:port]
- @context = Miu.context
- @socket = @context.socket socket_type
+ def build_address(*args)
+ host = args.shift
+ port = args.shift
+ port ? "tcp://#{host}:#{port}" : host
+ end
end
- def bind
- rc = @socket.bind "tcp://#{@host}:#{@port}"
- error_check rc
- self
+ attr_reader :socket
+ attr_reader :linger
+
+ def initialize
+ @socket = Miu.context.socket ::ZMQ.const_get(socket_type)
+ @linger = 0
end
- def connect
- rc = @socket.connect "tcp://#{@host}:#{@port}"
- error_check rc
- self
+ def bind(address)
+ error_wrapper { @socket.bind address }
end
- def forward(to)
- parts = []
- loop do
- message = ZMQ::Message.new
- @socket.recvmsg message
- parts << message.copy_out_string
- more = @socket.more_parts?
- to.socket.sendmsg message, (more ? ZMQ::SNDMORE : 0)
- return parts unless more
- end
+ def connect(address)
+ error_wrapper { @socket.connect address }
end
+ def linger=(value)
+ @linger = value || -1
+ error_wrapper { @socket.setsockopt(::ZMQ::LINGER, value) }
+ end
+
def close
@socket.close
end
protected
- def error_check(rc, source = nil)
- unless ZMQ::Util.resultcode_ok? rc
- raise ZMQ::ZeroMQError.new source, rc, ZMQ::Util.errno, ZMQ::Util.error_string
+ def error_wrapper(source = nil, &block)
+ error = nil
+
+ begin
+ rc = block.call
+ error = "#{::ZMQ::Util.error_string} (#{::ZMQ::Util.errno})" unless ::ZMQ::Util.resultcode_ok?(rc)
+ rescue => e
+ error = e.to_s
end
+
+ raise IOError, error if error
+ true
+ end
+ end
+
+ module ReadableSocket
+ extend Forwardable
+ def_delegator :@socket, :more_parts?
+
+ def bind(address)
+ self.linger = @linger
+ super address
+ end
+
+ def connect(address)
+ self.linger = @linger
+ super address
+ end
+
+ def read(buffer = '')
+ error_wrapper { @socket.recv_string buffer }
+ buffer
+ end
+ end
+
+ module WritableSocket
+ def write(*args)
+ error_wrapper { @socket.send_strings args.flatten }
+ args
+ end
+
+ alias_method :<<, :write
+ end
+
+ class PubSocket < Socket
+ include WritableSocket
+ socket_type :pub
+ end
+
+ class SubSocket < Socket
+ include ReadableSocket
+ socket_type :sub
+
+ def subscribe(topic)
+ error_wrapper { @socket.setsockopt(::ZMQ::SUBSCRIBE, topic) }
+ end
+
+ def unsubscribe(topic)
+ error_wrapper { @socket.setsockopt(::ZMQ::UNSUBSCRIBE, topic) }
+ end
+ end
+
+ class XPubSocket < PubSocket
+ socket_type :xpub
+ end
+
+ class XSubSocket < SubSocket
+ socket_type :xsub
+
+ def subscribe(topic)
+ true
+ end
+
+ def unsubscribe(topic)
true
end
end
end