lib/agent/server/worker/am_worker.rb in apminsight-1.0.1 vs lib/agent/server/worker/am_worker.rb in apminsight-1.8.2
- old
+ new
@@ -5,66 +5,83 @@
class APMWorker
@work =nil;
@status = 'not_init'
@id = 0
attr_accessor :id
- def initialize
- @status = "initialized"
- @id = Process.pid
- end
+ def initialize
+ @status = "initialized"
+ @id = Process.pid
+ end
- def start
- @obj = ManageEngine::APMObjectHolder.instance
-
- if @status=="working"
- @obj.log.debug "woker thread already started"
- elsif @status == "initialized"
- @obj.log.info "start worker thread for - #{Process.pid} :: #{@status} "
- #@obj.log.info "Starting APMWorker Thread #{Process.pid} "
- @apm = Thread.new do
- @status = 'working'
- while !@obj.shutdown do
- checkforagentstatus
- updateConfig
- dc
- sleep (@obj.config.connect_interval).to_i
- end#w
- @status= "end"
- @obj.log.debug "Worker thread ends"
- end
- end
+ def start
+ @obj = ManageEngine::APMObjectHolder.instance
+
+ if @status=="working"
+ @obj.log.debug "woker thread already started"
+ elsif @status == "initialized"
+ @obj.log.info "start worker thread for - #{Process.pid} :: #{@status} "
+ #@obj.log.info "Starting APMWorker Thread #{Process.pid} "
+ @apm = Thread.new do
+ @status = 'working'
+ while !@obj.shutdown do
+ checkforagentstatus
+ updateConfig
+ dc
+ sleep (@obj.config.connect_interval).to_i
+ end#w
+ @status= "end"
+ @obj.log.debug "Worker thread ends"
end
+ end
+ end
+
+ def self.getInstance
+ if(@work==nil || @work.id!=Process.pid)
+ @work = ManageEngine::APMWorker.new
+ end
+ return @work
+ end
- def self.getInstance
- if(@work==nil || @work.id!=Process.pid)
- @work = ManageEngine::APMWorker.new
- end
- return @work
- end
-
def updateConfig
if(@obj.config.lastupdatedtime!=File.mtime(@obj.constants.apm_conf).to_i)
- @obj.log.info "Configuration File Changed... So Updating Configuration."
+ @obj.log.info "Configuration File Changed... So Updating Configuration."
+ agent_config_data = @obj.config.getAgentConfigData
@obj.config.lastupdatedtime=File.mtime(@obj.constants.apm_conf).to_i
@obj.config.configureFile
@obj.config.assignConfig
+ new_agent_config_data = @obj.config.getAgentConfigData
+ sendUpdate = "false"
+ agent_config_data.each do|key,value|
+ if key != "last.modified.time"
+ newValue = new_agent_config_data[key]
+ if value != newValue
+ sendUpdate = "true"
+ end
+ end
+ end
+ if sendUpdate == "true"
+ @obj.log.info "sending update to server #{new_agent_config_data}"
+ data1 = Hash.new
+ data1["custom_config_info"]=new_agent_config_data
+ resp = @obj.connector.post @obj.constants.connect_config_update_uri+@obj.config.instance_id,data1
+ end
end
end
def checkforagentstatus
prevState = @obj.config.agent_enabled
@obj.config.checkAgentInfo
if !@obj.config.agent_enabled
@obj.log.info "Agent in Disabled State."
if prevState
@obj.log.info "Agent in Disabled State. Going to unsubscribe"
- @obj.instrumenter.doUnSubscribe
+# @obj.instrumenter.doUnSubscribe
end
else
if !prevState
@obj.log.info "Agent in Active State."
- @obj.instrumenter.doSubscribe
+# @obj.instrumenter.doSubscribe
end
end
end
def stop
@@ -124,18 +141,22 @@
case val.size
when 1
tdata.concat(val[0])
when 2
tdata.concat(val[0])
- trdata.concat(val[1])
+ if (trdata.size < @obj.config.trace_overflow_t)
+ trdata.concat(val[1])
+ end
end
end
result.push(merge(tdata))
resp = @obj.connector.post @obj.constants.connect_data_uri+@obj.config.instance_id,result
+ @obj.log.info "#{tdata.size} metric(s) dispatched."
if trdata.size>0
result[2]=trdata;
resp = @obj.connector.post @obj.constants.connect_trace_uri+@obj.config.instance_id,result
+ @obj.log.info "#{trdata.size} trace(s) dispatched."
end
end
def save fd
begin
@@ -147,11 +168,11 @@
end
def send_save data
begin
if FileTest.exist?(@obj.constants.agent_lock)
- if Time.now.to_i - File.mtime(@obj.constants.agent_lock).to_i > (@obj.config.connect_interval).to_i
+ if Time.now.to_i - File.mtime(@obj.constants.agent_lock).to_i >= (@obj.config.connect_interval).to_i
@obj.log.debug "worker send signal"
senddata data
else
@obj.log.info "worker save signal"
save data
@@ -170,11 +191,15 @@
data = Array.new
File.open( p, "r+" ) { |f|
f.flock(File::LOCK_EX)
begin
f.each_line do |line|
- data.push(JSON.parse(line))
+ begin
+ data.push(JSON.parse(line))
+ rescue Exception=>ex
+ @obj.log.logException "Error Parsing data, Skipping line #{line}", ex
+ end
end
f.truncate 0
rescue Exception=>e
@obj.log.logException "Exception while reading data #{e}",e
ensure
@@ -221,22 +246,40 @@
res
end
def mapdx res,dat
- res[0] = res[0]+dat[0];
- if dat[1]<res[1]
- res[1]=dat[1]
- end
- if dat[2]>res[2]
- res[2]=dat[2]
- end
- res[3] = res[3]+dat[3]
- res[5] = res[5]+dat[5]
- res[6] = res[6]+dat[6]
- res[7] = res[7]+dat[7]
- res[4] = (res[5].to_f + (res[6].to_f/2).to_f).to_f/res[3].to_f
+ begin
+ rtData = res[0];
+ rtData[0] = rtData[0]+dat[0][0];
+ if dat[0][1]<rtData[1]
+ rtData[1]=dat[0][1]
+ end
+ if dat[0][2]>rtData[2]
+ rtData[2]=dat[0][2]
+ end
+ rtData[3] = rtData[3]+dat[0][3]
+ rtData[5] = rtData[5]+dat[0][5]
+ rtData[6] = rtData[6]+dat[0][6]
+ rtData[7] = rtData[7]+dat[0][7]
+ rtData[4] = rtData[3] != 0 ? (rtData[5].to_f + (rtData[6].to_f/2).to_f).to_f/rtData[3].to_f : 0
+ res[0] = rtData
+
+ resExcepData = res[1][@obj.constants.mf_logmetric]
+ excepData = dat[1][@obj.constants.mf_logmetric]
+ if (resExcepData == nil)
+ resExcepData = excepData
+ else
+ if (excepData != nil)
+ resExcepData = resExcepData.merge(excepData){|key, oldval, newval| newval + oldval}
+ end
+ end
+
+ res[1][@obj.constants.mf_logmetric] = resExcepData != nil ? resExcepData : Hash.new
+ rescue Exception=>e
+ @obj.log.logException "Exception while merging data",e
+ end
res
end
def mapdb res,dat
res[0] = res[0]+dat[0];
@@ -245,9 +288,10 @@
end
if dat[2]>res[2]
res[2]=dat[2]
end
res[3] = res[3]+dat[3]
+ res[4] = res[4]+dat[4]
res
end
end#c
end#m