lib/ezmq.rb in ezmq-0.3.0 vs lib/ezmq.rb in ezmq-0.3.1

- old
+ new

@@ -1,293 +1,8 @@ require 'ffi-rzmq' - -# Syntactic sugar for 0MQ, because Ruby shouldn't feel like C. -module EZMQ - # Wrapper class to simplify 0MQ sockets. - class Socket - attr_accessor :context, :socket, :encode, :decode - - # Creates a 0MQ socket. - # - # @param [:bind, :connect] mode the mode of the socket. - # @param [Object] type the type of socket to use. - # @param [Hash] options optional parameters. - # - # @option options [ZMQ::Context] context a context to use for this socket - # (one will be created if not provided). - # @option options [lambda] encode how to encode messages. - # @option options [lambda] decode how to decode messages. - # @option options [String] protocol ('tcp') protocol for transport. - # @option options [String] address ('127.0.0.1') address for endpoint. - # @option options [Fixnum] port (5555) port for endpoint. - # @note port is ignored unless protocol is either 'tcp' or 'udp'. - # - # @return [Socket] a new instance of Socket. - # - def initialize(mode, type, **options) - fail ArgumentError unless [:bind, :connect].include? mode - @context = options[:context] || ZMQ::Context.new - @socket = @context.socket type - @encode = options[:encode] || -> m { m } - @decode = options[:decode] || -> m { m } - endpoint = options.select { |k, _| [:protocol, :address, :port].include? k } - method(mode).call endpoint - end - - # Sends a message to the socket. - # - # @note If message is not a String, #encode must convert it to one. - # - # @param [String] message the message to send. - # @param [Hash] options optional parameters. - # @option options [lambda] encode how to encode the message. - # - # @return [Fixnum] the size of the message. - # - def send(message = '', **options) - encoded = (options[:encode] || @encode).call message - @socket.send_string encoded - end - - # Receive a message from the socket. - # - # @note This method blocks until a message arrives. - # - # @param [Hash] options optional parameters. - # @option options [lambda] decode how to decode the message. - # - # @yield message passes the message received to the block. - # @yieldparam [Object] message the message received (decoded). - # - # @return [Object] the message received (decoded). - # - def receive(**options) - message = '' - @socket.recv_string message - decoded = (options[:decode] || @decode).call message - if block_given? - yield decoded - else - decoded - end - end - - # Binds the socket to the given address. - # - # @param [String] protocol ('tcp') protocol for transport. - # @param [String] address ('127.0.0.1') address for endpoint. - # @note An address of 'localhost' is not reliable on all platforms. - # Prefer '127.0.0.1' instead. - # @param [Fixnum] port (5555) port for endpoint. - # @note port is ignored unless protocol is either 'tcp' or 'udp'. - # - # @return [Boolean] was binding successful? - # - def bind(protocol: 'tcp', address: '127.0.0.1', port: 5555) - endpoint = "#{ protocol }://#{ address }" - endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol - @socket.bind(endpoint) == 0 - end - - # Connects the socket to the given address. - # - # @param [String] protocol ('tcp') protocol for transport. - # @param [String] address ('127.0.0.1') address for endpoint. - # @param [Fixnum] port (5555) port for endpoint. - # @note port is ignored unless protocol is either 'tcp' or 'udp'. - # - # @return [Boolean] was connection successful? - # - def connect(protocol: 'tcp', address: '127.0.0.1', port: 5555) - endpoint = "#{ protocol }://#{ address }" - endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol - @socket.connect(endpoint) == 0 - end - - # By default, waits for a message and prints it to STDOUT. - # - # @yield message passes the message received to the block. - # @yieldparam [String] message the message received. - # - # @return [void] - # - def listen - loop do - if block_given? - yield receive - else - puts receive - end - end - end - end - - # Request socket that sends messages and receives replies. - class Client < EZMQ::Socket - # Creates a new Client socket. - # - # @param [Hash] options optional parameters. - # @see EZMQ::Socket EZMQ::Socket for optional parameters. - # - # @return [Client] a new instance of Client. - # - def initialize(**options) - super :connect, ZMQ::REQ, options - end - - # Sends a message and waits to receive a response. - # - # @param [String] message the message to send. - # @param [Hash] options optional parameters. - # @option options [lambda] encode how to encode the message. - # @option options [lambda] decode how to decode the message. - # - # @return [void] the decoded response message. - # - def request(message = '', **options) - send message, options - if block_given? - yield receive options - else - receive options - end - end - end - - # Reply socket that listens for and replies to requests. - class Server < EZMQ::Socket - # Creates a new Server socket. - # - # @param [Hash] options optional parameters - # - # @see EZMQ::Socket EZMQ::Socket for optional parameters. - # - # @return [Server] a new instance of Server - # - def initialize(**options) - super :bind, ZMQ::REP, options - end - - # Listens for a request, and responds to it. - # - # If no block is given, responds with the request message. - # - # @yield message passes the message received to the block. - # @yieldparam [String] message the message received. - # @yieldreturn [void] the message to reply with. - # - # @return [void] the return from handler. - # - def listen - loop do - if block_given? - send yield receive - else - send receive - end - end - end - end - - # Publish socket that broadcasts messages with an optional topic. - class Publisher < EZMQ::Socket - # Creates a new Publisher socket. - # - # @param [Hash] options optional parameters. - # @see EZMQ::Socket EZMQ::Socket for optional parameters. - # - # @return [Publisher] a new instance of Publisher. - # - def initialize(**options) - super :bind, ZMQ::PUB, options - end - - # Sends a message on the socket, with an optional topic. - # - # @param [String] message the message to send. - # @param [String] topic an optional topic for the message. - # @param [Hash] options optional parameters. - # @option options [lambda] encode how to encode the message. - # - # @return [Fixnum] the size of the message. - # - def send(message = '', topic: '', **options) - @socket.send_string "#{ topic } #{ (options[:encode] || @encode).call message }" - end - end - - # Subscribe socket that listens for messages with an optional topic. - class Subscriber < EZMQ::Socket - attr_accessor :action - - # Creates a new Subscriber socket. - # - # @note The default behaviour is to output and messages received to STDOUT. - # - # @param [Hash] options optional parameters. - # @option options [String] topic a topic to subscribe to. - # @see EZMQ::Socket EZMQ::Socket for optional parameters. - # - # @return [Publisher] a new instance of Publisher. - # - def initialize(**options) - super :connect, ZMQ::SUB, options - subscribe options[:topic] if options[:topic] - end - - # Establishes a new message filter on the socket. - # - # @note By default, a Subscriber filters all incoming messages. Without - # calling subscribe at least once, no messages will be accepted. If topic - # was provided, #initialize calls #subscribe automatically. - # - # @param [String] topic a topic to subscribe to. Messages matching this - # prefix will be accepted. - # - # @return [Boolean] was subscription successful? - # - def subscribe(topic) - @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0 - end - - # Removes a message filter (as set with subscribe) from the socket. - # - # @param [String] topic the topic to unsubscribe from. If multiple filters - # with the same topic are set, this will only remove one. - # - # @return [Boolean] was unsubscription successful? - # - def unsubscribe(topic) - @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0 - end - end - - # Push socket that sends messages but does not receive them. - class Pusher < EZMQ::Socket - # Creates a new Pusher socket. - # - # @param [:bind, :connect] mode a mode for the socket. - # @param [Hash] options optional parameters. - # @see EZMQ::Socket EZMQ::Socket for optional parameters. - # - # @return [Pusher] a new instance of Pusher. - # - def initialize(mode = :connect, **options) - super mode, ZMQ::PUSH, options - end - end - - # Pull socket that receives messages but does not send them. - class Puller < EZMQ::Socket - # Creates a new Puller socket. - # - # @param [:bind, :connect] mode a mode for the socket. - # @param [Hash] options optional parameters. - # @see EZMQ::Socket EZMQ::Socket for optional parameters. - # - # @return [Puller] a new instance of Puller. - # - def initialize(mode = :bind, **options) - super mode, ZMQ::PULL, options - end - end -end +require_relative 'ezmq/socket' +require_relative 'ezmq/request' +require_relative 'ezmq/reply' +require_relative 'ezmq/publish' +require_relative 'ezmq/subscribe' +require_relative 'ezmq/push' +require_relative 'ezmq/pull'