lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a2 vs lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a3

- old
+ new

@@ -1,17 +1,23 @@ -require 'em-zeromq' +begin + require 'em-zeromq' +rescue Exception => e + raise LoadError, 'Error loading ZeroMQ; perhaps the wrong system library version is installed?' +end require 'rflow/connection' require 'rflow/message' +require 'rflow/broker' class RFlow module Connections class ZMQConnection < RFlow::Connection class << self attr_accessor :zmq_context def create_zmq_context - RFlow.logger.debug "Creating a new ZeroMQ context" + version = LibZMQ::version + RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } if EM.reactor_running? raise RuntimeError, "EventMachine reactor is running when attempting to create a ZeroMQ context" end EM::ZeroMQ::Context.new(1) end @@ -20,11 +26,11 @@ def zmq_context @zmq_context ||= create_zmq_context end end - def zmq_context; self.class.zmq_context; end + def zmq_context; ZMQConnection.zmq_context; end private attr_accessor :input_socket, :output_socket public @@ -90,9 +96,46 @@ unless missing_options.empty? raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}" end true + end + end + + class BrokeredZMQConnection < ZMQConnection + end + + # The broker process responsible for shuttling messages back and forth on a + # many-to-many pipeline link. (Solutions without a broker only allow a + # 1-to-many or many-to-1 connection.) + class ZMQStreamer < Broker + private + attr_reader :connection, :context, :back, :front + + public + def initialize(config) + @connection = config.connection + super("broker-#{connection.name}", 'Broker') + end + + def run_process + version = LibZMQ::version + RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } + @context = ZMQ::Context.new + RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" } + @back = context.socket(ZMQ::PULL) + back.bind(connection.options['output_address']) + @front = context.socket(ZMQ::PUSH) + front.bind(connection.options['input_address']) + ZMQ::Proxy.new(back, front) + back.close + front.close + rescue Exception => e + RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}" + ensure + back.close if back + front.close if front + context.terminate if context end end end end