lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a3 vs lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a4

- old
+ new

@@ -4,10 +4,11 @@ raise LoadError, 'Error loading ZeroMQ; perhaps the wrong system library version is installed?' end require 'rflow/connection' require 'rflow/message' require 'rflow/broker' +require 'sys/filesystem' class RFlow module Connections class ZMQConnection < RFlow::Connection class << self @@ -40,12 +41,17 @@ zmq_context # cause the ZMQ context to be created before the reactor is running end def connect_input! RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}" - self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'].to_sym)) + check_address(options['input_address']) + + self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'])) input_socket.send(options['input_responsibility'].to_sym, options['input_address']) + if config.delivery == 'broadcast' + input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages + end input_socket.on(:message) do |*message_parts| begin message = RFlow::Message.from_avro(message_parts.last.copy_out_string) RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'" @@ -59,11 +65,13 @@ input_socket end def connect_output! RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}" - self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'].to_sym)) + check_address(options['output_address']) + + self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'])) output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s) output_socket end # TODO: fix this tight loop of retries @@ -97,10 +105,27 @@ raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}" end true end + + def check_address(address) + # make sure we're not trying to create IPC sockets in an NFS share + # because that works poorly + if address.start_with?('ipc://') + filename = address[6..-1] + mount_point = Sys::Filesystem.mount_point(File.dirname(filename)) + return unless mount_point + mount_type = Sys::Filesystem.mounts.find {|m| m.mount_point == mount_point }.mount_type + return unless mount_type + + case mount_type + when 'vmhgfs', 'vboxsf', 'nfs' # vmware, virtualbox, nfs + raise ArgumentError, "Cannot safely create IPC sockets in network filesystem '#{mount_point}' of type #{mount_type}" + end + end + end end class BrokeredZMQConnection < ZMQConnection end @@ -120,14 +145,23 @@ def run_process version = LibZMQ::version RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } @context = ZMQ::Context.new RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" } - @back = context.socket(ZMQ::PULL) - back.bind(connection.options['output_address']) - @front = context.socket(ZMQ::PUSH) - front.bind(connection.options['input_address']) - ZMQ::Proxy.new(back, front) + + @front = case connection.options['output_socket_type'] + when 'PUSH'; context.socket(ZMQ::PULL) + when 'PUB'; context.socket(ZMQ::XSUB) + else raise ArgumentError, "Unknown output socket type #{connection.options['output_socket_type']}" + end + @back = case connection.options['input_socket_type'] + when 'PULL'; context.socket(ZMQ::PUSH) + when 'SUB'; context.socket(ZMQ::XPUB) + else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}" + end + front.bind(connection.options['output_address']) + back.bind(connection.options['input_address']) + ZMQ::Proxy.new(front, back) back.close front.close rescue Exception => e RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}" ensure