lib/rflow/connections/zmq_connection.rb in rflow-1.3.0 vs lib/rflow/connections/zmq_connection.rb in rflow-1.3.1

- old
+ new

@@ -7,30 +7,36 @@ require 'rflow/message' require 'rflow/broker' require 'sys/filesystem' class RFlow + # Contains all connections classes. module Connections + # Represents a ZeroMQ connection. class ZMQConnection < RFlow::Connection class << self + # The ZeroMQ context object. + # @return [EM::ZeroMQ::Context] attr_accessor :zmq_context + # @!visibility private def create_zmq_context version = LibZMQ::version RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } if EM.reactor_running? raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context' end EM::ZeroMQ::Context.new(1) end - # Returns the current ZeroMQ context object or creates it if it does not exist. def zmq_context @zmq_context ||= create_zmq_context end end + # The ZeroMQ context object. + # @return [EM::ZeroMQ::Context] def zmq_context; ZMQConnection.zmq_context; end private attr_accessor :input_socket, :output_socket @@ -39,10 +45,12 @@ super validate_options! zmq_context # cause the ZMQ context to be created before the reactor is running end + # Hook up the input to the real ZeroMQ sockets. + # @return [void] def connect_input! RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}" check_address(options['input_address']) self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'])) @@ -63,19 +71,23 @@ end input_socket end + # Hook up the output to the real ZeroMQ sockets. + # @return [void] def connect_output! RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}" check_address(options['output_address']) self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'])) output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s) output_socket end + # Send a message along the connection into ZeroMQ. + # @return [void] def send_message(message) RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'" begin output_socket.send_msg(message.data_type_name.to_s, message.to_avro) @@ -121,10 +133,13 @@ end end end end + # Subclass of {ZMQConnection} representing a brokered ZeroMQ connection + # (one where messages are sent to a separate process performing the + # many-to-many brokering function). class BrokeredZMQConnection < ZMQConnection end # The broker process responsible for shuttling messages back and forth on a # many-to-many pipeline link. (Solutions without a broker only allow a @@ -137,9 +152,11 @@ def initialize(config) @connection = config.connection super("broker-#{connection.name}", 'Broker') end + # Start the broker process. Returns when things are shutting down. + # @return [void] def run_process version = LibZMQ::version RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } @context = ZMQ::Context.new RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }