lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a2 vs lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a3
- old
+ new
@@ -1,17 +1,23 @@
-require 'em-zeromq'
+begin
+ require 'em-zeromq'
+rescue Exception => e
+ raise LoadError, 'Error loading ZeroMQ; perhaps the wrong system library version is installed?'
+end
require 'rflow/connection'
require 'rflow/message'
+require 'rflow/broker'
class RFlow
module Connections
class ZMQConnection < RFlow::Connection
class << self
attr_accessor :zmq_context
def create_zmq_context
- RFlow.logger.debug "Creating a new ZeroMQ context"
+ version = LibZMQ::version
+ RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
if EM.reactor_running?
raise RuntimeError, "EventMachine reactor is running when attempting to create a ZeroMQ context"
end
EM::ZeroMQ::Context.new(1)
end
@@ -20,11 +26,11 @@
def zmq_context
@zmq_context ||= create_zmq_context
end
end
- def zmq_context; self.class.zmq_context; end
+ def zmq_context; ZMQConnection.zmq_context; end
private
attr_accessor :input_socket, :output_socket
public
@@ -90,9 +96,46 @@
unless missing_options.empty?
raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}"
end
true
+ end
+ end
+
+ class BrokeredZMQConnection < ZMQConnection
+ end
+
+ # The broker process responsible for shuttling messages back and forth on a
+ # many-to-many pipeline link. (Solutions without a broker only allow a
+ # 1-to-many or many-to-1 connection.)
+ class ZMQStreamer < Broker
+ private
+ attr_reader :connection, :context, :back, :front
+
+ public
+ def initialize(config)
+ @connection = config.connection
+ super("broker-#{connection.name}", 'Broker')
+ end
+
+ def run_process
+ version = LibZMQ::version
+ RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
+ @context = ZMQ::Context.new
+ RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }
+ @back = context.socket(ZMQ::PULL)
+ back.bind(connection.options['output_address'])
+ @front = context.socket(ZMQ::PUSH)
+ front.bind(connection.options['input_address'])
+ ZMQ::Proxy.new(back, front)
+ back.close
+ front.close
+ rescue Exception => e
+ RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
+ ensure
+ back.close if back
+ front.close if front
+ context.terminate if context
end
end
end
end