lib/logstash/inputs/snmp.rb in logstash-input-snmp-1.2.7 vs lib/logstash/inputs/snmp.rb in logstash-input-snmp-1.2.8
- old
+ new
@@ -159,13 +159,13 @@
@client_definitions << definition
end
end
def run(queue)
- # for now a naive single threaded poller which sleeps for the given interval between
+ # for now a naive single threaded poller which sleeps off the remaining interval between
# each run. each run polls all the defined hosts for the get and walk options.
- while !stop?
+ stoppable_interval_runner.every(@interval, "polling hosts") do
@client_definitions.each do |definition|
result = {}
if !definition[:get].empty?
begin
result = result.merge(definition[:client].get(definition[:get], @oid_root_skip, @oid_path_length))
@@ -205,25 +205,30 @@
event = LogStash::Event.new(result)
decorate(event)
queue << event
end
end
-
- Stud.stoppable_sleep(@interval) { stop? }
end
end
+ def stoppable_interval_runner
+ StoppableIntervalRunner.new(self)
+ end
+
def close
@client_definitions.each do |definition|
begin
definition[:client].close
rescue => e
logger.warn("error closing client on #{definition[:host_address]}, ignoring", :exception => e)
end
end
end
+ def stop
+ end
+
private
OID_REGEX = /^\.?([0-9\.]+)$/
HOST_REGEX = /^(?<host_protocol>udp|tcp):(?<host_address>.+)\/(?<host_port>\d+)$/i
VERSION_REGEX =/^1|2c|3$/
@@ -292,7 +297,48 @@
end
end
def validate_strip!
raise(LogStash::ConfigurationError, "you can not specify both oid_root_skip and oid_path_length") if @oid_root_skip > 0 and @oid_path_length > 0
+ end
+
+ ##
+ # The StoppableIntervalRunner is capable of running a block of code at a
+ # repeating interval, while respecting the stop condition of the plugin.
+ class StoppableIntervalRunner
+ ##
+ # @param plugin [#logger,#stop?]
+ def initialize(plugin)
+ @plugin = plugin
+ end
+
+ ##
+ # Runs the provided block repeatedly using the provided interval.
+ # After executing the block, the remainder of the interval if any is slept off
+ # using an interruptible sleep.
+ # If no time remains, a warning is emitted to the logs.
+ #
+ # @param interval_seconds [Integer,Float]
+ # @param desc [String] (default: "operation"): a description to use when logging
+ # @yield
+ def every(interval_seconds, desc="operation", &block)
+ until @plugin.stop?
+ start_time = Time.now
+
+ yield
+
+ duration_seconds = Time.now - start_time
+ if duration_seconds >= interval_seconds
+ @plugin.logger.warn("#{desc} took longer than the configured interval", :interval_seconds => interval_seconds, :duration_seconds => duration_seconds.round(3))
+ else
+ remaining_interval = interval_seconds - duration_seconds
+ sleep(remaining_interval)
+ end
+ end
+ end
+
+ # @api private
+ def sleep(duration)
+ Stud.stoppable_sleep(duration) { @plugin.stop? }
+ end
end
end