lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a1 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a2

- old
+ new

@@ -1,60 +1,56 @@ require 'rflow/configuration' class RFlow class Configuration - # Ruby DSL config file controller. # TODO: more docs and examples class RubyDSL - attr_accessor :setting_specs, :shard_specs, :connection_specs, :allocated_system_ports + private + attr_accessor :setting_specs, :shard_specs, :connection_specs, :default_shard + public def initialize + @default_shard = {:name => "DEFAULT", :type => :process, :count => 1, :components => []} + @current_shard = default_shard + @setting_specs = [] - @shard_specs = [{:name => "DEFAULT", :type => :process, :count => 1, :components => []}] + @shard_specs = [default_shard] @connection_specs = [] - - @current_shard = @shard_specs.first end - # Helper function to extract the line of the config that - # specified the operation. Useful in printing helpful error messages - def get_config_line(call_history) - call_history.first.split(':in').first - end - # DSL method to specify a name/value pair. RFlow core uses the # 'rflow.' prefix on all of its settings. Custom settings # should use a custom (unique) prefix - def setting(setting_name, setting_value) - setting_specs << {:name => setting_name.to_s, :value => setting_value.to_s, :config_line => get_config_line(caller)} + def setting(name, value) + setting_specs << {:name => name.to_s, :value => value.to_s, :config_line => get_config_line(caller)} end # DSL method to specify a shard block for either a process or thread - def shard(shard_name, shard_options={}) - raise ArgumentError, "Cannot use DEFAULT as a shard name" if shard_name == 'DEFAULT' - shard_type = if shard_options[:thread] || shard_options[:type] == :thread - :thread - else - :process - end + def shard(name, options = {}) + raise ArgumentError, "Cannot use DEFAULT as a shard name" if name == 'DEFAULT' + raise ArgumentError, "Cannot nest shards" if @current_shard != default_shard - shard_count = shard_options[shard_type] || shard_options[:count] || 1 + type = if options[:thread] || options[:type] == :thread; :thread + else :process + end - @current_shard = {:name => shard_name, :type => shard_type, :count => shard_count, :components => [], :config_line => get_config_line(caller)} - @shard_specs << @current_shard + count = options[type] || options[:count] || 1 + + @current_shard = {:name => name, :type => type, :count => count, :components => [], :config_line => get_config_line(caller)} + shard_specs << @current_shard yield self - @current_shard = @shard_specs.first + @current_shard = default_shard end # DSL method to specify a component. Expects a name, # specification, and set of component specific options, that # must be marshallable into the database (i.e. should all be strings) - def component(component_name, component_specification, component_options={}) + def component(name, specification, options = {}) @current_shard[:components] << { - :name => component_name, - :specification => component_specification.to_s, :options => component_options, + :name => name, + :specification => specification.to_s, :options => options, :config_line => get_config_line(caller) } end # DSL method to specify a connection between a @@ -65,13 +61,12 @@ # connect 'componentA#output' => 'componentB#input' # Array ports are specified with an key suffix in standard # progamming syntax, i.e. # connect 'componentA#arrayport[2]' => 'componentB#in[1]' # Uses the model to assign random UUIDs - def connect(connection_hash) - config_file_line = get_config_line(caller) - connection_hash.each do |output_string, input_string| + def connect(hash) + hash.each do |output_string, input_string| output_component_name, output_port_name, output_port_key = parse_connection_string(output_string) input_component_name, input_port_name, input_port_key = parse_connection_string(input_string) connection_specs << { :name => output_string + '=>' + input_string, @@ -79,129 +74,120 @@ :output_port_name => output_port_name, :output_port_key => output_port_key, :output_string => output_string, :input_component_name => input_component_name, :input_port_name => input_port_name, :input_port_key => input_port_key, :input_string => input_string, - :config_line => config_file_line, + :config_line => get_config_line(caller) } end end - # Splits the connection string into component/port parts - COMPONENT_PORT_STRING_REGEX = /^(\w+)#(\w+)(?:\[([^\]]+)\])?$/ - def parse_connection_string(connection_string) - matched = COMPONENT_PORT_STRING_REGEX.match(connection_string) - raise ArgumentError, "Invalid component/port string specification: #{connection_string}" unless matched - # component_name, port_name, port_key - [matched[1], matched[2], (matched[3] || nil)] + # Method called within the config file itself + def self.configure + config_file = self.new + yield config_file + config_file.process end - # Method to process the 'DSL' objects into the config database # via ActiveRecord def process process_setting_specs process_shard_specs process_connection_specs end + private + # Helper function to extract the line of the config that + # specified the operation. Useful in printing helpful error messages + def get_config_line(call_history) + call_history.first.split(':in').first + end + # Splits the connection string into component/port parts + COMPONENT_PORT_STRING_REGEX = /^(\w+)#(\w+)(?:\[([^\]]+)\])?$/ + + def parse_connection_string(string) + matched = COMPONENT_PORT_STRING_REGEX.match(string) + raise ArgumentError, "Invalid component/port string specification: #{string}" unless matched + component_name, port_name, port_key = matched.captures + [component_name, port_name, port_key] + end + # Iterates through each setting specified in the DSL and # creates rows in the database corresponding to the setting def process_setting_specs - setting_specs.each do |setting_spec| - RFlow.logger.debug "Found config file setting '#{setting_spec[:name]}' = (#{Dir.getwd}) '#{setting_spec[:value]}'" - RFlow::Configuration::Setting.create! :name => setting_spec[:name], :value => setting_spec[:value] + setting_specs.each do |spec| + RFlow.logger.debug "Found config file setting '#{spec[:name]}' = (#{Dir.getwd}) '#{spec[:value]}'" + RFlow::Configuration::Setting.create! :name => spec[:name], :value => spec[:value] end end - # Iterates through each shard specified in the DSL and creates # rows in the database corresponding to the shard and included # components def process_shard_specs - @shard_specs.each do |shard_spec| - RFlow.logger.debug "Found #{shard_spec[:type]} shard '#{shard_spec[:name]}', creating" + shard_specs.each do |spec| + RFlow.logger.debug "Found #{spec[:type]} shard '#{spec[:name]}', creating" - shard_class = case shard_spec[:type] - when :process - RFlow::Configuration::ProcessShard - when :thread - RFlow::Configuration::ThreadShard - else - raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{shard_spec.inspect}" - end + if spec[:components].empty? + RFlow.logger.warn "Skipping shard '#{spec[:name]}' because it has no components" + next + end - shard = shard_class.create! :name => shard_spec[:name], :count => shard_spec[:count] + clazz = case spec[:type] + when :process; RFlow::Configuration::ProcessShard + when :thread; RFlow::Configuration::ThreadShard + else raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{spec.inspect}" + end - shard_spec[:components].each do |component_spec| - RFlow.logger.debug "Shard '#{shard_spec[:name]}' found component '#{component_spec[:name]}', creating" + shard = clazz.create! :name => spec[:name], :count => spec[:count] + + spec[:components].each do |component_spec| + RFlow.logger.debug "Shard '#{spec[:name]}' found component '#{component_spec[:name]}', creating" RFlow::Configuration::Component.create!(:shard => shard, :name => component_spec[:name], :specification => component_spec[:specification], :options => component_spec[:options]) end end end - - # Iterates through each component specified in the DSL and uses - # 'process_connection' to insert all the parts of the connection - # into the database - def process_connection_specs - connection_specs.each do |connection_spec| - process_connection_spec(connection_spec) - end - end - - # For the given connection, break up each input/output + # For each given connection, break up each input/output # component/port specification, ensure that the component # already exists in the database (by name). Also, only supports # ZeroMQ ipc sockets - def process_connection_spec(connection_spec) - RFlow.logger.debug "Found connection from '#{connection_spec[:output_string]}' to '#{connection_spec[:input_string]}', creating" + def process_connection_specs + connection_specs.each do |spec| + begin + RFlow.logger.debug "Found connection from '#{spec[:output_string]}' to '#{spec[:input_string]}', creating" - # an input port can be associated with multiple outputs, but - # an output port can only be associated with one input - output_component = RFlow::Configuration::Component.find_by_name connection_spec[:output_component_name] - raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:output_component_name]}" unless output_component - output_port = output_component.output_ports.find_or_initialize_by_name :name => connection_spec[:output_port_name] - output_port.save! + # an input port can be associated with multiple outputs, but + # an output port can only be associated with one input + output_component = RFlow::Configuration::Component.find_by_name spec[:output_component_name] + raise RFlow::Configuration::Connection::ConnectionInvalid, + "Component '#{spec[:output_component_name]}' not found at #{spec[:config_line]}" unless output_component + output_port = output_component.output_ports.find_or_initialize_by_name :name => spec[:output_port_name] + output_port.save! - input_component = RFlow::Configuration::Component.find_by_name connection_spec[:input_component_name] - raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:input_component_name]}" unless input_component - input_port = input_component.input_ports.find_or_initialize_by_name :name => connection_spec[:input_port_name] - input_port.save! + input_component = RFlow::Configuration::Component.find_by_name spec[:input_component_name] + raise RFlow::Configuration::Connection::ConnectionInvalid, + "Component '#{spec[:input_component_name]}' not found at #{spec[:config_line]}" unless input_component + input_port = input_component.input_ports.find_or_initialize_by_name :name => spec[:input_port_name] + input_port.save! - connection = RFlow::Configuration::ZMQConnection.new(:name => connection_spec[:name], - :output_port_key => connection_spec[:output_port_key], - :input_port_key => connection_spec[:input_port_key]) - - connection.output_port = output_port - connection.input_port = input_port - connection.save! - - rescue RFlow::Configuration::Component::ComponentNotFound => e - error_message = "Component '#{e.message}' not found at #{connection_spec[:config_line]}" - RFlow.logger.error error_message - raise RFlow::Configuration::Connection::ConnectionInvalid, error_message - rescue Exception => e - # TODO: Figure out why an ArgumentError doesn't put the - # offending message into e.message, even though it is printed - # out if not caught - error_message = "#{e.class}: #{e.message} at config '#{connection_spec[:config_line]}'" - RFlow.logger.debug "Exception #{e.class} - " + error_message - RFlow.logger.error error_message - raise RFlow::Configuration::Connection::ConnectionInvalid, error_message + RFlow::Configuration::ZMQConnection.create!(:name => spec[:name], + :output_port_key => spec[:output_port_key], + :input_port_key => spec[:input_port_key], + :output_port => output_port, + :input_port => input_port) + rescue Exception => e + # TODO: Figure out why an ArgumentError doesn't put the + # offending message into e.message, even though it is printed + # out if not caught + raise RFlow::Configuration::Connection::ConnectionInvalid, "#{e.class}: #{e.message} at config '#{spec[:config_line]}'" + end + end end - - - # Method called within the config file itself - def self.configure - config_file = self.new - yield config_file - config_file.process - end - - end # class RubyDSL - end # class Configuration -end # class RFlow + end + end +end