lib/rflow.rb in rflow-0.0.5 vs lib/rflow.rb in rflow-1.0.0a1

- old
+ new

@@ -3,438 +3,66 @@ require 'time' require 'active_record' require 'eventmachine' -require 'log4r' require 'sqlite3' require 'rflow/configuration' -require 'rflow/component' +require 'rflow/master' require 'rflow/message' require 'rflow/components' require 'rflow/connections' +require 'rflow/logger' class RFlow include Log4r class Error < StandardError; end - LOG_PATTERN_FORMAT = '%l [%d] %c (%p) - %M' - DATE_METHOD = 'xmlschema(6)' - LOG_PATTERN_FORMATTER = PatternFormatter.new :pattern => RFlow::LOG_PATTERN_FORMAT, :date_method => DATE_METHOD - - # Might be slightly faster, but also not completely correct XML - #schema timestamps due to %z - #DATE_PATTERN_FORMAT = '%Y-%m-%dT%H:%M:%S.%9N %z' - #LOG_PATTERN_FORMATTER = PatternFormatter.new :pattern => RFlow::LOG_PATTERN_FORMAT, :date_pattern => DATE_PATTERN_FORMAT - class << self attr_accessor :config_database_path attr_accessor :logger attr_accessor :configuration - attr_accessor :components + attr_accessor :master end - -# def self.initialize_config_database(config_database_path, config_file_path=nil) -# # To handle relative paths in the config (all relative paths are -# # relative to the config database -# Dir.chdir File.dirname(config_database_path) -# Configuration.new(File.basename(config_database_path), config_file_path) -# end - def self.initialize_logger(log_file_path, log_level='INFO', include_stdout=nil) - rflow_logger = Logger.new((configuration['rflow.application_name'] rescue File.basename(log_file_path))) - rflow_logger.level = LNAMES.index log_level - # TODO: Remove this once all the logging puts in its own - # Class.Method names. - rflow_logger.trace = true - begin - rflow_logger.add FileOutputter.new('rflow.log_file', :filename => log_file_path, :formatter => LOG_PATTERN_FORMATTER) - rescue Exception => e - error_message = "Log file '#{File.expand_path log_file_path}' problem: #{e.message}\b#{e.backtrace.join("\n")}" - RFlow.logger.error error_message - raise ArgumentError, error_message - end + def self.run(config_database_path=nil, daemonize=nil) + self.configuration = Configuration.new(config_database_path) - if include_stdout - rflow_logger.add StdoutOutputter.new('rflow_stdout', :formatter => RFlow::LOG_PATTERN_FORMATTER) + if config_database_path + # First change to the config database directory, which might hold + # relative paths for the other files/directories, such as the + # application_directory_path + Dir.chdir File.dirname(config_database_path) end - - - RFlow.logger.info "Transitioning to running log file #{log_file_path} at level #{log_level}" - RFlow.logger = rflow_logger - end - def self.reopen_log_file - # TODO: Make this less of a hack, although Log4r doesn't support - # it, so it might be permanent - log_file = Outputter['rflow.log_file'].instance_variable_get(:@out) - File.open(log_file.path, 'a') { |tmp_log_file| log_file.reopen(tmp_log_file) } - end - - def self.close_log_file - Outputter['rflow.log_file'].close - end - - def self.toggle_log_level - original_log_level = LNAMES[logger.level] - new_log_level = (original_log_level == 'DEBUG' ? configuration['rflow.log_level'] : 'DEBUG') - logger.warn "Changing log level from #{original_log_level} to #{new_log_level}" - logger.level = LNAMES.index new_log_level - end - - def self.trap_signals - # Gracefully shutdown on termination signals - ['SIGTERM', 'SIGINT', 'SIGQUIT'].each do |signal| - Signal.trap signal do - logger.warn "Termination signal (#{signal}) received, shutting down" - shutdown - end - end - - # Reload on HUP - ['SIGHUP'].each do |signal| - Signal.trap signal do - logger.warn "Reload signal (#{signal}) received, reloading" - reload - end - end - - # Ignore terminal signals - # TODO: Make sure this is valid for non-daemon (foreground) process - ['SIGTSTP', 'SIGTTOU', 'SIGTTIN'].each do |signal| - Signal.trap signal do - logger.warn "Terminal signal (#{signal}) received, ignoring" - end - end - - # Reopen logs on USR1 - ['SIGUSR1'].each do |signal| - Signal.trap signal do - logger.warn "Reopen logs signal (#{signal}) received, reopening #{configuration['rflow.log_file_path']}" - reopen_log_file - end - end - - # Toggle log level on USR2 - ['SIGUSR2'].each do |signal| - Signal.trap signal do - logger.warn "Toggle log level signal (#{signal}) received, toggling" - toggle_log_level - end - end - - # TODO: Manage SIGCHLD when spawning other processes - end - - - # returns a PID if a given path contains a non-stale PID file, - # nil otherwise. - def self.running_pid_file_path?(pid_file_path) - return nil unless File.exist? pid_file_path - running_pid? File.read(pid_file_path).to_i - end - - def self.running_pid?(pid) - return if pid <= 0 - Process.kill(0, pid) - pid - rescue Errno::ESRCH, Errno::ENOENT - nil - end - - # unlinks a PID file at given if it contains the current PID still - # potentially racy without locking the directory (which is - # non-portable and may interact badly with other programs), but the - # window for hitting the race condition is small - def self.remove_pid_file(pid_file_path) - (File.read(pid_file_path).to_i == $$ and File.unlink(pid_file_path)) rescue nil - logger.debug "Removed PID (#$$) file '#{File.expand_path pid_file_path}'" - end - - # TODO: Handle multiple instances and existing PID file - def self.write_pid_file(pid_file_path) - pid = running_pid_file_path?(pid_file_path) - if pid && pid == $$ - logger.warn "Already running (#{pid.to_s}), not writing PID to file '#{File.expand_path pid_file_path}'" - return pid_file_path - elsif pid - error_message = "Already running (#{pid.to_s}), possibly stale PID file '#{File.expand_path pid_file_path}'" - logger.error error_message - raise ArgumentError, error_message - elsif File.exist? pid_file_path - logger.warn "Found stale PID file '#{File.expand_path pid_file_path}', removing" - remove_pid_file pid_file_path - end - - logger.debug "Writing PID (#$$) file '#{File.expand_path pid_file_path}'" - pid_fp = begin - tmp_pid_file_path = File.join(File.dirname(pid_file_path), ".#{File.basename(pid_file_path)}") - File.open(tmp_pid_file_path, File::RDWR|File::CREAT|File::EXCL, 0644) - rescue Errno::EEXIST - retry - end - pid_fp.syswrite("#$$\n") - File.rename(pid_fp.path, pid_file_path) - pid_fp.close - - pid_file_path - end - - # TODO: Refactor this to be cleaner - def self.daemonize!(application_name, pid_file_path) - logger.info "#{application_name} daemonizing" - - # TODO: Drop privileges - - # Daemonize, but don't chdir or close outputs - Process.daemon(true, true) - - # Set the process name - $0 = application_name if application_name - - # Write the PID file - write_pid_file pid_file_path - - # Close standard IO - $stdout.sync = $stderr.sync = true - $stdin.binmode; $stdout.binmode; $stderr.binmode - begin; $stdin.reopen "/dev/null"; rescue ::Exception; end - begin; $stdout.reopen "/dev/null"; rescue ::Exception; end - begin; $stderr.reopen "/dev/null"; rescue ::Exception; end - - $$ - end - - - # Iterate through each component config in the configuration - # database and attempt to instantiate each one, storing the - # resulting instantiated components in the 'components' class - # instance attribute. This assumes that the specification of a - # component is a fully qualified Ruby class that has already been - # loaded. It will first attempt to find subclasses of - # RFlow::Component (in the available_components hash) and then - # attempt to constantize the specification into a different class. Future releases will - # support external (i.e. non-managed components), but the current - # stuff only supports Ruby classes - def self.instantiate_components! - logger.info "Instantiating components" - self.components = Hash.new - configuration.components.each do |component_config| - if component_config.managed? - logger.debug "Instantiating component '#{component_config.name}' as '#{component_config.specification}' (#{component_config.uuid})" - begin - logger.debug configuration.available_components.inspect - instantiated_component = if configuration.available_components.include? component_config.specification - logger.debug "Component found in configuration.available_components['#{component_config.specification}']" - configuration.available_components[component_config.specification].new(component_config.uuid, component_config.name) - else - logger.debug "Component not found in configuration.available_components, constantizing component '#{component_config.specification}'" - component_config.specification.constantize.new(component_config.uuid, component_config.name) - end - - components[component_config.uuid] = instantiated_component - - rescue NameError => e - error_message = "Could not instantiate component '#{component_config.name}' as '#{component_config.specification}' (#{component_config.uuid}): the class '#{component_config.specification}' was not found" - logger.error error_message - raise RuntimeError, error_message - rescue Exception => e - error_message = "Could not instantiate component '#{component_config.name}' as '#{component_config.specification}' (#{component_config.uuid}): #{e.class} #{e.message}" - logger.error error_message - raise RuntimeError, error_message - end - else - error_message = "Non-managed components not yet implemented for component '#{component_config.name}' as '#{component_config.specification}' (#{component_config.uuid})" - logger.error error_message - raise NotImplementedError, error_message - end - end - end - - - # Iterate through the instantiated components and send each - # component its soon-to-be connected port names and UUIDs - def self.configure_component_ports! - # Send the port configuration to each component - logger.info "Configuring component ports and assigning UUIDs to port names" - components.each do |component_instance_uuid, component| - RFlow.logger.debug "Configuring ports for component '#{component.name}' (#{component.instance_uuid})" - component_config = configuration.component(component.instance_uuid) - component_config.input_ports.each do |input_port_config| - RFlow.logger.debug "Configuring component '#{component.name}' (#{component.instance_uuid}) with input port '#{input_port_config.name}' (#{input_port_config.uuid})" - component.configure_input_port!(input_port_config.name, input_port_config.uuid) - end - component_config.output_ports.each do |output_port_config| - RFlow.logger.debug "Configuring component '#{component.name}' (#{component.instance_uuid}) with output port '#{output_port_config.name}' (#{output_port_config.uuid})" - component.configure_output_port!(output_port_config.name, output_port_config.uuid) - end - end - end - - - # Iterate through the instantiated components and send each - # component the information necessary to configure a connection on a - # specific port, specifically the port UUID, port key, type of connection, uuid - # of connection, and a configuration specific to the connection type - def self.configure_component_connections! - logger.info "Configuring component port connections" - components.each do |component_instance_uuid, component| - component_config = configuration.component(component.instance_uuid) - - logger.debug "Configuring input connections for component '#{component.name}' (#{component.instance_uuid})" - component_config.input_ports.each do |input_port_config| - input_port_config.input_connections.each do |input_connection_config| - logger.debug "Configuring input port '#{input_port_config.name}' (#{input_port_config.uuid}) key '#{input_connection_config.input_port_key}' with #{input_connection_config.type.to_s} connection '#{input_connection_config.name}' (#{input_connection_config.uuid})" - component.configure_connection!(input_port_config.uuid, input_connection_config.input_port_key, - input_connection_config.type, input_connection_config.uuid, input_connection_config.name, input_connection_config.options) - end - end - - logger.debug "Configuring output connections for '#{component.name}' (#{component.instance_uuid})" - component_config.output_ports.each do |output_port_config| - output_port_config.output_connections.each do |output_connection_config| - logger.debug "Configuring output port '#{output_port_config.name}' (#{output_port_config.uuid}) key '#{output_connection_config.output_port_key}' with #{output_connection_config.type.to_s} connection '#{output_connection_config.name}' (#{output_connection_config.uuid})" - component.configure_connection!(output_port_config.uuid, output_connection_config.output_port_key, - output_connection_config.type, output_connection_config.uuid, output_connection_config.name, output_connection_config.options) - end - end - end - end - - - # Send the component-specific configuration to the component - def self.configure_components! - logger.info "Configuring components with component-specific configurations" - components.each do |component_uuid, component| - component_config = configuration.component(component.instance_uuid) - logger.debug "Configuring component '#{component.name}' (#{component.instance_uuid})" - component.configure!(component_config.options) - end - end - - # Send a command to each component to tell them to connect their - # ports via their connections - def self.connect_components! - logger.info "Connecting components" - components.each do |component_uuid, component| - logger.debug "Connecting component '#{component.name}' (#{component.instance_uuid})" - component.connect! - end - end - - # Start each component running - def self.run_components! - logger.info "Running components" - components.each do |component_uuid, component| - logger.debug "Running component '#{component.name}' (#{component.instance_uuid})" - component.run! - end - end - - def self.run(config_database_path, daemonize=nil) - self.configuration = Configuration.new(config_database_path) - - # First change to the config database directory, which might hold - # relative paths for the other files/directories, such as the - # application_directory_path - Dir.chdir File.dirname(config_database_path) - # Bail unless you have some of the basic information. TODO: # rethink this when things get more dynamic unless configuration['rflow.application_directory_path'] error_message = "Empty configuration database! Use a view/controller (such as the RubyDSL) to create a configuration" RFlow.logger.error "Empty configuration database! Use a view/controller (such as the RubyDSL) to create a configuration" raise ArgumentError, error_message end Dir.chdir configuration['rflow.application_directory_path'] - - initialize_logger(configuration['rflow.log_file_path'], configuration['rflow.log_level'], !daemonize) - application_name = configuration['rflow.application_name'] - logger.info "#{application_name} starting" + self.logger = RFlow::Logger.new(configuration, !daemonize) + @master = Master.new(configuration) - logger.info "#{application_name} configured, starting flow" - logger.debug "Available Components: #{RFlow::Configuration.available_components.inspect}" - logger.debug "Available Data Types: #{RFlow::Configuration.available_data_types.inspect}" - logger.debug "Available Data Extensions: #{RFlow::Configuration.available_data_extensions.inspect}" + master.daemonize! if daemonize + master.run # Runs EM and doesn't return - # TODO: Start up a FlowManager component and connect it to the - # management interface on all the components. - - instantiate_components! - configure_component_ports! - configure_component_connections! - configure_components! - - # At this point, each component should have their entire - # configuration for the component-specific stuff and all the - # connections and be ready to be connected to the others and start - # running - - - # Daemonize - trap_signals - daemonize!(application_name, configuration['rflow.pid_file_path']) if daemonize - write_pid_file configuration['rflow.pid_file_path'] - - # Start the event loop and startup each component - EM.run do - connect_components! - - components.each do |component_uuid, component| - RFlow.logger.debug component.to_s - end - - run_components! - - # Sit back and relax because everything is running - end - # Should never get here - shutdown - - # TODO: Look into Parallel::ForkManager + logger.warn "going down" rescue SystemExit => e # Do nothing, just prevent a normal exit from causing an unsightly # error in the logs rescue Exception => e logger.fatal "Exception caught: #{e.class} - #{e.message}\n#{e.backtrace.join "\n"}" exit 1 - end - - def self.shutdown - logger.info "#{configuration['rflow.application_name']} shutting down" - - logger.debug "Shutting down components" - components.each do |component_instance_uuid, component| - logger.debug "Shutting down component '#{component.name}' (#{component.instance_uuid})" - component.shutdown! - end - - # TODO: Ensure that all the components have shut down before - # cleaning up - - logger.debug "Cleaning up components" - components.each do |component_instance_uuid, component| - logger.debug "Cleaning up component '#{component.name}' (#{component.instance_uuid})" - component.cleanup! - end - - remove_pid_file configuration['rflow.pid_file_path'] - logger.info "#{configuration['rflow.application_name']} exiting" - exit 0 - end - - def self.reload - # TODO: Actually do a real reload - logger.info "#{configuration['rflow.application_name']} reloading" - reload_log_file - logger.info "#{configuration['rflow.application_name']} reloaded" end end # class RFlow