lib/celluloid/zmq/sockets.rb in celluloid-zmq-0.10.0 vs lib/celluloid/zmq/sockets.rb in celluloid-zmq-0.12.0
- old
+ new
@@ -1,15 +1,14 @@
module Celluloid
module ZMQ
- attr_reader :linger
-
class Socket
# Create a new socket
def initialize(type)
@socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upcase)
@linger = 0
end
+ attr_reader :linger
# Connect to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def connect(addr)
unless ::ZMQ::Util.resultcode_ok? @socket.connect addr
@@ -77,31 +76,34 @@
end
# Writable 0MQ sockets have a send method
module WritableSocket
# Send a message to the socket
- def send(message)
- unless ::ZMQ::Util.resultcode_ok? @socket.send_string message
+ def write(*messages)
+ unless ::ZMQ::Util.resultcode_ok? @socket.send_strings messages.flatten
raise IOError, "error sending 0MQ message: #{::ZMQ::Util.error_string}"
end
- message
+ messages
end
- alias_method :<<, :send
+ alias_method :<<, :write
+ alias_method :send, :write # deprecated
end
# ReqSockets are the counterpart of RepSockets (REQ/REP)
class ReqSocket < Socket
include ReadableSocket
+ include WritableSocket
def initialize
super :req
end
end
# RepSockets are the counterpart of ReqSockets (REQ/REP)
class RepSocket < Socket
+ include ReadableSocket
include WritableSocket
def initialize
super :rep
end
@@ -138,9 +140,21 @@
class SubSocket < Socket
include ReadableSocket
def initialize
super :sub
+ end
+
+ def subscribe(topic)
+ unless ::ZMQ::Util.resultcode_ok? @socket.setsockopt(::ZMQ::SUBSCRIBE, topic)
+ raise IOError, "couldn't set subscribe: #{::ZMQ::Util.error_string}"
+ end
+ end
+
+ def unsubscribe(topic)
+ unless ::ZMQ::Util.resultcode_ok? @socket.setsockopt(::ZMQ::UNSUBSCRIBE, topic)
+ raise IOError, "couldn't set unsubscribe: #{::ZMQ::Util.error_string}"
+ end
end
end
end
end