lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a3 vs lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a4
- old
+ new
@@ -4,10 +4,11 @@
raise LoadError, 'Error loading ZeroMQ; perhaps the wrong system library version is installed?'
end
require 'rflow/connection'
require 'rflow/message'
require 'rflow/broker'
+require 'sys/filesystem'
class RFlow
module Connections
class ZMQConnection < RFlow::Connection
class << self
@@ -40,12 +41,17 @@
zmq_context # cause the ZMQ context to be created before the reactor is running
end
def connect_input!
RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
- self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'].to_sym))
+ check_address(options['input_address'])
+
+ self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
input_socket.send(options['input_responsibility'].to_sym, options['input_address'])
+ if config.delivery == 'broadcast'
+ input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages
+ end
input_socket.on(:message) do |*message_parts|
begin
message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
@@ -59,11 +65,13 @@
input_socket
end
def connect_output!
RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
- self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'].to_sym))
+ check_address(options['output_address'])
+
+ self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
output_socket
end
# TODO: fix this tight loop of retries
@@ -97,10 +105,27 @@
raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}"
end
true
end
+
+ def check_address(address)
+ # make sure we're not trying to create IPC sockets in an NFS share
+ # because that works poorly
+ if address.start_with?('ipc://')
+ filename = address[6..-1]
+ mount_point = Sys::Filesystem.mount_point(File.dirname(filename))
+ return unless mount_point
+ mount_type = Sys::Filesystem.mounts.find {|m| m.mount_point == mount_point }.mount_type
+ return unless mount_type
+
+ case mount_type
+ when 'vmhgfs', 'vboxsf', 'nfs' # vmware, virtualbox, nfs
+ raise ArgumentError, "Cannot safely create IPC sockets in network filesystem '#{mount_point}' of type #{mount_type}"
+ end
+ end
+ end
end
class BrokeredZMQConnection < ZMQConnection
end
@@ -120,14 +145,23 @@
def run_process
version = LibZMQ::version
RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
@context = ZMQ::Context.new
RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }
- @back = context.socket(ZMQ::PULL)
- back.bind(connection.options['output_address'])
- @front = context.socket(ZMQ::PUSH)
- front.bind(connection.options['input_address'])
- ZMQ::Proxy.new(back, front)
+
+ @front = case connection.options['output_socket_type']
+ when 'PUSH'; context.socket(ZMQ::PULL)
+ when 'PUB'; context.socket(ZMQ::XSUB)
+ else raise ArgumentError, "Unknown output socket type #{connection.options['output_socket_type']}"
+ end
+ @back = case connection.options['input_socket_type']
+ when 'PULL'; context.socket(ZMQ::PUSH)
+ when 'SUB'; context.socket(ZMQ::XPUB)
+ else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}"
+ end
+ front.bind(connection.options['output_address'])
+ back.bind(connection.options['input_address'])
+ ZMQ::Proxy.new(front, back)
back.close
front.close
rescue Exception => e
RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
ensure