# Flydata::Command::Sender -- command class (subclass of Base) that starts the
# FlyData agent's fluentd-based sender process and waits for it to come up.
#
# NOTE(review): this part of the file appears to have lost its line breaks, and
# the text following `log_info_stdout <` inside #start looks truncated -- the
# remainder of #start and whatever definitions came before the dangling
# `0 end` are not present here. Restore them from version control before
# changing code in this region.
# NOTE(review): with the line breaks gone, each inline `#` comment also
# comments out everything after it on the same physical line.
# NOTE(review): agent_locked? calls File.exists?, which was removed in
# Ruby 3.2; File.exist? is the supported spelling.
require 'flydata/compatibility_check' require 'flydata/command/base' require 'flydata/command/sync' module Flydata module Command class Sender < Base def self.slop_start Slop.new do on 'n', 'no-daemon', 'Start FlyData agent as a regular program' end end def start(options_or_show_final_message = {show_final_message: true}) # For backward compatibility. Use only as options going forward if options_or_show_final_message.kind_of? Hash options = options_or_show_final_message else options = {show_final_message: options_or_show_final_message} end # Check if process exist if process_exist? log_info_stdout("Process is still running. Please stop process first.") unless options[:quiet] return end if agent_locked? log_info_stdout("Agent was not shut down properly. Agent will check the status and fix itself if necessary.") repair_opts = Flydata::Command::Sync.slop_repair repair_opts.parse!(["-y"]) sync = Flydata::Command::Sync.new(repair_opts) sync.send(:_repair) # call internal method to bypass command lock if agent_locked? raise "Agent was not able to recover from the previous unexpected shutdown. Please contact support@lyfdata.com to resolve the issue." 
end end # Ends orphan_proceses if there is any orphan_processes.each do |pid| Process.kill(:TERM, pid) end wait_until_server_ready(options) dp = flydata.data_port.get AgentCompatibilityCheck.new(dp).check fluentd_started = false start_fluentd = Proc.new do # Start sender(fluentd) process log_info_stdout("Starting FlyData Agent sender process.") unless options[:quiet] raw_start(options) wait_until_client_ready(options) #wait_until_logs_uploaded fluentd_started = true end quiet_option = options[:quiet] # surpress messages if fluentd is started in #try_mysql_sync options[:quiet] = true Flydata::Command::Sync.new.try_mysql_sync( binlog_ready_callback: start_fluentd) options[:quiet] = quiet_option start_fluentd.call unless fluentd_started if options[:show_final_message] && !options[:quiet] data_port = flydata.data_port.get log_info_stdout("Go to your Dashboard! #{flydata.flydata_api_host}/data_ports/#{data_port['id']}") log_info_stdout < 0 end def kill_all(options = {}) if Kernel.system("ps ax | grep 'flydata' | grep -v grep | grep fluentd | awk '{print \"kill \" $1}' | sh") log_info_stdout("Done.") unless options[:quiet] return true else raise 'Something has gone wrong...' end end # call the method only when no legit process is running. def agent_locked? File.exists?(FLYDATA_LOCK) end def wait_until_server_ready(options = {}) retry_count = 60 # 60 x 30(sec) = 1800(sec) = 30(min) 1.upto(retry_count) do |i| return true if server_ready? log_info_stdout("Waiting for the server side to become active... (#{i})") unless options[:quiet] Kernel.sleep 30 end false end private # Return a list of fluentd parent processes run by the same user for the # same flydata.pid file but not the process managed by the file itself. 
CMD = %Q{test -f %s && ps -u `whoami` -o ppid,pid,args | grep '^ *1 ' | grep '\\%s' | egrep -v "\\b`cat %s`\\b"} def raw_start(options) Dir.chdir(FLYDATA_HOME){ Kernel.system("bash #{FLYDATA_SERVERINFO}", :out => [FLYDATA_LOG,'a'], :err => [FLYDATA_LOG,'a']) daemon_opt = opts.no_daemon? ? "" : daemon_option Kernel.system("ruby `which fluentd` #{daemon_opt} -l #{FLYDATA_LOG} -c #{FLYDATA_CONF} -p #{FLYDATA_FLUENT_PLUGIN_DIR}") } Kernel.sleep 5 end def orphan_processes cmd = CMD % [ pid_file, daemon_option, pid_file ] result = `#{cmd}` orphan_pids = [] result.each_line do |line| orphan_pids << line.split[1].to_i end orphan_pids end def wait_until_client_ready(options = {}) retry_count = 10 1.upto(retry_count) do |i| if client_ready? log_info_stdout("Done! Client is ready now.") unless options[:quiet] return true end if process_died? raise "Client could not been launched. Detail here #{FLYDATA_HOME}/flydata.log" end log_info_stdout("Waiting for the client side to become active... (#{i}/#{retry_count})") unless options[:quiet] Kernel.sleep 10 end raise "Somthing has gone wrong... Please try setup command again." end def wait_until_client_stop(options = {}) retry_count = 10 1.upto(retry_count) do |i| return true unless process_exist? log_info_stdout("Waiting for the client to stop... (#{i}/#{retry_count})") unless options[:quiet] Kernel.sleep 3 end false end def wait_until_logs_uploaded(options = {}) log_info_stdout('Starting to check the upload from your server.') unless options[:quiet] data_port = flydata.data_port.get data_port_id = data_port['id'] retry_count = 10 1.upto(retry_count) do |i| if uploaded_successfully?(data_port_id) log_info_stdout("Uploading your logs correctly.") unless options[:quiet] return true end log_info_stdout("Waiting logs uploading... (#{i}/#{retry_count})") unless options[:quiet] Kernel.sleep 30 end raise 'Cannot confirm that your logs exist on the FlyData server. Something has gone wrong..' end def server_ready? 
data_port = flydata.data_port.get data_port['server_status'] == 'active' end def client_ready? process_exist? end def process_died? # Returns true if the process is running !!(`tail -n 1 #{FLYDATA_HOME}/flydata.log` =~ /process died within/) end def uploaded_successfully?(data_port_id) res = flydata.get("/data_ports/#{data_port_id}/tail.json") res and res['logs'] and res['logs'].size > 0 end def client_buffer_empty?(options = {}) client_buffer = File.join(FLYDATA_HOME, 'buffer') log_info_stdout("Checking the client buffer #{client_buffer}") unless options[:quiet] Dir.glob("#{client_buffer}/*").empty? end def pid_file "#{FLYDATA_HOME}/flydata.pid" end def daemon_option "-d #{pid_file}" end end end end