lib/agent/server/worker/am_worker.rb in apminsight-1.0.1 vs lib/agent/server/worker/am_worker.rb in apminsight-1.8.2

- old
+ new

@@ -5,66 +5,83 @@ class APMWorker @work =nil; @status = 'not_init' @id = 0 attr_accessor :id - def initialize - @status = "initialized" - @id = Process.pid - end + def initialize + @status = "initialized" + @id = Process.pid + end - def start - @obj = ManageEngine::APMObjectHolder.instance - - if @status=="working" - @obj.log.debug "woker thread already started" - elsif @status == "initialized" - @obj.log.info "start worker thread for - #{Process.pid} :: #{@status} " - #@obj.log.info "Starting APMWorker Thread #{Process.pid} " - @apm = Thread.new do - @status = 'working' - while !@obj.shutdown do - checkforagentstatus - updateConfig - dc - sleep (@obj.config.connect_interval).to_i - end#w - @status= "end" - @obj.log.debug "Worker thread ends" - end - end + def start + @obj = ManageEngine::APMObjectHolder.instance + + if @status=="working" + @obj.log.debug "woker thread already started" + elsif @status == "initialized" + @obj.log.info "start worker thread for - #{Process.pid} :: #{@status} " + #@obj.log.info "Starting APMWorker Thread #{Process.pid} " + @apm = Thread.new do + @status = 'working' + while !@obj.shutdown do + checkforagentstatus + updateConfig + dc + sleep (@obj.config.connect_interval).to_i + end#w + @status= "end" + @obj.log.debug "Worker thread ends" end + end + end + + def self.getInstance + if(@work==nil || @work.id!=Process.pid) + @work = ManageEngine::APMWorker.new + end + return @work + end - def self.getInstance - if(@work==nil || @work.id!=Process.pid) - @work = ManageEngine::APMWorker.new - end - return @work - end - def updateConfig if(@obj.config.lastupdatedtime!=File.mtime(@obj.constants.apm_conf).to_i) - @obj.log.info "Configuration File Changed... So Updating Configuration." + @obj.log.info "Configuration File Changed... So Updating Configuration." + agent_config_data = @obj.config.getAgentConfigData @obj.config.lastupdatedtime=File.mtime(@obj.constants.apm_conf).to_i @obj.config.configureFile @obj.config.assignConfig + new_agent_config_data = @obj.config.getAgentConfigData + sendUpdate = "false" + agent_config_data.each do|key,value| + if key != "last.modified.time" + newValue = new_agent_config_data[key] + if value != newValue + sendUpdate = "true" + end + end + end + if sendUpdate == "true" + @obj.log.info "sending update to server #{new_agent_config_data}" + data1 = Hash.new + data1["custom_config_info"]=new_agent_config_data + resp = @obj.connector.post @obj.constants.connect_config_update_uri+@obj.config.instance_id,data1 + end end end def checkforagentstatus prevState = @obj.config.agent_enabled @obj.config.checkAgentInfo if !@obj.config.agent_enabled @obj.log.info "Agent in Disabled State." if prevState @obj.log.info "Agent in Disabled State. Going to unsubscribe" - @obj.instrumenter.doUnSubscribe +# @obj.instrumenter.doUnSubscribe end else if !prevState @obj.log.info "Agent in Active State." - @obj.instrumenter.doSubscribe +# @obj.instrumenter.doSubscribe end end end def stop @@ -124,18 +141,22 @@ case val.size when 1 tdata.concat(val[0]) when 2 tdata.concat(val[0]) - trdata.concat(val[1]) + if (trdata.size < @obj.config.trace_overflow_t) + trdata.concat(val[1]) + end end end result.push(merge(tdata)) resp = @obj.connector.post @obj.constants.connect_data_uri+@obj.config.instance_id,result + @obj.log.info "#{tdata.size} metric(s) dispatched." if trdata.size>0 result[2]=trdata; resp = @obj.connector.post @obj.constants.connect_trace_uri+@obj.config.instance_id,result + @obj.log.info "#{trdata.size} trace(s) dispatched." end end def save fd begin @@ -147,11 +168,11 @@ end def send_save data begin if FileTest.exist?(@obj.constants.agent_lock) - if Time.now.to_i - File.mtime(@obj.constants.agent_lock).to_i > (@obj.config.connect_interval).to_i + if Time.now.to_i - File.mtime(@obj.constants.agent_lock).to_i >= (@obj.config.connect_interval).to_i @obj.log.debug "worker send signal" senddata data else @obj.log.info "worker save signal" save data @@ -170,11 +191,15 @@ data = Array.new File.open( p, "r+" ) { |f| f.flock(File::LOCK_EX) begin f.each_line do |line| - data.push(JSON.parse(line)) + begin + data.push(JSON.parse(line)) + rescue Exception=>ex + @obj.log.logException "Error Parsing data, Skipping line #{line}", ex + end end f.truncate 0 rescue Exception=>e @obj.log.logException "Exception while reading data #{e}",e ensure @@ -221,22 +246,40 @@ res end def mapdx res,dat - res[0] = res[0]+dat[0]; - if dat[1]<res[1] - res[1]=dat[1] - end - if dat[2]>res[2] - res[2]=dat[2] - end - res[3] = res[3]+dat[3] - res[5] = res[5]+dat[5] - res[6] = res[6]+dat[6] - res[7] = res[7]+dat[7] - res[4] = (res[5].to_f + (res[6].to_f/2).to_f).to_f/res[3].to_f + begin + rtData = res[0]; + rtData[0] = rtData[0]+dat[0][0]; + if dat[0][1]<rtData[1] + rtData[1]=dat[0][1] + end + if dat[0][2]>rtData[2] + rtData[2]=dat[0][2] + end + rtData[3] = rtData[3]+dat[0][3] + rtData[5] = rtData[5]+dat[0][5] + rtData[6] = rtData[6]+dat[0][6] + rtData[7] = rtData[7]+dat[0][7] + rtData[4] = rtData[3] != 0 ? (rtData[5].to_f + (rtData[6].to_f/2).to_f).to_f/rtData[3].to_f : 0 + res[0] = rtData + + resExcepData = res[1][@obj.constants.mf_logmetric] + excepData = dat[1][@obj.constants.mf_logmetric] + if (resExcepData == nil) + resExcepData = excepData + else + if (excepData != nil) + resExcepData = resExcepData.merge(excepData){|key, oldval, newval| newval + oldval} + end + end + + res[1][@obj.constants.mf_logmetric] = resExcepData != nil ? resExcepData : Hash.new + rescue Exception=>e + @obj.log.logException "Exception while merging data",e + end res end def mapdb res,dat res[0] = res[0]+dat[0]; @@ -245,9 +288,10 @@ end if dat[2]>res[2] res[2]=dat[2] end res[3] = res[3]+dat[3] + res[4] = res[4]+dat[4] res end end#c end#m