lib/telemetry/snmp/collector.rb in telemetry-snmp-0.2.0 vs lib/telemetry/snmp/collector.rb in telemetry-snmp-0.3.0

- old
+ new

@@ -1,100 +1,50 @@ module Telemetry module Snmp module Collector class << self - def worker_name - "#{::Socket.gethostname.tr('.', '_')}.#{::Process.pid}.#{Thread.current.object_id}" + def loop_devices + count = 0 + Telemetry::Snmp::Data::Model::Device.where(:active).order(:last_polled).each do |row| + break if count >= 10 + next if row.values[:last_polled].to_i + row.values[:frequency] > Time.now.to_i + next if device_locked?(row.values[:id]) + + Telemetry::Logger.info "Grabbing metrics for #{row.values[:hostname]}" + device = Telemetry::Snmp::DeviceCollector.new(row.values[:hostname]) + device.async.collect + count += 1 + end end - def loop_devices - Telemetry::Snmp::Data::Model::Device.where(:active).each do |row| + def poll_next_device + Telemetry::Snmp::Data::Model::Device.where(:active).order(:last_polled).each do |row| next if row.values[:last_polled].to_i + row.values[:frequency] > Time.now.to_i next if device_locked?(row.values[:id]) + Telemetry::Logger.info "Grabbing metrics for #{row.values[:hostname]}" collect(row.values[:id]) + break + rescue StandardError => e + Telemetry::Logger.exception(e, level: 'error') end end def unlock_expired_devices Telemetry::Snmp::Data::Model::DeviceLock.each do |row| next if row.values[:expires] < Sequel::CURRENT_TIMESTAMP + Telemetry::Logger.warn "removing lock for #{row.values[:hostname]}" row.delete end end def device_locked?(device_id) - !Telemetry::Snmp::Data::Model::DeviceLock[device_id: device_id].nil? + Telemetry::Snmp::Data::Model::DeviceLock.where(device_id: device_id).count.positive? end - def lock_device(device_id) - return false unless Telemetry::Snmp::Data::Model::DeviceLock[device_id: device_id].nil? - - Telemetry::Snmp::Data::Model::DeviceLock.insert( - worker_name: worker_name, - device_id: device_id, - created: Sequel::CURRENT_TIMESTAMP, - expires: Sequel::CURRENT_TIMESTAMP - ) - true - end - - # noinspection RubyArgCount - def unlock_device(device_id) - device = Telemetry::Snmp::Data::Model::DeviceLock[device_id: device_id] - return true if device.nil? - - device.delete - end - - def collect(device_id) - lock_device(device_id) - row = Telemetry::Snmp::Data::Model::Device[device_id] - lines = [] - fields = {} - tags = { - hostname: row.values[:hostname], - ip_address: row.values[:ip_address], - env: row.values[:environment], - dc: row.values[:datacenter], - zone: row.values[:zone], - influxdb_node_group: 'snmp', - influxdb_database: 'snmp' - } - - Telemetry::Snmp::Data::Model::OID.each do |oid_row| - break if @quit - - oid_value = Telemetry::Snmp::Client.oid_value(row[:hostname], oid_row.values[:oid]) - next if oid_value.nil? - next unless oid_value.is_a?(Integer) || oid_value.is_a?(Float) - - fields[oid_row.values[:name]] = - "#{Telemetry::Snmp::Client.oid_value(row[:hostname], oid_row.values[:oid])}i" - rescue StandardError => e - Telemetry::Logger.error "#{e.class}: #{e.message}" - end - - lines.push Telemetry::Metrics::Parser.to_line_protocol( - measurement: 'palo_alto', - fields: fields, - tags: tags, - timestamp: (DateTime.now.strftime('%Q').to_i * 1000 * 1000) - ) - - walker = Telemetry::Snmp::Client.grab_oid_metrics(row.values[:hostname]) - Telemetry::Logger.info "Pushing #{walker.count} lines for #{row.values[:hostname]}" unless walker.empty? - Telemetry::Snmp::Publisher.push_lines(walker) unless walker.empty? - - row.update(last_polled: Sequel::CURRENT_TIMESTAMP) - row.save - - Telemetry::Snmp::Publisher.push_lines(lines) unless lines.empty? - unlock_device(device_id) - rescue StandardError => e - Telemetry::Logger.exception(e, level: 'error') - unlock_device(device_id) + def device_unlocked?(device_id) + !device_locked?(device_id) end end end end end