lib/splash/daemon/orchestrator.rb in prometheus-splash-0.7.0 vs lib/splash/daemon/orchestrator.rb in prometheus-splash-0.8.0

- old
+ new

@@ -30,24 +30,73 @@ # @param [Hash] options # @option options [Symbol] :scheduling activate commands scheduling def initialize(options = {}) @log = get_logger self.extend Splash::Daemon::Metrics - @metric_manager = get_metrics_manager + @session = get_session + @metric_manager = get_metrics_manager(@session) $stdout.sync = true $stderr.sync = true @server = Rufus::Scheduler::new @server.extend SchedulerHooks @config = get_config + @scheduling = options[:scheduling] @log.info "Splash Orchestrator starting :" - if options[:scheduling] then - @log.item "Initializing commands Scheduling." + if @scheduling then + @log.item "Initializing Sequences & commands Scheduling." init_commands_scheduling init_sequences_scheduling end + init_logs_monitoring_scheduling + init_process_monitoring_scheduling + init_metrics_scheduling + init_daemon_subscriber + + end + + + + + + # Stop the Splash daemon gracefully + # @return [hash] Exiter Case :quiet_exit + def terminate + @log.info "Splash daemon shutdown" + @server.shutdown + change_logger logger: :cli + splash_exit case: :quiet_exit + end + + private + + #prepare main daemon subscriber + def init_daemon_subscriber + hostname = Socket.gethostname + transport = get_default_subscriber queue: "splash.#{hostname}.input" + if transport.class == Hash and transport.include? :case then + splash_exit transport + end + transport.subscribe(:block => true) do |delivery_info, properties, body| + content = YAML::load(body) + session = get_session + content[:session] = session + if VERBS.include? content[:verb] + @log.receive "Valid remote order, verb : #{content[:verb].to_s}", session + res = self.send content[:verb], content + get_default_client.publish queue: content[:return_to], message: res.to_yaml + @log.send "Result to #{content[:return_to]}.", session + else + @log.receive "INVALID remote order, verb : #{content[:verb].to_s}", session + get_default_client.publish queue: content[:return_to], message: "Unkown verb #{content[:verb]}".to_yaml + end + end + end + + #prepare logs monitoring sheduling + def init_logs_monitoring_scheduling if @config.logs.empty? then @log.item "No logs to monitor" else sched,value = @config.daemon_procmon_scheduling.flatten @log.item "Initializing logs monitorings & notifications." @@ -62,11 +111,14 @@ rescue Errno::ECONNREFUSED @log.error "PushGateway seems to be done, please start it.", session end end end + end + #prepare process monitoring sheduling + def init_process_monitoring_scheduling if @config.processes.empty? then @log.item "No processes to monitor" else sched,value = @config.daemon_logmon_scheduling.flatten @log.item "Initializing processes monitorings & notifications." @@ -81,53 +133,28 @@ rescue Errno::ECONNREFUSED @log.error "PushGateway seems to be done, please start it.", session end end end + end + #prepare metrics sheduling + def init_metrics_scheduling sched,value = @config.daemon_metrics_scheduling.flatten @log.item "Initializing Splash metrics notifications." @server.send sched,value do begin + @log.trigger "Splash Metrics monitoring for Scheduling : #{sched.to_s} #{value.to_s}", @session @metric_manager.notify rescue Errno::ECONNREFUSED @log.error "PushGateway seems to be done, please start it." end end - - hostname = Socket.gethostname - transport = get_default_subscriber queue: "splash.#{hostname}.input" - if transport.class == Hash and transport.include? :case then - splash_exit transport - end - transport.subscribe(:block => true) do |delivery_info, properties, body| - content = YAML::load(body) - session = get_session - content[:session] = session - if VERBS.include? content[:verb] - @log.receive "Valid remote order, verb : #{content[:verb].to_s}", session - res = self.send content[:verb], content - get_default_client.publish queue: content[:return_to], message: res.to_yaml - @log.send "Result to #{content[:return_to]}.", session - else - @log.receive "INVALID remote order, verb : #{content[:verb].to_s}", session - get_default_client.publish queue: content[:return_to], message: "Unkown verb #{content[:verb]}".to_yaml - end - end end - # Stop the Splash daemon gracefully - # @return [hash] Exiter Case :quiet_exit - def terminate - @log.info "Splash daemon shutdown" - @server.shutdown - change_logger logger: :cli - splash_exit case: :quiet_exit - end - private # prepare commands Scheduling def init_commands_scheduling config = get_config.commands commands = config.select{|key,value| value.include? :schedule}.keys commands.each do |command| @@ -137,11 +164,10 @@ session = get_session @log.trigger "Executing Scheduled command #{command.to_s} for Scheduling : #{sched.to_s} #{value.to_s}", session execute command: command.to_s, session: session end end - end # prepare sequences Scheduling def init_sequences_scheduling @@ -154,15 +180,31 @@ session = get_session @log.trigger "Executing Scheduled sequence #{sequence.to_s} for Scheduling : #{sched.to_s} #{value.to_s}", session run_seq name: sequence.to_s, session: session end end + end + # reset the orchestrator + # @return [Hash] Exiter case + def reset_orchestrator + @server.shutdown + @server = Rufus::Scheduler::new + @server.extend SchedulerHooks + @config = rehash_config + @log.info "Splash Orchestrator re-hashing :" + if @scheduling then + @log.item "Re-Initializing Sequences & commands Scheduling." + init_commands_scheduling + init_sequences_scheduling + end + init_logs_monitoring_scheduling + init_process_monitoring_scheduling + init_metrics_scheduling end - # execute_command verb : execute command specified in payload # @param [Hash] options # @option options [Symbol] :command the name of the command # @option options [Symbol] :ack ack flag to inhibit execution and send ack to Prometheus (0) # @return [Hash] Exiter case @@ -172,11 +214,9 @@ else @metric_manager.inc_execution return command.call_and_notify trace: true, notify: true, callback: true, session: options[:session] end end - end - end end end