lib/ezmq.rb in ezmq-0.1.2 vs lib/ezmq.rb in ezmq-0.2.0

- old
+ new

@@ -12,182 +12,212 @@ # @param [Object] type the type of socket to use. # @param [Hash] options optional parameters. # # @option options [ZMQ::Context] context a context to use for this socket # (one will be created if not provided). - # @option options [lambda] encode how to encode messages. Default unaltered. - # @option options [lambda] decode how to decode messages. Default unaltered. - # @option options [String] address specifies protocol, address and port (if - # needed). Default is 'tcp://127.0.0.1:5555' + # @option options [lambda] encode how to encode messages. + # @option options [lambda] decode how to decode messages. + # @option options [String] protocol protocol to use for transport. + # Default: 'tcp' + # @option options [String] address address to use for endpoint. + # Default: '127.0.0.1' + # @note 'localhost' does not always work as expected. Prefer '127.0.0.1' + # @option options [Fixnum] port port to use for endpoint. + # Default: 5555 + # @note `port` is ignored unless protocol is either 'tcp' or 'udp'. # # @return [Socket] a new instance of Socket. # def initialize(mode, type, **options) fail ArgumentError unless [:bind, :connect].include? mode @context = options[:context] || ZMQ::Context.new @socket = @context.socket type @encode = options[:encode] || -> m { m } @decode = options[:decode] || -> m { m } - method(mode).call address: options[:address] || 'tcp://127.0.0.1:5555' + endpoint = options.select { |k, _| [:protocol, :address, :port].include? k } + method(mode).call endpoint end - # Receive a message from the socket. + # Sends a message on the socket. # - # @note This method blocks until a message arrives. + # @note If `message` is not a String, `encode` must convert it to one. # - # @param [lambda] decode how to decode the message. + # @param [String] message the message to send. + # @param [Hash] options optional parameters. + # @option options [lambda] encode how to encode the message. # - # @return [void] the decoded message. + # @return [Fixnum] the size of the message. # - def receive(decode: @decode) - message = '' - @socket.recv_string message - decode.call message + def send(message = '', **options) + encoded = (options[:encode] || @encode).call message + @socket.send_string encoded end - # Sends a message on the socket. + # Receive a message from the socket. # - # @note If `message` is not a String, `encode` must convert it to one. + # @note This method blocks until a message arrives. # - # @param [String] message the message to send. - # @param [lambda] encode how to encode the message. + # @param [Hash] options optional parameters. + # @option options [lambda] decode how to decode the message. # - # @return [Fixnum] the size of the message. + # @yield message passes the message received to the block. + # @yieldparam [Object] message the message received (decoded). # - def send(message = '', encode: @encode) - @socket.send_string encode.call message + # @return [Object] the message received (decoded). + # + def receive(**options) + message = '' + @socket.recv_string message + decoded = (options[:decode] || @decode).call message + if block_given? + yield decoded + else + decoded + end end # Binds the socket to the given address. # + # @param [String] protocol protocol to use for transport. Default: 'tcp' + # @param [String] address address to use for endpoint. Default: '127.0.0.1' # @note 'localhost' does not always work as expected. Prefer '127.0.0.1' + # @param [Fixnum] port port to use for endpoint. Default: 5555 + # @note `port` is ignored unless protocol is either 'tcp' or 'udp'. # - # @param [String] address specifies protocol, address and port (if needed). - # Default is 'tcp://127.0.0.1:5555' - # # @return [Boolean] was binding successful? # - def bind(address: 'tcp://127.0.0.1:5555') - @socket.bind(address) == 0 ? true : false + def bind(protocol: 'tcp', address: '127.0.0.1', port: 5555) + endpoint = "#{ protocol }://#{ address }" + endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol + @socket.bind(endpoint) == 0 end # Connects the socket to the given address. # - # @param [String] address specifies protocol, address and port (if needed). - # Default is 'tcp://127.0.0.1:5555' + # @param [String] protocol protocol to use for transport. Default: 'tcp' + # @param [String] address address to use for endpoint. Default: '127.0.0.1' + # @param [Fixnum] port port to use for endpoint. Default: 5555 + # @note `port` is ignored unless protocol is either 'tcp' or 'udp'. # # @return [Boolean] was connection successful? # - def connect(address: 'tcp://127.0.0.1:5555') - @socket.connect(address) == 0 ? true : false + def connect(protocol: 'tcp', address: '127.0.0.1', port: 5555) + endpoint = "#{ protocol }://#{ address }" + endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol + @socket.connect(endpoint) == 0 end end - # Reply socket that listens for and replies to requests. - class Server < EZMQ::Socket - attr_accessor :provides - - # Creates a new Server socket. + # Request socket that sends messages and receives replies. + class Client < EZMQ::Socket + # Creates a new Client socket. # - # @param [lambda] provides the service provided by this server. - # @param [Hash] options optional parameters + # @param [Hash] options optional parameters. + # @see EZMQ::Socket EZMQ::Socket for optional parameters. # - # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters. + # @return [Client] a new instance of Client. # - # @return [Server] a new instance of Server - # - def initialize(provides: -> m { m }, **options) - @provides = provides - super :bind, ZMQ::REP, options + def initialize(**options) + super :connect, ZMQ::REQ, options end - # By default, waits to receive a message, calls @action with it, replies - # with the result, then loops. + # Sends a message and waits to receive a response. # - # @param [lambda] handler how requests are handled. + # @param [String] message the message to send. + # @param [Hash] options optional parameters. + # @option options [lambda] encode how to encode the message. + # @option options [lambda] decode how to decode the message. # - # @return [void] the return from handler. + # @return [void] the decoded response message. # - def listen(handler: -> { send @provides.call(receive) }) - loop { handler.call } + def request(message = '', **options) + send message, options + if block_given? + yield receive options + else + receive options + end end end - # Request socket that sends messages and receives replies. - class Client < EZMQ::Socket - # Creates a new Client socket. + # Reply socket that listens for and replies to requests. + class Server < EZMQ::Socket + # Creates a new Server socket. # # @param [Hash] options optional parameters # - # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters. + # @see EZMQ::Socket EZMQ::Socket for optional parameters. # - # @return [Client] a new instance of Client. + # @return [Server] a new instance of Server # def initialize(**options) - super :connect, ZMQ::REQ, options + super :bind, ZMQ::REP, options end - # Sends a message and waits to receive a response. + # Listens for a request, and responds to it. # - # @param [String] message the message to send. - # @param [lambda] encode how to encode the message. - # @param [lambda] decode how to decode the message. + # If no block is given, responds with the request message. # - # @return [void] the decoded response message. + # @yield message passes the message received to the block. + # @yieldparam [String] message the message received. + # @yieldreturn [void] the message to reply with. # - def request(message = '', encode: @encode, decode: @decode) - send message, encode: encode - receive decode: decode + # @return [void] the return from handler. + # + def listen + loop do + if block_given? + send yield receive + else + send receive + end + end end end # Publish socket that broadcasts messages with an optional topic. class Publisher < EZMQ::Socket # Creates a new Publisher socket. # - # @param [Hash] options optional parameters + # @param [Hash] options optional parameters. + # @see EZMQ::Socket EZMQ::Socket for optional parameters. # - # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters. - # # @return [Publisher] a new instance of Publisher. # def initialize(**options) super :bind, ZMQ::PUB, options end # Sends a message on the socket, with an optional topic. # # @param [String] message the message to send. # @param [String] topic an optional topic for the message. - # @param [lambda] encode how to encode the message. + # @param [Hash] options optional parameters. + # @option options [lambda] encode how to encode the message. # # @return [Fixnum] the size of the message. # - def send(message = '', topic: '', encode: @encode) - @socket.send_string "#{ topic } #{ encode.call message }" + def send(message = '', topic: '', **options) + @socket.send_string "#{ topic } #{ (options[:encode] || @encode).call message }" end end # Subscribe socket that listens for messages with an optional topic. class Subscriber < EZMQ::Socket attr_accessor :action # Creates a new Subscriber socket. - # + # # @note The default behaviour is to output and messages received to STDOUT. # - # @param [lambda] action the action to perform when a message is received. - # @param [Hash] options optional parameters - # + # @param [Hash] options optional parameters. # @option options [String] topic a topic to subscribe to. + # @see EZMQ::Socket EZMQ::Socket for optional parameters. # - # @see EZMQ::Socket EZMQ::Socket for a list of optional parameters. - # # @return [Publisher] a new instance of Publisher. # - def initialize(action: -> m { puts m }, **options) - @action = action + def initialize(**options) super :connect, ZMQ::SUB, options subscribe options[:topic] if options[:topic] end # Establishes a new message filter on the socket. @@ -200,30 +230,37 @@ # prefix will be accepted. # # @return [Boolean] was subscription successful? # def subscribe(topic) - @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0 ? true : false + @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0 end # Removes a message filter (as set with subscribe) from the socket. # # @param [String] topic the topic to unsubscribe from. If multiple filters # with the same topic are set, this will only remove one. # # @return [Boolean] was unsubscription successful? # def unsubscribe(topic) - @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0 ? true : false + @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0 end - # By default, waits for a message and calls @action with the message. + # By default, waits for a message and prints it to STDOUT. # - # @param [lambda] handler how requests are handled. + # @yield message passes the message received to the block. + # @yieldparam [String] message the message received. # - # @return [void] the return from handler. + # @return [void] # - def listen(handler: -> { @action.call(receive) }) - loop { handler.call } + def listen + loop do + if block_given? + yield receive + else + puts receive + end + end end end end