lib/splash/daemon/orchestrator.rb in prometheus-splash-0.7.0 vs lib/splash/daemon/orchestrator.rb in prometheus-splash-0.8.0
- old
+ new
@@ -30,24 +30,73 @@
# @param [Hash] options
# @option options [Symbol] :scheduling activate commands scheduling
def initialize(options = {})
@log = get_logger
self.extend Splash::Daemon::Metrics
- @metric_manager = get_metrics_manager
+ @session = get_session
+ @metric_manager = get_metrics_manager(@session)
$stdout.sync = true
$stderr.sync = true
@server = Rufus::Scheduler::new
@server.extend SchedulerHooks
@config = get_config
+ @scheduling = options[:scheduling]
@log.info "Splash Orchestrator starting :"
- if options[:scheduling] then
- @log.item "Initializing commands Scheduling."
+ if @scheduling then
+ @log.item "Initializing Sequences & commands Scheduling."
init_commands_scheduling
init_sequences_scheduling
end
+ init_logs_monitoring_scheduling
+ init_process_monitoring_scheduling
+ init_metrics_scheduling
+ init_daemon_subscriber
+
+ end
+
+
+
+
+
+ # Stop the Splash daemon gracefully
+ # @return [hash] Exiter Case :quiet_exit
+ def terminate
+ @log.info "Splash daemon shutdown"
+ @server.shutdown
+ change_logger logger: :cli
+ splash_exit case: :quiet_exit
+ end
+
+ private
+
+ #prepare main daemon subscriber
+ def init_daemon_subscriber
+ hostname = Socket.gethostname
+ transport = get_default_subscriber queue: "splash.#{hostname}.input"
+ if transport.class == Hash and transport.include? :case then
+ splash_exit transport
+ end
+ transport.subscribe(:block => true) do |delivery_info, properties, body|
+ content = YAML::load(body)
+ session = get_session
+ content[:session] = session
+ if VERBS.include? content[:verb]
+ @log.receive "Valid remote order, verb : #{content[:verb].to_s}", session
+ res = self.send content[:verb], content
+ get_default_client.publish queue: content[:return_to], message: res.to_yaml
+ @log.send "Result to #{content[:return_to]}.", session
+ else
+ @log.receive "INVALID remote order, verb : #{content[:verb].to_s}", session
+ get_default_client.publish queue: content[:return_to], message: "Unkown verb #{content[:verb]}".to_yaml
+ end
+ end
+ end
+
+ #prepare logs monitoring sheduling
+ def init_logs_monitoring_scheduling
if @config.logs.empty? then
@log.item "No logs to monitor"
else
sched,value = @config.daemon_procmon_scheduling.flatten
@log.item "Initializing logs monitorings & notifications."
@@ -62,11 +111,14 @@
rescue Errno::ECONNREFUSED
@log.error "PushGateway seems to be done, please start it.", session
end
end
end
+ end
+ #prepare process monitoring sheduling
+ def init_process_monitoring_scheduling
if @config.processes.empty? then
@log.item "No processes to monitor"
else
sched,value = @config.daemon_logmon_scheduling.flatten
@log.item "Initializing processes monitorings & notifications."
@@ -81,53 +133,28 @@
rescue Errno::ECONNREFUSED
@log.error "PushGateway seems to be done, please start it.", session
end
end
end
+ end
+ #prepare metrics sheduling
+ def init_metrics_scheduling
sched,value = @config.daemon_metrics_scheduling.flatten
@log.item "Initializing Splash metrics notifications."
@server.send sched,value do
begin
+ @log.trigger "Splash Metrics monitoring for Scheduling : #{sched.to_s} #{value.to_s}", @session
@metric_manager.notify
rescue Errno::ECONNREFUSED
@log.error "PushGateway seems to be done, please start it."
end
end
-
- hostname = Socket.gethostname
- transport = get_default_subscriber queue: "splash.#{hostname}.input"
- if transport.class == Hash and transport.include? :case then
- splash_exit transport
- end
- transport.subscribe(:block => true) do |delivery_info, properties, body|
- content = YAML::load(body)
- session = get_session
- content[:session] = session
- if VERBS.include? content[:verb]
- @log.receive "Valid remote order, verb : #{content[:verb].to_s}", session
- res = self.send content[:verb], content
- get_default_client.publish queue: content[:return_to], message: res.to_yaml
- @log.send "Result to #{content[:return_to]}.", session
- else
- @log.receive "INVALID remote order, verb : #{content[:verb].to_s}", session
- get_default_client.publish queue: content[:return_to], message: "Unkown verb #{content[:verb]}".to_yaml
- end
- end
end
- # Stop the Splash daemon gracefully
- # @return [hash] Exiter Case :quiet_exit
- def terminate
- @log.info "Splash daemon shutdown"
- @server.shutdown
- change_logger logger: :cli
- splash_exit case: :quiet_exit
- end
- private
# prepare commands Scheduling
def init_commands_scheduling
config = get_config.commands
commands = config.select{|key,value| value.include? :schedule}.keys
commands.each do |command|
@@ -137,11 +164,10 @@
session = get_session
@log.trigger "Executing Scheduled command #{command.to_s} for Scheduling : #{sched.to_s} #{value.to_s}", session
execute command: command.to_s, session: session
end
end
-
end
# prepare sequences Scheduling
def init_sequences_scheduling
@@ -154,15 +180,31 @@
session = get_session
@log.trigger "Executing Scheduled sequence #{sequence.to_s} for Scheduling : #{sched.to_s} #{value.to_s}", session
run_seq name: sequence.to_s, session: session
end
end
+ end
+ # reset the orchestrator
+ # @return [Hash] Exiter case
+ def reset_orchestrator
+ @server.shutdown
+ @server = Rufus::Scheduler::new
+ @server.extend SchedulerHooks
+ @config = rehash_config
+ @log.info "Splash Orchestrator re-hashing :"
+ if @scheduling then
+ @log.item "Re-Initializing Sequences & commands Scheduling."
+ init_commands_scheduling
+ init_sequences_scheduling
+ end
+ init_logs_monitoring_scheduling
+ init_process_monitoring_scheduling
+ init_metrics_scheduling
end
-
# execute_command verb : execute command specified in payload
# @param [Hash] options
# @option options [Symbol] :command the name of the command
# @option options [Symbol] :ack ack flag to inhibit execution and send ack to Prometheus (0)
# @return [Hash] Exiter case
@@ -172,11 +214,9 @@
else
@metric_manager.inc_execution
return command.call_and_notify trace: true, notify: true, callback: true, session: options[:session]
end
end
-
end
-
end
end
end