lib/rflow/configuration/ruby_dsl.rb in rflow-0.0.5 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a1

- old
+ new

@@ -4,36 +4,55 @@ class Configuration # Ruby DSL config file controller. # TODO: more docs and examples class RubyDSL - attr_accessor :setting_specs, :component_specs, :connection_specs, :allocated_system_ports - + attr_accessor :setting_specs, :shard_specs, :connection_specs, :allocated_system_ports + def initialize @setting_specs = [] - @component_specs = [] + @shard_specs = [{:name => "DEFAULT", :type => :process, :count => 1, :components => []}] @connection_specs = [] + + @current_shard = @shard_specs.first end # Helper function to extract the line of the config that # specified the operation. Useful in printing helpful error messages def get_config_line(call_history) call_history.first.split(':in').first end - + # DSL method to specify a name/value pair. RFlow core uses the # 'rflow.' prefix on all of its settings. Custom settings # should use a custom (unique) prefix def setting(setting_name, setting_value) setting_specs << {:name => setting_name.to_s, :value => setting_value.to_s, :config_line => get_config_line(caller)} end + # DSL method to specify a shard block for either a process or thread + def shard(shard_name, shard_options={}) + raise ArgumentError, "Cannot use DEFAULT as a shard name" if shard_name == 'DEFAULT' + shard_type = if shard_options[:thread] || shard_options[:type] == :thread + :thread + else + :process + end + + shard_count = shard_options[shard_type] || shard_options[:count] || 1 + + @current_shard = {:name => shard_name, :type => shard_type, :count => shard_count, :components => [], :config_line => get_config_line(caller)} + @shard_specs << @current_shard + yield self + @current_shard = @shard_specs.first + end + # DSL method to specify a component. Expects a name, # specification, and set of component specific options, that # must be marshallable into the database (i.e. should all be strings) def component(component_name, component_specification, component_options={}) - component_specs << { + @current_shard[:components] << { :name => component_name, :specification => component_specification.to_s, :options => component_options, :config_line => get_config_line(caller) } end @@ -53,13 +72,13 @@ connection_hash.each do |output_string, input_string| output_component_name, output_port_name, output_port_key = parse_connection_string(output_string) input_component_name, input_port_name, input_port_key = parse_connection_string(input_string) connection_specs << { - :name => output_string + '=>' + input_string, + :name => output_string + '=>' + input_string, :output_component_name => output_component_name, - :output_port_name => output_port_name, :output_port_key => output_port_key, + :output_port_name => output_port_name, :output_port_key => output_port_key, :output_string => output_string, :input_component_name => input_component_name, :input_port_name => input_port_name, :input_port_key => input_port_key, :input_string => input_string, :config_line => config_file_line, @@ -74,40 +93,59 @@ raise ArgumentError, "Invalid component/port string specification: #{connection_string}" unless matched # component_name, port_name, port_key [matched[1], matched[2], (matched[3] || nil)] end - + # Method to process the 'DSL' objects into the config database # via ActiveRecord def process process_setting_specs - process_component_specs + process_shard_specs process_connection_specs end - + # Iterates through each setting specified in the DSL and # creates rows in the database corresponding to the setting def process_setting_specs setting_specs.each do |setting_spec| RFlow.logger.debug "Found config file setting '#{setting_spec[:name]}' = (#{Dir.getwd}) '#{setting_spec[:value]}'" RFlow::Configuration::Setting.create! :name => setting_spec[:name], :value => setting_spec[:value] end end - - # Iterates through each component specified in the DSL and - # creates rows in the database corresponding to the component. - def process_component_specs - component_specs.each do |component_spec| - RFlow.logger.debug "Found component '#{component_spec[:name]}', creating" - RFlow::Configuration::Component.create! :name => component_spec[:name], :specification => component_spec[:specification], :options => component_spec[:options] + + # Iterates through each shard specified in the DSL and creates + # rows in the database corresponding to the shard and included + # components + def process_shard_specs + @shard_specs.each do |shard_spec| + RFlow.logger.debug "Found #{shard_spec[:type]} shard '#{shard_spec[:name]}', creating" + + shard_class = case shard_spec[:type] + when :process + RFlow::Configuration::ProcessShard + when :thread + RFlow::Configuration::ThreadShard + else + raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{shard_spec.inspect}" + end + + shard = shard_class.create! :name => shard_spec[:name], :count => shard_spec[:count] + + shard_spec[:components].each do |component_spec| + RFlow.logger.debug "Shard '#{shard_spec[:name]}' found component '#{component_spec[:name]}', creating" + RFlow::Configuration::Component.create!(:shard => shard, + :name => component_spec[:name], + :specification => component_spec[:specification], + :options => component_spec[:options]) + end end end - + # Iterates through each component specified in the DSL and uses # 'process_connection' to insert all the parts of the connection # into the database def process_connection_specs connection_specs.each do |connection_spec| @@ -119,40 +157,26 @@ # component/port specification, ensure that the component # already exists in the database (by name). Also, only supports # ZeroMQ ipc sockets def process_connection_spec(connection_spec) RFlow.logger.debug "Found connection from '#{connection_spec[:output_string]}' to '#{connection_spec[:input_string]}', creating" - + # an input port can be associated with multiple outputs, but # an output port can only be associated with one input output_component = RFlow::Configuration::Component.find_by_name connection_spec[:output_component_name] raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:output_component_name]}" unless output_component output_port = output_component.output_ports.find_or_initialize_by_name :name => connection_spec[:output_port_name] output_port.save! - + input_component = RFlow::Configuration::Component.find_by_name connection_spec[:input_component_name] raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:input_component_name]}" unless input_component input_port = input_component.input_ports.find_or_initialize_by_name :name => connection_spec[:input_port_name] input_port.save! - # Create a unique ZMQ address -# zmq_address = "ipc://run/rflow.#{output_component.uuid}.#{output_port.uuid}" -# if connection_spec[:output_port_key] -# zmq_address << ".#{connection_spec[:output_port_key].gsub(/[^\w]/, '').downcase}" -# end - connection = RFlow::Configuration::ZMQConnection.new(:name => connection_spec[:name], :output_port_key => connection_spec[:output_port_key], :input_port_key => connection_spec[:input_port_key]) -# :options => { -# 'output_socket_type' => "PUSH", -# 'output_address' => zmq_address, -# 'output_responsibility' => "bind", -# 'input_socket_type' => "PULL", -# 'input_address' => zmq_address, -# 'input_responsibility' => "connect", -# }) connection.output_port = output_port connection.input_port = input_port connection.save! @@ -168,10 +192,10 @@ RFlow.logger.debug "Exception #{e.class} - " + error_message RFlow.logger.error error_message raise RFlow::Configuration::Connection::ConnectionInvalid, error_message end - + # Method called within the config file itself def self.configure config_file = self.new yield config_file config_file.process