lib/rflow/connections/zmq_connection.rb in rflow-0.0.5 vs lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a1

- old
+ new

@@ -1,8 +1,6 @@ -#require 'ffi' -#require 'ffi-rzmq' -require 'em-zeromq-mri' +require 'em-zeromq' require 'rflow/connection' require 'rflow/message' class RFlow @@ -12,90 +10,93 @@ class << self attr_accessor :zmq_context def create_zmq_context RFlow.logger.debug "Creating a new ZeroMQ context" - unless EM.reactor_running? - raise RuntimeError, "EventMachine reactor is not running when attempting to create a ZeroMQ context" + if EM.reactor_running? + raise RuntimeError, "EventMachine reactor is running when attempting to create a ZeroMQ context" end EM::ZeroMQ::Context.new(1) end - + # Returns the current ZeroMQ context object or creates it if - # it does not exist. Assumes that we are within a running - # EventMachine reactor + # it does not exist. def zmq_context @zmq_context ||= create_zmq_context end end - attr_accessor :socket + def zmq_context; self.class.zmq_context; end - REQUIRED_OPTION_SUFFIXES = ['_socket_type', '_address', '_responsibility'] + attr_accessor :input_socket, :output_socket - def self.configuration_errors(configuration) + def initialize(config) + super + validate_options! + # Cause the ZMQ context to be created before the reactor is running + zmq_context + end + + + def validate_options! # TODO: Normalize/validate configuration - missing_config_elements = [] + missing_options = [] ['input', 'output'].each do |direction_prefix| - REQUIRED_OPTION_SUFFIXES.each do |option_suffix| - config_element = "#{direction_prefix}#{option_suffix}" - unless configuration.include? config_element - missing_config_elements << config_element + ['_socket_type', '_address', '_responsibility'].each do |option_suffix| + option_name = "#{direction_prefix}#{option_suffix}" + unless options.include? option_name + missing_options << option_name end end end - missing_config_elements - end - - - def initialize(connection_instance_uuid, connection_name, connection_configuration) - configuration_errors = self.class.configuration_errors(connection_configuration) - unless configuration_errors.empty? - raise ArgumentError, "#{self.class.to_s}: configuration missing elements: #{configuration_errors.join ', '}" + unless missing_options.empty? + raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}" end - super + true end - + def connect_input! - RFlow.logger.debug "Connecting input #{instance_uuid} with #{configuration.find_all {|k, v| k.to_s =~ /input/}}" - self.socket = self.class.zmq_context.send(configuration['input_responsibility'], - ZMQ.const_get(configuration['input_socket_type'].to_sym), - configuration['input_address'], - self) - end + RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}" + self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'].to_sym)) + input_socket.send(options['input_responsibility'].to_sym, + options['input_address']) + input_socket.on(:message) do |*message_parts| + message = RFlow::Message.from_avro(message_parts.last.copy_out_string) + RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'" + message_parts.each { |part| part.close } # avoid memory leaks + recv_callback.call(message) + end - def connect_output! - RFlow.logger.debug "Connecting output #{instance_uuid} with #{configuration.find_all {|k, v| k.to_s =~ /output/}}" - self.socket = self.class.zmq_context.send(configuration['output_responsibility'].to_s, - ZMQ.const_get(configuration['output_socket_type'].to_sym), - configuration['output_address'].to_s, - self) + input_socket end - def on_readable(socket, message_parts) - message = RFlow::Message.from_avro(message_parts.last.copy_out_string) - RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'" - recv_callback.call(message) + def connect_output! + RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}" + self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'].to_sym)) + output_socket.send(options['output_responsibility'].to_sym, + options['output_address'].to_s) + output_socket end + # TODO: fix this tight loop of retries def send_message(message) RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'" begin - socket.send_msg(message.data_type_name.to_s, message.to_avro) + output_socket.send_msg(message.data_type_name.to_s, message.to_avro) RFlow.logger.debug "#{name}: Successfully sent message of type '#{message.data_type_name.to_s}'" rescue Exception => e RFlow.logger.debug "Exception #{e.class}: #{e.message}, retrying send" retry end end - + end end end