lib/ezmq.rb in ezmq-0.3.0 vs lib/ezmq.rb in ezmq-0.3.1
- old
+ new
@@ -1,293 +1,8 @@
require 'ffi-rzmq'
-
-# Syntactic sugar for 0MQ, because Ruby shouldn't feel like C.
-module EZMQ
- # Wrapper class to simplify 0MQ sockets.
- class Socket
- attr_accessor :context, :socket, :encode, :decode
-
- # Creates a 0MQ socket.
- #
- # @param [:bind, :connect] mode the mode of the socket.
- # @param [Object] type the type of socket to use.
- # @param [Hash] options optional parameters.
- #
- # @option options [ZMQ::Context] context a context to use for this socket
- # (one will be created if not provided).
- # @option options [lambda] encode how to encode messages.
- # @option options [lambda] decode how to decode messages.
- # @option options [String] protocol ('tcp') protocol for transport.
- # @option options [String] address ('127.0.0.1') address for endpoint.
- # @option options [Fixnum] port (5555) port for endpoint.
- # @note port is ignored unless protocol is either 'tcp' or 'udp'.
- #
- # @return [Socket] a new instance of Socket.
- #
- def initialize(mode, type, **options)
- fail ArgumentError unless [:bind, :connect].include? mode
- @context = options[:context] || ZMQ::Context.new
- @socket = @context.socket type
- @encode = options[:encode] || -> m { m }
- @decode = options[:decode] || -> m { m }
- endpoint = options.select { |k, _| [:protocol, :address, :port].include? k }
- method(mode).call endpoint
- end
-
- # Sends a message to the socket.
- #
- # @note If message is not a String, #encode must convert it to one.
- #
- # @param [String] message the message to send.
- # @param [Hash] options optional parameters.
- # @option options [lambda] encode how to encode the message.
- #
- # @return [Fixnum] the size of the message.
- #
- def send(message = '', **options)
- encoded = (options[:encode] || @encode).call message
- @socket.send_string encoded
- end
-
- # Receive a message from the socket.
- #
- # @note This method blocks until a message arrives.
- #
- # @param [Hash] options optional parameters.
- # @option options [lambda] decode how to decode the message.
- #
- # @yield message passes the message received to the block.
- # @yieldparam [Object] message the message received (decoded).
- #
- # @return [Object] the message received (decoded).
- #
- def receive(**options)
- message = ''
- @socket.recv_string message
- decoded = (options[:decode] || @decode).call message
- if block_given?
- yield decoded
- else
- decoded
- end
- end
-
- # Binds the socket to the given address.
- #
- # @param [String] protocol ('tcp') protocol for transport.
- # @param [String] address ('127.0.0.1') address for endpoint.
- # @note An address of 'localhost' is not reliable on all platforms.
- # Prefer '127.0.0.1' instead.
- # @param [Fixnum] port (5555) port for endpoint.
- # @note port is ignored unless protocol is either 'tcp' or 'udp'.
- #
- # @return [Boolean] was binding successful?
- #
- def bind(protocol: 'tcp', address: '127.0.0.1', port: 5555)
- endpoint = "#{ protocol }://#{ address }"
- endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol
- @socket.bind(endpoint) == 0
- end
-
- # Connects the socket to the given address.
- #
- # @param [String] protocol ('tcp') protocol for transport.
- # @param [String] address ('127.0.0.1') address for endpoint.
- # @param [Fixnum] port (5555) port for endpoint.
- # @note port is ignored unless protocol is either 'tcp' or 'udp'.
- #
- # @return [Boolean] was connection successful?
- #
- def connect(protocol: 'tcp', address: '127.0.0.1', port: 5555)
- endpoint = "#{ protocol }://#{ address }"
- endpoint = "#{ endpoint }:#{ port }" if %w(tcp udp).include? protocol
- @socket.connect(endpoint) == 0
- end
-
- # By default, waits for a message and prints it to STDOUT.
- #
- # @yield message passes the message received to the block.
- # @yieldparam [String] message the message received.
- #
- # @return [void]
- #
- def listen
- loop do
- if block_given?
- yield receive
- else
- puts receive
- end
- end
- end
- end
-
- # Request socket that sends messages and receives replies.
- class Client < EZMQ::Socket
- # Creates a new Client socket.
- #
- # @param [Hash] options optional parameters.
- # @see EZMQ::Socket EZMQ::Socket for optional parameters.
- #
- # @return [Client] a new instance of Client.
- #
- def initialize(**options)
- super :connect, ZMQ::REQ, options
- end
-
- # Sends a message and waits to receive a response.
- #
- # @param [String] message the message to send.
- # @param [Hash] options optional parameters.
- # @option options [lambda] encode how to encode the message.
- # @option options [lambda] decode how to decode the message.
- #
- # @return [void] the decoded response message.
- #
- def request(message = '', **options)
- send message, options
- if block_given?
- yield receive options
- else
- receive options
- end
- end
- end
-
- # Reply socket that listens for and replies to requests.
- class Server < EZMQ::Socket
- # Creates a new Server socket.
- #
- # @param [Hash] options optional parameters
- #
- # @see EZMQ::Socket EZMQ::Socket for optional parameters.
- #
- # @return [Server] a new instance of Server
- #
- def initialize(**options)
- super :bind, ZMQ::REP, options
- end
-
- # Listens for a request, and responds to it.
- #
- # If no block is given, responds with the request message.
- #
- # @yield message passes the message received to the block.
- # @yieldparam [String] message the message received.
- # @yieldreturn [void] the message to reply with.
- #
- # @return [void] the return from handler.
- #
- def listen
- loop do
- if block_given?
- send yield receive
- else
- send receive
- end
- end
- end
- end
-
- # Publish socket that broadcasts messages with an optional topic.
- class Publisher < EZMQ::Socket
- # Creates a new Publisher socket.
- #
- # @param [Hash] options optional parameters.
- # @see EZMQ::Socket EZMQ::Socket for optional parameters.
- #
- # @return [Publisher] a new instance of Publisher.
- #
- def initialize(**options)
- super :bind, ZMQ::PUB, options
- end
-
- # Sends a message on the socket, with an optional topic.
- #
- # @param [String] message the message to send.
- # @param [String] topic an optional topic for the message.
- # @param [Hash] options optional parameters.
- # @option options [lambda] encode how to encode the message.
- #
- # @return [Fixnum] the size of the message.
- #
- def send(message = '', topic: '', **options)
- @socket.send_string "#{ topic } #{ (options[:encode] || @encode).call message }"
- end
- end
-
- # Subscribe socket that listens for messages with an optional topic.
- class Subscriber < EZMQ::Socket
- attr_accessor :action
-
- # Creates a new Subscriber socket.
- #
- # @note The default behaviour is to output and messages received to STDOUT.
- #
- # @param [Hash] options optional parameters.
- # @option options [String] topic a topic to subscribe to.
- # @see EZMQ::Socket EZMQ::Socket for optional parameters.
- #
- # @return [Publisher] a new instance of Publisher.
- #
- def initialize(**options)
- super :connect, ZMQ::SUB, options
- subscribe options[:topic] if options[:topic]
- end
-
- # Establishes a new message filter on the socket.
- #
- # @note By default, a Subscriber filters all incoming messages. Without
- # calling subscribe at least once, no messages will be accepted. If topic
- # was provided, #initialize calls #subscribe automatically.
- #
- # @param [String] topic a topic to subscribe to. Messages matching this
- # prefix will be accepted.
- #
- # @return [Boolean] was subscription successful?
- #
- def subscribe(topic)
- @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
- end
-
- # Removes a message filter (as set with subscribe) from the socket.
- #
- # @param [String] topic the topic to unsubscribe from. If multiple filters
- # with the same topic are set, this will only remove one.
- #
- # @return [Boolean] was unsubscription successful?
- #
- def unsubscribe(topic)
- @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
- end
- end
-
- # Push socket that sends messages but does not receive them.
- class Pusher < EZMQ::Socket
- # Creates a new Pusher socket.
- #
- # @param [:bind, :connect] mode a mode for the socket.
- # @param [Hash] options optional parameters.
- # @see EZMQ::Socket EZMQ::Socket for optional parameters.
- #
- # @return [Pusher] a new instance of Pusher.
- #
- def initialize(mode = :connect, **options)
- super mode, ZMQ::PUSH, options
- end
- end
-
- # Pull socket that receives messages but does not send them.
- class Puller < EZMQ::Socket
- # Creates a new Puller socket.
- #
- # @param [:bind, :connect] mode a mode for the socket.
- # @param [Hash] options optional parameters.
- # @see EZMQ::Socket EZMQ::Socket for optional parameters.
- #
- # @return [Puller] a new instance of Puller.
- #
- def initialize(mode = :bind, **options)
- super mode, ZMQ::PULL, options
- end
- end
-end
+require_relative 'ezmq/socket'
+require_relative 'ezmq/request'
+require_relative 'ezmq/reply'
+require_relative 'ezmq/publish'
+require_relative 'ezmq/subscribe'
+require_relative 'ezmq/push'
+require_relative 'ezmq/pull'