lib/splash/orchestrator.rb in prometheus-splash-0.1.1 vs lib/splash/orchestrator.rb in prometheus-splash-0.2.0
- old
+ new
@@ -1,76 +1,85 @@
# coding: utf-8
+Dir[File.dirname(__FILE__) + '/orchestrator/*.rb'].each {|file| require file }
+
module Splash
module Orchestrator
- module SchedulerHooks
- def on_pre_trigger(job, trigger_time)
-
- end
-
- def on_post_trigger(job, trigger_time)
-
- end
-
- def init_log
-
- end
- end
-
- module Commander
- include Splash::Transports
- def send_message (options)
- client = get_default_client
- client.publish options
- end
- end
-
- module Grammar
-
- VERBS=[:ping]
-
- def ping(payload)
- return "Pong : #{payload[:hostname]} !"
- end
- end
-
class Scheduler
include Splash::Constants
include Splash::Helpers
include Splash::Config
include Splash::Transports
include Splash::Orchestrator::Grammar
- def initialize
+ def initialize(options = {})
+ $stdout.sync = true
+ $stderr.sync = true
@server = Rufus::Scheduler::new
@server.extend SchedulerHooks
- @server.init_log
@config = get_config
@result = LogScanner::new
+ puts "Splash Orchestrator starting :"
+ if options[:scheduling] then
+ puts " * Initializing commands Scheduling."
+ self.init_commands_scheduling
+ end
sched,value = @config.daemon_logmon_scheduling.flatten
+ puts " * Initializing logs monitorings & notifications."
@server.send sched,value do
begin
- puts "Notify"
@result.analyse
@result.notify
$stdout.flush
rescue Errno::ECONNREFUSED
$stderr.puts "PushGateway seems to be done, please start it."
end
end
hostname = Socket.gethostname
transport = get_default_subscriber queue: "splash.#{hostname}.input"
+ if transport.class == Hash and transport.include? :case then
+ splash_exit transport
+ end
transport.subscribe(:block => true) do |delivery_info, properties, body|
content = YAML::load(body)
if VERBS.include? content[:verb]
- res = self.send content[:verb], content[:payload]
+ puts "Receive valid remote order, verb : #{content[:verb].to_s}"
+ if content[:payload] then
+ res = self.send content[:verb], content[:payload]
+ else
+ res = self.send content[:verb]
+ end
get_default_client.publish queue: content[:return_to], message: res.to_yaml
else
+ puts "Receive INVALID remote order, verb : #{content[:verb].to_s}"
get_default_client.publish queue: content[:return_to], message: "Unkown verb #{content[:verb]}".to_yaml
end
end
end
def terminate
+ end
+
+ private
+ def init_commands_scheduling
+ config = get_config.commands
+ commands = config.select{|key,value| value.include? :schedule}.keys
+ commands.each do |command|
+ sched,value = config[command][:schedule].flatten
+ puts " => Scheduling command #{command.to_s}"
+ @server.send sched,value do
+ self.execute command: command.to_s
+ end
+ end
+
+ end
+
+ def execute(options)
+ command = Splash::CommandWrapper::new(options[:command])
+ if options[:ack] then
+ command.ack
+ else
+ command.call_and_notify trace: true, notify: true, callback: true
+ end
end
end