lib/rflow/connections/zmq_connection.rb in rflow-1.0.1 vs lib/rflow/connections/zmq_connection.rb in rflow-1.1.0

- old
+ new

@@ -16,11 +16,11 @@ def create_zmq_context version = LibZMQ::version RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } if EM.reactor_running? - raise RuntimeError, "EventMachine reactor is running when attempting to create a ZeroMQ context" + raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context' end EM::ZeroMQ::Context.new(1) end # Returns the current ZeroMQ context object or creates it if it does not exist. @@ -72,19 +72,17 @@ self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'])) output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s) output_socket end - # TODO: fix this tight loop of retries def send_message(message) RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'" begin output_socket.send_msg(message.data_type_name.to_s, message.to_avro) rescue Exception => e - RFlow.logger.debug "Exception #{e.class}: #{e.message}, retrying send" - retry + RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}" end end private def validate_options! @@ -157,12 +155,12 @@ when 'SUB'; context.socket(ZMQ::XPUB) else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}" end front.bind(connection.options['output_address']) back.bind(connection.options['input_address']) - ZMQ::Proxy.new(front, back) - back.close - front.close + while true + ZMQ::Proxy.new(front, back) + end rescue Exception => e RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}" ensure back.close if back front.close if front