lib/rflow/connections/zmq_connection.rb in rflow-1.0.1 vs lib/rflow/connections/zmq_connection.rb in rflow-1.1.0
- old
+ new
@@ -16,11 +16,11 @@
def create_zmq_context
version = LibZMQ::version
RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
if EM.reactor_running?
- raise RuntimeError, "EventMachine reactor is running when attempting to create a ZeroMQ context"
+ raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context'
end
EM::ZeroMQ::Context.new(1)
end
# Returns the current ZeroMQ context object or creates it if it does not exist.
@@ -72,19 +72,17 @@
self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
output_socket
end
- # TODO: fix this tight loop of retries
def send_message(message)
RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"
begin
output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
rescue Exception => e
- RFlow.logger.debug "Exception #{e.class}: #{e.message}, retrying send"
- retry
+ RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}"
end
end
private
def validate_options!
@@ -157,12 +155,12 @@
when 'SUB'; context.socket(ZMQ::XPUB)
else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}"
end
front.bind(connection.options['output_address'])
back.bind(connection.options['input_address'])
- ZMQ::Proxy.new(front, back)
- back.close
- front.close
+ while true
+ ZMQ::Proxy.new(front, back)
+ end
rescue Exception => e
RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
ensure
back.close if back
front.close if front