lib/rflow/connections/zmq_connection.rb in rflow-1.3.0 vs lib/rflow/connections/zmq_connection.rb in rflow-1.3.1
- old
+ new
@@ -7,30 +7,36 @@
require 'rflow/message'
require 'rflow/broker'
require 'sys/filesystem'
class RFlow
+ # Contains all connections classes.
module Connections
+ # Represents a ZeroMQ connection.
class ZMQConnection < RFlow::Connection
class << self
+ # The ZeroMQ context object.
+ # @return [EM::ZeroMQ::Context]
attr_accessor :zmq_context
+ # @!visibility private
def create_zmq_context
version = LibZMQ::version
RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
if EM.reactor_running?
raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context'
end
EM::ZeroMQ::Context.new(1)
end
- # Returns the current ZeroMQ context object or creates it if it does not exist.
def zmq_context
@zmq_context ||= create_zmq_context
end
end
+ # The ZeroMQ context object.
+ # @return [EM::ZeroMQ::Context]
def zmq_context; ZMQConnection.zmq_context; end
private
attr_accessor :input_socket, :output_socket
@@ -39,10 +45,12 @@
super
validate_options!
zmq_context # cause the ZMQ context to be created before the reactor is running
end
+ # Hook up the input to the real ZeroMQ sockets.
+ # @return [void]
def connect_input!
RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
check_address(options['input_address'])
self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
@@ -63,19 +71,23 @@
end
input_socket
end
+ # Hook up the output to the real ZeroMQ sockets.
+ # @return [void]
def connect_output!
RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
check_address(options['output_address'])
self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
output_socket
end
+ # Send a message along the connection into ZeroMQ.
+ # @return [void]
def send_message(message)
RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"
begin
output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
@@ -121,10 +133,13 @@
end
end
end
end
+ # Subclass of {ZMQConnection} representing a brokered ZeroMQ connection
+ # (one where messages are sent to a separate process performing the
+ # many-to-many brokering function).
class BrokeredZMQConnection < ZMQConnection
end
# The broker process responsible for shuttling messages back and forth on a
# many-to-many pipeline link. (Solutions without a broker only allow a
@@ -137,9 +152,11 @@
def initialize(config)
@connection = config.connection
super("broker-#{connection.name}", 'Broker')
end
+ # Start the broker process. Returns when things are shutting down.
+ # @return [void]
def run_process
version = LibZMQ::version
RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
@context = ZMQ::Context.new
RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }