lib/rflow/connections/zmq_connection.rb in rflow-0.0.5 vs lib/rflow/connections/zmq_connection.rb in rflow-1.0.0a1
- old
+ new
@@ -1,8 +1,6 @@
-#require 'ffi'
-#require 'ffi-rzmq'
-require 'em-zeromq-mri'
+require 'em-zeromq'
require 'rflow/connection'
require 'rflow/message'
class RFlow
@@ -12,90 +10,93 @@
class << self
attr_accessor :zmq_context
def create_zmq_context
RFlow.logger.debug "Creating a new ZeroMQ context"
- unless EM.reactor_running?
- raise RuntimeError, "EventMachine reactor is not running when attempting to create a ZeroMQ context"
+ if EM.reactor_running?
+ raise RuntimeError, "EventMachine reactor is running when attempting to create a ZeroMQ context"
end
EM::ZeroMQ::Context.new(1)
end
-
+
# Returns the current ZeroMQ context object or creates it if
- # it does not exist. Assumes that we are within a running
- # EventMachine reactor
+ # it does not exist.
def zmq_context
@zmq_context ||= create_zmq_context
end
end
- attr_accessor :socket
+ def zmq_context; self.class.zmq_context; end
- REQUIRED_OPTION_SUFFIXES = ['_socket_type', '_address', '_responsibility']
+ attr_accessor :input_socket, :output_socket
- def self.configuration_errors(configuration)
+ def initialize(config)
+ super
+ validate_options!
+ # Cause the ZMQ context to be created before the reactor is running
+ zmq_context
+ end
+
+
+ def validate_options!
# TODO: Normalize/validate configuration
- missing_config_elements = []
+ missing_options = []
['input', 'output'].each do |direction_prefix|
- REQUIRED_OPTION_SUFFIXES.each do |option_suffix|
- config_element = "#{direction_prefix}#{option_suffix}"
- unless configuration.include? config_element
- missing_config_elements << config_element
+ ['_socket_type', '_address', '_responsibility'].each do |option_suffix|
+ option_name = "#{direction_prefix}#{option_suffix}"
+ unless options.include? option_name
+ missing_options << option_name
end
end
end
- missing_config_elements
- end
-
-
- def initialize(connection_instance_uuid, connection_name, connection_configuration)
- configuration_errors = self.class.configuration_errors(connection_configuration)
- unless configuration_errors.empty?
- raise ArgumentError, "#{self.class.to_s}: configuration missing elements: #{configuration_errors.join ', '}"
+ unless missing_options.empty?
+ raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}"
end
- super
+ true
end
-
+
def connect_input!
- RFlow.logger.debug "Connecting input #{instance_uuid} with #{configuration.find_all {|k, v| k.to_s =~ /input/}}"
- self.socket = self.class.zmq_context.send(configuration['input_responsibility'],
- ZMQ.const_get(configuration['input_socket_type'].to_sym),
- configuration['input_address'],
- self)
- end
+ RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
+ self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'].to_sym))
+ input_socket.send(options['input_responsibility'].to_sym,
+ options['input_address'])
+ input_socket.on(:message) do |*message_parts|
+ message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
+ RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
+ message_parts.each { |part| part.close } # avoid memory leaks
+ recv_callback.call(message)
+ end
- def connect_output!
- RFlow.logger.debug "Connecting output #{instance_uuid} with #{configuration.find_all {|k, v| k.to_s =~ /output/}}"
- self.socket = self.class.zmq_context.send(configuration['output_responsibility'].to_s,
- ZMQ.const_get(configuration['output_socket_type'].to_sym),
- configuration['output_address'].to_s,
- self)
+ input_socket
end
- def on_readable(socket, message_parts)
- message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
- RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
- recv_callback.call(message)
+ def connect_output!
+ RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
+ self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'].to_sym))
+ output_socket.send(options['output_responsibility'].to_sym,
+ options['output_address'].to_s)
+ output_socket
end
+
# TODO: fix this tight loop of retries
def send_message(message)
RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"
begin
- socket.send_msg(message.data_type_name.to_s, message.to_avro)
+ output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
RFlow.logger.debug "#{name}: Successfully sent message of type '#{message.data_type_name.to_s}'"
rescue Exception => e
RFlow.logger.debug "Exception #{e.class}: #{e.message}, retrying send"
retry
end
end
-
+
end
end
end