lib/ezmq.rb in ezmq-0.1.2 vs lib/ezmq.rb in ezmq-0.2.0
- old
+ new
@@ -12,182 +12,212 @@
# @param [Object] type the type of socket to use.
# @param [Hash] options optional parameters.
#
# @option options [ZMQ::Context] context a context to use for this socket
# (one will be created if not provided).
- # @option options [lambda] encode how to encode messages. Default unaltered.
- # @option options [lambda] decode how to decode messages. Default unaltered.
- # @option options [String] address specifies protocol, address and port (if
- # needed). Default is 'tcp://127.0.0.1:5555'
+ # @option options [lambda] encode how to encode messages.
+ # @option options [lambda] decode how to decode messages.
+ # @option options [String] protocol protocol to use for transport.
+ # Default: 'tcp'
+ # @option options [String] address address to use for endpoint.
+ # Default: '127.0.0.1'
+ # @note 'localhost' does not always work as expected. Prefer '127.0.0.1'
+ # @option options [Fixnum] port port to use for endpoint.
+ # Default: 5555
+ # @note `port` is ignored unless protocol is either 'tcp' or 'udp'.
#
# @return [Socket] a new instance of Socket.
#
def initialize(mode, type, **options)
fail ArgumentError unless [:bind, :connect].include? mode
@context = options[:context] || ZMQ::Context.new
@socket = @context.socket type
@encode = options[:encode] || -> m { m }
@decode = options[:decode] || -> m { m }
- method(mode).call address: options[:address] || 'tcp://127.0.0.1:5555'
+ endpoint = options.select { |k, _| [:protocol, :address, :port].include? k }
+ method(mode).call endpoint
end
- # Receive a message from the socket.
+ # Sends a message on the socket.
#
- # @note This method blocks until a message arrives.
+ # @note If `message` is not a String, `encode` must convert it to one.
#
- # @param [lambda] decode how to decode the message.
+ # @param [String] message the message to send.
+ # @param [Hash] options optional parameters.
+ # @option options [lambda] encode how to encode the message.
#
- # @return [void] the decoded message.
+ # @return [Fixnum] the size of the message.
#
- def receive(decode: @decode)
- message = ''
- @socket.recv_string message
- decode.call message
+ def send(message = '', **options)
+ encoded = (options[:encode] || @encode).call message
+ @socket.send_string encoded
end
- # Sends a message on the socket.
+ # Receive a message from the socket.
#
- # @note If `message` is not a String, `encode` must convert it to one.
+ # @note This method blocks until a message arrives.
#
- # @param [String] message the message to send.
- # @param [lambda] encode how to encode the message.
+ # @param [Hash] options optional parameters.
+ # @option options [lambda] decode how to decode the message.
#
- # @return [Fixnum] the size of the message.
+ # @yield message passes the message received to the block.
+ # @yieldparam [Object] message the message received (decoded).
#
- def send(message = '', encode: @encode)
- @socket.send_string encode.call message
+ # @return [Object] the message received (decoded).
+ #
+ def receive(**options)
+ message = ''
+ @socket.recv_string message
+ decoded = (options[:decode] || @decode).call message
+ if block_given?
+ yield decoded
+ else
+ decoded
+ end
end
# Binds the socket to the given address.
#
+ # @param [String] protocol protocol to use for transport. Default: 'tcp'
+ # @param [String] address address to use for endpoint. Default: '127.0.0.1'
# @note 'localhost' does not always work as expected. Prefer '127.0.0.1'
+ # @param [Fixnum] port port to use for endpoint. Default: 5555
+ # @note `port` is ignored unless protocol is either 'tcp' or 'udp'.
#
- # @param [String] address specifies protocol, address and port (if needed).
- # Default is 'tcp://127.0.0.1:5555'
- #
# @return [Boolean] was binding successful?
#
- def bind(address: 'tcp://127.0.0.1:5555')
- @socket.bind(address) == 0 ? true : false
+ def bind(protocol: 'tcp', address: '127.0.0.1', port: 5555)
+ endpoint = "#{ protocol }://#{ address }"
+ endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol
+ @socket.bind(endpoint) == 0
end
# Connects the socket to the given address.
#
- # @param [String] address specifies protocol, address and port (if needed).
- # Default is 'tcp://127.0.0.1:5555'
+ # @param [String] protocol protocol to use for transport. Default: 'tcp'
+ # @param [String] address address to use for endpoint. Default: '127.0.0.1'
+ # @param [Fixnum] port port to use for endpoint. Default: 5555
+ # @note `port` is ignored unless protocol is either 'tcp' or 'udp'.
#
# @return [Boolean] was connection successful?
#
- def connect(address: 'tcp://127.0.0.1:5555')
- @socket.connect(address) == 0 ? true : false
+ def connect(protocol: 'tcp', address: '127.0.0.1', port: 5555)
+ endpoint = "#{ protocol }://#{ address }"
+ endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol
+ @socket.connect(endpoint) == 0
end
end
- # Reply socket that listens for and replies to requests.
- class Server < EZMQ::Socket
- attr_accessor :provides
-
- # Creates a new Server socket.
+ # Request socket that sends messages and receives replies.
+ class Client < EZMQ::Socket
+ # Creates a new Client socket.
#
- # @param [lambda] provides the service provided by this server.
- # @param [Hash] options optional parameters
+ # @param [Hash] options optional parameters.
+ # @see EZMQ::Socket EZMQ::Socket for optional parameters.
#
- # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters.
+ # @return [Client] a new instance of Client.
#
- # @return [Server] a new instance of Server
- #
- def initialize(provides: -> m { m }, **options)
- @provides = provides
- super :bind, ZMQ::REP, options
+ def initialize(**options)
+ super :connect, ZMQ::REQ, options
end
- # By default, waits to receive a message, calls @action with it, replies
- # with the result, then loops.
+ # Sends a message and waits to receive a response.
#
- # @param [lambda] handler how requests are handled.
+ # @param [String] message the message to send.
+ # @param [Hash] options optional parameters.
+ # @option options [lambda] encode how to encode the message.
+ # @option options [lambda] decode how to decode the message.
#
- # @return [void] the return from handler.
+ # @return [void] the decoded response message.
#
- def listen(handler: -> { send @provides.call(receive) })
- loop { handler.call }
+ def request(message = '', **options)
+ send message, options
+ if block_given?
+ yield receive options
+ else
+ receive options
+ end
end
end
- # Request socket that sends messages and receives replies.
- class Client < EZMQ::Socket
- # Creates a new Client socket.
+ # Reply socket that listens for and replies to requests.
+ class Server < EZMQ::Socket
+ # Creates a new Server socket.
#
# @param [Hash] options optional parameters
#
- # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters.
+ # @see EZMQ::Socket EZMQ::Socket for optional parameters.
#
- # @return [Client] a new instance of Client.
+ # @return [Server] a new instance of Server
#
def initialize(**options)
- super :connect, ZMQ::REQ, options
+ super :bind, ZMQ::REP, options
end
- # Sends a message and waits to receive a response.
+ # Listens for a request, and responds to it.
#
- # @param [String] message the message to send.
- # @param [lambda] encode how to encode the message.
- # @param [lambda] decode how to decode the message.
+ # If no block is given, responds with the request message.
#
- # @return [void] the decoded response message.
+ # @yield message passes the message received to the block.
+ # @yieldparam [String] message the message received.
+ # @yieldreturn [void] the message to reply with.
#
- def request(message = '', encode: @encode, decode: @decode)
- send message, encode: encode
- receive decode: decode
+ # @return [void] the return from handler.
+ #
+ def listen
+ loop do
+ if block_given?
+ send yield receive
+ else
+ send receive
+ end
+ end
end
end
# Publish socket that broadcasts messages with an optional topic.
class Publisher < EZMQ::Socket
# Creates a new Publisher socket.
#
- # @param [Hash] options optional parameters
+ # @param [Hash] options optional parameters.
+ # @see EZMQ::Socket EZMQ::Socket for optional parameters.
#
- # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters.
- #
# @return [Publisher] a new instance of Publisher.
#
def initialize(**options)
super :bind, ZMQ::PUB, options
end
# Sends a message on the socket, with an optional topic.
#
# @param [String] message the message to send.
# @param [String] topic an optional topic for the message.
- # @param [lambda] encode how to encode the message.
+ # @param [Hash] options optional parameters.
+ # @option options [lambda] encode how to encode the message.
#
# @return [Fixnum] the size of the message.
#
- def send(message = '', topic: '', encode: @encode)
- @socket.send_string "#{ topic } #{ encode.call message }"
+ def send(message = '', topic: '', **options)
+ @socket.send_string "#{ topic } #{ (options[:encode] || @encode).call message }"
end
end
# Subscribe socket that listens for messages with an optional topic.
class Subscriber < EZMQ::Socket
attr_accessor :action
# Creates a new Subscriber socket.
- #
+ #
# @note The default behaviour is to output and messages received to STDOUT.
#
- # @param [lambda] action the action to perform when a message is received.
- # @param [Hash] options optional parameters
- #
+ # @param [Hash] options optional parameters.
# @option options [String] topic a topic to subscribe to.
+ # @see EZMQ::Socket EZMQ::Socket for optional parameters.
#
- # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters.
- #
# @return [Publisher] a new instance of Publisher.
#
- def initialize(action: -> m { puts m }, **options)
- @action = action
+ def initialize(**options)
super :connect, ZMQ::SUB, options
subscribe options[:topic] if options[:topic]
end
# Establishes a new message filter on the socket.
@@ -200,30 +230,37 @@
# prefix will be accepted.
#
# @return [Boolean] was subscription successful?
#
def subscribe(topic)
- @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0 ? true : false
+ @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
end
# Removes a message filter (as set with subscribe) from the socket.
#
# @param [String] topic the topic to unsubscribe from. If multiple filters
# with the same topic are set, this will only remove one.
#
# @return [Boolean] was unsubscription successful?
#
def unsubscribe(topic)
- @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0 ? true : false
+ @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
end
- # By default, waits for a message and calls @action with the message.
+ # By default, waits for a message and prints it to STDOUT.
#
- # @param [lambda] handler how requests are handled.
+ # @yield message passes the message received to the block.
+ # @yieldparam [String] message the message received.
#
- # @return [void] the return from handler.
+ # @return [void]
#
- def listen(handler: -> { @action.call(receive) })
- loop { handler.call }
+ def listen
+ loop do
+ if block_given?
+ yield receive
+ else
+ puts receive
+ end
+ end
end
end
end