lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a1 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a2
- old
+ new
@@ -1,60 +1,56 @@
require 'rflow/configuration'
class RFlow
class Configuration
-
# Ruby DSL config file controller.
# TODO: more docs and examples
class RubyDSL
- attr_accessor :setting_specs, :shard_specs, :connection_specs, :allocated_system_ports
+ private
+ attr_accessor :setting_specs, :shard_specs, :connection_specs, :default_shard
+ public
def initialize
+ @default_shard = {:name => "DEFAULT", :type => :process, :count => 1, :components => []}
+ @current_shard = default_shard
+
@setting_specs = []
- @shard_specs = [{:name => "DEFAULT", :type => :process, :count => 1, :components => []}]
+ @shard_specs = [default_shard]
@connection_specs = []
-
- @current_shard = @shard_specs.first
end
- # Helper function to extract the line of the config that
- # specified the operation. Useful in printing helpful error messages
- def get_config_line(call_history)
- call_history.first.split(':in').first
- end
-
# DSL method to specify a name/value pair. RFlow core uses the
# 'rflow.' prefix on all of its settings. Custom settings
# should use a custom (unique) prefix
- def setting(setting_name, setting_value)
- setting_specs << {:name => setting_name.to_s, :value => setting_value.to_s, :config_line => get_config_line(caller)}
+ def setting(name, value)
+ setting_specs << {:name => name.to_s, :value => value.to_s, :config_line => get_config_line(caller)}
end
# DSL method to specify a shard block for either a process or thread
- def shard(shard_name, shard_options={})
- raise ArgumentError, "Cannot use DEFAULT as a shard name" if shard_name == 'DEFAULT'
- shard_type = if shard_options[:thread] || shard_options[:type] == :thread
- :thread
- else
- :process
- end
+ def shard(name, options = {})
+ raise ArgumentError, "Cannot use DEFAULT as a shard name" if name == 'DEFAULT'
+ raise ArgumentError, "Cannot nest shards" if @current_shard != default_shard
- shard_count = shard_options[shard_type] || shard_options[:count] || 1
+ type = if options[:thread] || options[:type] == :thread; :thread
+ else :process
+ end
- @current_shard = {:name => shard_name, :type => shard_type, :count => shard_count, :components => [], :config_line => get_config_line(caller)}
- @shard_specs << @current_shard
+ count = options[type] || options[:count] || 1
+
+ @current_shard = {:name => name, :type => type, :count => count, :components => [], :config_line => get_config_line(caller)}
+ shard_specs << @current_shard
yield self
- @current_shard = @shard_specs.first
+ @current_shard = default_shard
end
# DSL method to specify a component. Expects a name,
# specification, and set of component specific options, that
# must be marshallable into the database (i.e. should all be strings)
- def component(component_name, component_specification, component_options={})
+ def component(name, specification, options = {})
@current_shard[:components] << {
- :name => component_name,
- :specification => component_specification.to_s, :options => component_options,
+ :name => name,
+ :specification => specification.to_s, :options => options,
:config_line => get_config_line(caller)
}
end
# DSL method to specify a connection between a
@@ -65,13 +61,12 @@
# connect 'componentA#output' => 'componentB#input'
# Array ports are specified with an key suffix in standard
# progamming syntax, i.e.
# connect 'componentA#arrayport[2]' => 'componentB#in[1]'
# Uses the model to assign random UUIDs
- def connect(connection_hash)
- config_file_line = get_config_line(caller)
- connection_hash.each do |output_string, input_string|
+ def connect(hash)
+ hash.each do |output_string, input_string|
output_component_name, output_port_name, output_port_key = parse_connection_string(output_string)
input_component_name, input_port_name, input_port_key = parse_connection_string(input_string)
connection_specs << {
:name => output_string + '=>' + input_string,
@@ -79,129 +74,120 @@
:output_port_name => output_port_name, :output_port_key => output_port_key,
:output_string => output_string,
:input_component_name => input_component_name,
:input_port_name => input_port_name, :input_port_key => input_port_key,
:input_string => input_string,
- :config_line => config_file_line,
+ :config_line => get_config_line(caller)
}
end
end
- # Splits the connection string into component/port parts
- COMPONENT_PORT_STRING_REGEX = /^(\w+)#(\w+)(?:\[([^\]]+)\])?$/
- def parse_connection_string(connection_string)
- matched = COMPONENT_PORT_STRING_REGEX.match(connection_string)
- raise ArgumentError, "Invalid component/port string specification: #{connection_string}" unless matched
- # component_name, port_name, port_key
- [matched[1], matched[2], (matched[3] || nil)]
+ # Method called within the config file itself
+ def self.configure
+ config_file = self.new
+ yield config_file
+ config_file.process
end
-
# Method to process the 'DSL' objects into the config database
# via ActiveRecord
def process
process_setting_specs
process_shard_specs
process_connection_specs
end
+ private
+ # Helper function to extract the line of the config that
+ # specified the operation. Useful in printing helpful error messages
+ def get_config_line(call_history)
+ call_history.first.split(':in').first
+ end
+ # Splits the connection string into component/port parts
+ COMPONENT_PORT_STRING_REGEX = /^(\w+)#(\w+)(?:\[([^\]]+)\])?$/
+
+ def parse_connection_string(string)
+ matched = COMPONENT_PORT_STRING_REGEX.match(string)
+ raise ArgumentError, "Invalid component/port string specification: #{string}" unless matched
+ component_name, port_name, port_key = matched.captures
+ [component_name, port_name, port_key]
+ end
+
# Iterates through each setting specified in the DSL and
# creates rows in the database corresponding to the setting
def process_setting_specs
- setting_specs.each do |setting_spec|
- RFlow.logger.debug "Found config file setting '#{setting_spec[:name]}' = (#{Dir.getwd}) '#{setting_spec[:value]}'"
- RFlow::Configuration::Setting.create! :name => setting_spec[:name], :value => setting_spec[:value]
+ setting_specs.each do |spec|
+ RFlow.logger.debug "Found config file setting '#{spec[:name]}' = (#{Dir.getwd}) '#{spec[:value]}'"
+ RFlow::Configuration::Setting.create! :name => spec[:name], :value => spec[:value]
end
end
-
# Iterates through each shard specified in the DSL and creates
# rows in the database corresponding to the shard and included
# components
def process_shard_specs
- @shard_specs.each do |shard_spec|
- RFlow.logger.debug "Found #{shard_spec[:type]} shard '#{shard_spec[:name]}', creating"
+ shard_specs.each do |spec|
+ RFlow.logger.debug "Found #{spec[:type]} shard '#{spec[:name]}', creating"
- shard_class = case shard_spec[:type]
- when :process
- RFlow::Configuration::ProcessShard
- when :thread
- RFlow::Configuration::ThreadShard
- else
- raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{shard_spec.inspect}"
- end
+ if spec[:components].empty?
+ RFlow.logger.warn "Skipping shard '#{spec[:name]}' because it has no components"
+ next
+ end
- shard = shard_class.create! :name => shard_spec[:name], :count => shard_spec[:count]
+ clazz = case spec[:type]
+ when :process; RFlow::Configuration::ProcessShard
+ when :thread; RFlow::Configuration::ThreadShard
+ else raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{spec.inspect}"
+ end
- shard_spec[:components].each do |component_spec|
- RFlow.logger.debug "Shard '#{shard_spec[:name]}' found component '#{component_spec[:name]}', creating"
+ shard = clazz.create! :name => spec[:name], :count => spec[:count]
+
+ spec[:components].each do |component_spec|
+ RFlow.logger.debug "Shard '#{spec[:name]}' found component '#{component_spec[:name]}', creating"
RFlow::Configuration::Component.create!(:shard => shard,
:name => component_spec[:name],
:specification => component_spec[:specification],
:options => component_spec[:options])
end
end
end
-
- # Iterates through each component specified in the DSL and uses
- # 'process_connection' to insert all the parts of the connection
- # into the database
- def process_connection_specs
- connection_specs.each do |connection_spec|
- process_connection_spec(connection_spec)
- end
- end
-
- # For the given connection, break up each input/output
+ # For each given connection, break up each input/output
# component/port specification, ensure that the component
# already exists in the database (by name). Also, only supports
# ZeroMQ ipc sockets
- def process_connection_spec(connection_spec)
- RFlow.logger.debug "Found connection from '#{connection_spec[:output_string]}' to '#{connection_spec[:input_string]}', creating"
+ def process_connection_specs
+ connection_specs.each do |spec|
+ begin
+ RFlow.logger.debug "Found connection from '#{spec[:output_string]}' to '#{spec[:input_string]}', creating"
- # an input port can be associated with multiple outputs, but
- # an output port can only be associated with one input
- output_component = RFlow::Configuration::Component.find_by_name connection_spec[:output_component_name]
- raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:output_component_name]}" unless output_component
- output_port = output_component.output_ports.find_or_initialize_by_name :name => connection_spec[:output_port_name]
- output_port.save!
+ # an input port can be associated with multiple outputs, but
+ # an output port can only be associated with one input
+ output_component = RFlow::Configuration::Component.find_by_name spec[:output_component_name]
+ raise RFlow::Configuration::Connection::ConnectionInvalid,
+ "Component '#{spec[:output_component_name]}' not found at #{spec[:config_line]}" unless output_component
+ output_port = output_component.output_ports.find_or_initialize_by_name :name => spec[:output_port_name]
+ output_port.save!
- input_component = RFlow::Configuration::Component.find_by_name connection_spec[:input_component_name]
- raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:input_component_name]}" unless input_component
- input_port = input_component.input_ports.find_or_initialize_by_name :name => connection_spec[:input_port_name]
- input_port.save!
+ input_component = RFlow::Configuration::Component.find_by_name spec[:input_component_name]
+ raise RFlow::Configuration::Connection::ConnectionInvalid,
+ "Component '#{spec[:input_component_name]}' not found at #{spec[:config_line]}" unless input_component
+ input_port = input_component.input_ports.find_or_initialize_by_name :name => spec[:input_port_name]
+ input_port.save!
- connection = RFlow::Configuration::ZMQConnection.new(:name => connection_spec[:name],
- :output_port_key => connection_spec[:output_port_key],
- :input_port_key => connection_spec[:input_port_key])
-
- connection.output_port = output_port
- connection.input_port = input_port
- connection.save!
-
- rescue RFlow::Configuration::Component::ComponentNotFound => e
- error_message = "Component '#{e.message}' not found at #{connection_spec[:config_line]}"
- RFlow.logger.error error_message
- raise RFlow::Configuration::Connection::ConnectionInvalid, error_message
- rescue Exception => e
- # TODO: Figure out why an ArgumentError doesn't put the
- # offending message into e.message, even though it is printed
- # out if not caught
- error_message = "#{e.class}: #{e.message} at config '#{connection_spec[:config_line]}'"
- RFlow.logger.debug "Exception #{e.class} - " + error_message
- RFlow.logger.error error_message
- raise RFlow::Configuration::Connection::ConnectionInvalid, error_message
+ RFlow::Configuration::ZMQConnection.create!(:name => spec[:name],
+ :output_port_key => spec[:output_port_key],
+ :input_port_key => spec[:input_port_key],
+ :output_port => output_port,
+ :input_port => input_port)
+ rescue Exception => e
+ # TODO: Figure out why an ArgumentError doesn't put the
+ # offending message into e.message, even though it is printed
+ # out if not caught
+ raise RFlow::Configuration::Connection::ConnectionInvalid, "#{e.class}: #{e.message} at config '#{spec[:config_line]}'"
+ end
+ end
end
-
-
- # Method called within the config file itself
- def self.configure
- config_file = self.new
- yield config_file
- config_file.process
- end
-
- end # class RubyDSL
- end # class Configuration
-end # class RFlow
+ end
+ end
+end