lib/rflow/configuration/ruby_dsl.rb in rflow-0.0.5 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a1
- old
+ new
@@ -4,36 +4,55 @@
class Configuration
# Ruby DSL config file controller.
# TODO: more docs and examples
class RubyDSL
- attr_accessor :setting_specs, :component_specs, :connection_specs, :allocated_system_ports
-
+ attr_accessor :setting_specs, :shard_specs, :connection_specs, :allocated_system_ports
+
def initialize
@setting_specs = []
- @component_specs = []
+ @shard_specs = [{:name => "DEFAULT", :type => :process, :count => 1, :components => []}]
@connection_specs = []
+
+ @current_shard = @shard_specs.first
end
# Helper function to extract the line of the config that
# specified the operation. Useful in printing helpful error messages
def get_config_line(call_history)
call_history.first.split(':in').first
end
-
+
# DSL method to specify a name/value pair. RFlow core uses the
# 'rflow.' prefix on all of its settings. Custom settings
# should use a custom (unique) prefix
def setting(setting_name, setting_value)
setting_specs << {:name => setting_name.to_s, :value => setting_value.to_s, :config_line => get_config_line(caller)}
end
+ # DSL method to specify a shard block for either a process or thread
+ def shard(shard_name, shard_options={})
+ raise ArgumentError, "Cannot use DEFAULT as a shard name" if shard_name == 'DEFAULT'
+ shard_type = if shard_options[:thread] || shard_options[:type] == :thread
+ :thread
+ else
+ :process
+ end
+
+ shard_count = shard_options[shard_type] || shard_options[:count] || 1
+
+ @current_shard = {:name => shard_name, :type => shard_type, :count => shard_count, :components => [], :config_line => get_config_line(caller)}
+ @shard_specs << @current_shard
+ yield self
+ @current_shard = @shard_specs.first
+ end
+
# DSL method to specify a component. Expects a name,
# specification, and set of component specific options, that
# must be marshallable into the database (i.e. should all be strings)
def component(component_name, component_specification, component_options={})
- component_specs << {
+ @current_shard[:components] << {
:name => component_name,
:specification => component_specification.to_s, :options => component_options,
:config_line => get_config_line(caller)
}
end
@@ -53,13 +72,13 @@
connection_hash.each do |output_string, input_string|
output_component_name, output_port_name, output_port_key = parse_connection_string(output_string)
input_component_name, input_port_name, input_port_key = parse_connection_string(input_string)
connection_specs << {
- :name => output_string + '=>' + input_string,
+ :name => output_string + '=>' + input_string,
:output_component_name => output_component_name,
- :output_port_name => output_port_name, :output_port_key => output_port_key,
+ :output_port_name => output_port_name, :output_port_key => output_port_key,
:output_string => output_string,
:input_component_name => input_component_name,
:input_port_name => input_port_name, :input_port_key => input_port_key,
:input_string => input_string,
:config_line => config_file_line,
@@ -74,40 +93,59 @@
raise ArgumentError, "Invalid component/port string specification: #{connection_string}" unless matched
# component_name, port_name, port_key
[matched[1], matched[2], (matched[3] || nil)]
end
-
+
# Method to process the 'DSL' objects into the config database
# via ActiveRecord
def process
process_setting_specs
- process_component_specs
+ process_shard_specs
process_connection_specs
end
-
+
# Iterates through each setting specified in the DSL and
# creates rows in the database corresponding to the setting
def process_setting_specs
setting_specs.each do |setting_spec|
RFlow.logger.debug "Found config file setting '#{setting_spec[:name]}' = (#{Dir.getwd}) '#{setting_spec[:value]}'"
RFlow::Configuration::Setting.create! :name => setting_spec[:name], :value => setting_spec[:value]
end
end
-
- # Iterates through each component specified in the DSL and
- # creates rows in the database corresponding to the component.
- def process_component_specs
- component_specs.each do |component_spec|
- RFlow.logger.debug "Found component '#{component_spec[:name]}', creating"
- RFlow::Configuration::Component.create! :name => component_spec[:name], :specification => component_spec[:specification], :options => component_spec[:options]
+
+ # Iterates through each shard specified in the DSL and creates
+ # rows in the database corresponding to the shard and included
+ # components
+ def process_shard_specs
+ @shard_specs.each do |shard_spec|
+ RFlow.logger.debug "Found #{shard_spec[:type]} shard '#{shard_spec[:name]}', creating"
+
+ shard_class = case shard_spec[:type]
+ when :process
+ RFlow::Configuration::ProcessShard
+ when :thread
+ RFlow::Configuration::ThreadShard
+ else
+ raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{shard_spec.inspect}"
+ end
+
+ shard = shard_class.create! :name => shard_spec[:name], :count => shard_spec[:count]
+
+ shard_spec[:components].each do |component_spec|
+ RFlow.logger.debug "Shard '#{shard_spec[:name]}' found component '#{component_spec[:name]}', creating"
+ RFlow::Configuration::Component.create!(:shard => shard,
+ :name => component_spec[:name],
+ :specification => component_spec[:specification],
+ :options => component_spec[:options])
+ end
end
end
-
+
# Iterates through each component specified in the DSL and uses
# 'process_connection' to insert all the parts of the connection
# into the database
def process_connection_specs
connection_specs.each do |connection_spec|
@@ -119,40 +157,26 @@
# component/port specification, ensure that the component
# already exists in the database (by name). Also, only supports
# ZeroMQ ipc sockets
def process_connection_spec(connection_spec)
RFlow.logger.debug "Found connection from '#{connection_spec[:output_string]}' to '#{connection_spec[:input_string]}', creating"
-
+
# an input port can be associated with multiple outputs, but
# an output port can only be associated with one input
output_component = RFlow::Configuration::Component.find_by_name connection_spec[:output_component_name]
raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:output_component_name]}" unless output_component
output_port = output_component.output_ports.find_or_initialize_by_name :name => connection_spec[:output_port_name]
output_port.save!
-
+
input_component = RFlow::Configuration::Component.find_by_name connection_spec[:input_component_name]
raise RFlow::Configuration::Component::ComponentNotFound, "#{connection_spec[:input_component_name]}" unless input_component
input_port = input_component.input_ports.find_or_initialize_by_name :name => connection_spec[:input_port_name]
input_port.save!
- # Create a unique ZMQ address
-# zmq_address = "ipc://run/rflow.#{output_component.uuid}.#{output_port.uuid}"
-# if connection_spec[:output_port_key]
-# zmq_address << ".#{connection_spec[:output_port_key].gsub(/[^\w]/, '').downcase}"
-# end
-
connection = RFlow::Configuration::ZMQConnection.new(:name => connection_spec[:name],
:output_port_key => connection_spec[:output_port_key],
:input_port_key => connection_spec[:input_port_key])
-# :options => {
-# 'output_socket_type' => "PUSH",
-# 'output_address' => zmq_address,
-# 'output_responsibility' => "bind",
-# 'input_socket_type' => "PULL",
-# 'input_address' => zmq_address,
-# 'input_responsibility' => "connect",
-# })
connection.output_port = output_port
connection.input_port = input_port
connection.save!
@@ -168,10 +192,10 @@
RFlow.logger.debug "Exception #{e.class} - " + error_message
RFlow.logger.error error_message
raise RFlow::Configuration::Connection::ConnectionInvalid, error_message
end
-
+
# Method called within the config file itself
def self.configure
config_file = self.new
yield config_file
config_file.process