lib/rubix/sender.rb in rubix-0.2.1 vs lib/rubix/sender.rb in rubix-0.3.0

- old
+ new

@@ -1,433 +1,132 @@ require 'rubix/log' +require 'open3' module Rubix # A class used to send data to Zabbix. # - # This sender is used to implement the logic for the +zabbix_pipe+ - # utility. It is initialized with some metadata about a host, its - # host groups and templates, and applications into which items - # should be written, and it can then accept data and forward it to a - # Zabbix server using the +zabbix_sender+ utility that comes with - # Zabbix. - # - # A sender can be given data in either TSV or JSON formats. With - # the JSON format, it is possible to embed data for hosts, host - # groups, &c. distinct from that with which this sender was - # initialized. This is a useful way to send many different kinds of - # data through the same process. - # - # The sender will also auto-vivify any hosts, host gruops, - # templates, applications, and items it needs in order to be able to - # write data. This is expensive in terms of time so it can be - # turned off using the <tt>--fast</tt> option. + # This sender is used to wrap +zabbix_sender+. class Sender include Logs - # @return [Hash] settings - attr_accessor :settings - - # @return [Rubix::Host] the host the Sender will send data for - attr_accessor :host - - # @return [Array<Rubix::HostGroup>] the hostgroups used to create this host - attr_accessor :host_groups - - # @return [Array<Rubix::Template>] the templates used to create this host - attr_accessor :templates - - # @return [Array<Rubix::Application>] The applications used to create items - attr_accessor :applications - # - # == Initialization == + # == Properties == # - # Create a new sender with the given +settings+. - # - # @param [Hash, Configliere::Param] settings - # @param settings [String] host the name of the Zabbix host to write data for - # @param settings [String] host_groups comma-separated names of Zabbix host groups the host should belong to - # @param settings [String] templates comma-separated names of Zabbix templates the host should belong to - # @param settings [String] applications comma-separated names of applications created items should be scoped under - # @param settings [String] server URL for the Zabbix server -- *not* the URL for the Zabbix API - # @param settings [String] configuration_file path to a local Zabbix configuration file as used by the +zabbix_sender+ utility - # @param settings [true, false] verbose be verbose during execution - # @param settings [true, false] fast auto-vivify (slow) or not (fast) - # @param settings [String] pipe path to a named pipe to be read from - # @param settings [Fixnum] pipe_read_sleep seconds to sleep after an empty read from the a named pipe - # @param settings [Fixnum] create_item_sleep seconds to sleep after creating a new item - def initialize settings - @settings = settings - confirm_settings - if fast? - info("Forwarding for #{settings['host']}...") if settings['verbose'] - else - initialize_host_groups - initialize_templates - initialize_host - initialize_applications - info("Forwarding for #{host.name}...") if settings['verbose'] - end + # @return [String] The IP of the Zabbix server + attr_writer :server + def server + @server ||= 'localhost' end - # Is this sender running in 'fast' mode? If so, it will *not* - # auto-vivify any hosts, groups, items, &c. - # - # @return [true, false] - def fast? - settings['fast'] + # @return [String, Rubix::Hosts] the Zabbix host name or Rubix::Host the sender will use by default + attr_reader :host + def host= nh + @host = (nh.respond_to?(:name) ? nh.name : nh.to_s) end - # Will this sender auto-vivify hosts, groups, items, &c.? - # - # @return [true, false] - def auto_vivify? - !fast? + # @return [Fixnum] the port to connect to on the Zabbix server + attr_writer :port + def port + @port ||= 10051 end - protected - - # Find or create necessary host groups. - # - # @return [Array<Rubix::HostGroup>] - def initialize_host_groups - self.host_groups = settings['host_groups'].split(',').flatten.compact.map(&:strip).uniq.map { |group_name | HostGroup.find_or_create(:name => group_name.strip) } + # @return [String] the path to the local Zabbix agent configuration file. + attr_writer :config + def config + @config ||= '/etc/zabbix/zabbix_agentd.conf' end - # Find necessary templates. - # - # @return [Array<Rubix::Template>] - def initialize_templates - self.templates = (settings['templates'] || '').split(',').flatten.compact.map(&:strip).uniq.map { |template_name | Template.find(:name => template_name.strip) }.compact + # Whether or not to include timestamps with the data. + attr_writer :timestamps + def timestamps? + @timestamps end - # Find or create the host for this data. Host groups and - # templates will automatically be attached. # - # @return [Rubix::Host] - def initialize_host - self.host = (Host.find(:name => settings['host']) || Host.new(:name => settings['host'])) + # == Initialization == + # - current_host_group_names = (host.host_groups || []).map(&:name) - current_template_names = (host.templates || []).map(&:name) - - host_groups_to_add, templates_to_add = [], [] - - (self.host_groups || []).each do |hg| - host_groups_to_add << hg unless current_host_group_names.include?(hg.name) - end - - (self.templates || []).each do |t| - templates_to_add << t unless current_template_names.include?(t.name) - end - - host.host_groups = ((host.host_groups || []) + host_groups_to_add).flatten.compact.uniq - host.templates = ((host.templates || []) + templates_to_add).flatten.compact.uniq - host.save - host - end - - # Find or create the applications for this data. + # Create a new sender with the given +settings+. # - # @return [Array<Rubix::Application>] - def initialize_applications - application_names = (settings['applications'] || '').split(',').flatten.compact.map(&:strip).uniq - self.applications = [] - application_names.each do |app_name| - app = Application.find(:name => app_name, :host_id => host.id) - if app - self.applications << app - else - app = Application.new(:name => app_name, :host_id => host.id) - if app.save - self.applications << app - else - warn("Could not create application '#{app_name}' for host #{host.name}") - end - end - end - self.applications + # @param [Hash, Configliere::Param] settings + # @param settings [String, Rubix::Host] host the name of the Zabbix host to write data for + # @param settings [String] server the IP of the Zabbix server + # @param settings [Fixnum] port the port to connect to on the Zabbix server + # @param settings [String] config the path to the local configuration file + def initialize settings={} + @settings = settings + self.server = settings[:server] if settings[:server] + self.host = settings[:host] if settings[:host] + self.port = settings[:port] if settings[:port] + self.config = settings[:config] if settings[:config] + self.timestamps = settings[:timestamps] + confirm_settings end # Check that all settings are correct in order to be able to # successfully write data to Zabbix. def confirm_settings - raise ConnectionError.new("Must specify a Zabbix server to send data to.") unless settings['server'] - raise Error.new("Must specify the path to a local configuraiton file") unless settings['configuration_file'] && File.file?(settings['configuration_file']) - raise ConnectionError.new("Must specify the name of a host to send data for.") unless settings['host'] - raise ValidationError.new("Must define at least one host group.") if auto_vivify? && (settings['host_groups'].nil? || settings['host_groups'].empty?) + raise Error.new("Must specify a path to a local configuraiton file") unless config + raise Error.new("Must specify the IP of a Zabbix server") unless server + raise Error.new("Must specify the port of a Zabbix server") unless port && port.to_i > 0 + raise Error.new("Must specify a default Zabbix host to write data for") unless host end - - public # # == Sending Data == # - # Run this sender. + # The environment for the Zabbix sender invocation. # - # Will read from the correct source of data and exit the Ruby - # process once the source is consumed. - def run - case - when settings['pipe'] - process_pipe - when settings.rest.size > 0 - settings.rest.each do |path| - process_file(path) - end - else - process_stdin - end - exit(0) + # @return [Hash] + def zabbix_sender_env + {} end - protected - - # Process each line of a file. + # Construct the command that invokes Zabbix sender. # - # @param [String] path the path to the file to process - def process_file path - f = File.new(path) - process_file_handle(f) - f.close - end - - # Process each line of standard input. - def process_stdin - process_file_handle($stdin) - end - - # Process each line read from the pipe. - # - # The pipe will be opened in a non-blocking read mode. This - # sender will wait 'pipe_read_sleep' seconds between successive - # empty reads. - def process_pipe - # We want to open this pipe in non-blocking read mode b/c - # otherwise this process becomes hard to kill. - f = File.new(settings['pipe'], (File::RDONLY | File::NONBLOCK)) - while true - process_file_handle(f) - # In non-blocking mode, an EOFError from f.readline doesn't mean - # there's no more data to read, just that there's no more data - # right *now*. If we sleep for a bit there might be more data - # coming down the pipe. - sleep settings['pipe_read_sleep'] + # @return [String] + def zabbix_sender_command + "timeout 3 zabbix_sender --zabbix-server #{server} --host #{host} --port #{port} --config #{config} --real-time --input-file - -vv".tap do |c| + c += " --with-timestamps" if timestamps? end - f.close end - - # Process each line of a given file handle. + + # Run a +zabbix_sender+ subprocess in the block. # - # @param [File] f the file to process - def process_file_handle f + # @yield [IO, IO, IO, Thread] Handle the subprocess. + def with_sender_subprocess &block begin - line = f.readline - rescue EOFError - line = nil + Open3.popen3(zabbix_sender_env, zabbix_sender_command, &block) + rescue Errno::ENOENT, Errno::EACCES => e + warn(e.message) end - while line - process_line(line) - begin - # FIXME -- this call to File#readline blocks and doesn't let - # stuff like SIGINT (generated from Ctrl-C on a keyboard, - # say) take affect. - line = f.readline - rescue EOFError - line = nil - end - end end - public - - # Process a single line of text. + # Convenience method for sending a block of text to + # +zabbix_sender+. # - # @param [String] line - def process_line line - if looks_like_json?(line) - process_line_of_json_in_new_pipe(line) - else - process_line_of_tsv_in_this_pipe(line) + # @param [String] text + def puts text + with_sender_subprocess do |stdin, stdout, stderr, wait_thr| + stdin.write(text) + stdin.close + output = [stdout.read.chomp, stderr.read.chomp].join("\n").strip + debug(output) if output.size > 0 end end - protected - - # Parse and send a single +line+ of TSV input to the Zabbix server. - # The line will be split at tabs and expects either - # - # a) two columns: an item key and a value - # b) three columns: an item key, a value, and a timestamp - # - # Unexpected input will cause an error to be logged. - # - # @param [String] line a line of TSV data - def process_line_of_tsv_in_this_pipe line - parts = line.strip.split("\t") - case parts.size - when 2 - timestamp = Time.now - key, value = parts - when 3 - key, value = parts[0..1] - timestamp = Time.parse(parts.last) - else - error("Each line of input must be a tab separated row consisting of 2 columns (key, value) or 3 columns (timestamp, key, value)") - return - end - send_data(key, value, timestamp) + # :nodoc: + def close + return end - # Parse and send a single +line+ of JSON input to the Zabbix server. - # The JSON must have a key +data+ in order to be processed. The - # value of 'data' should be an Array of Hashes each with a +key+ and - # +value+. - # - # This ZabbixPipe's settings will be merged with the remainder of - # the JSON hash. This allows sending values for 'host2' to an - # instance of ZabbixPipe already set up to receive for 'host1'. - # - # This is useful for sending data for keys from multiple hosts - # - # Example of expected input: - # - # { - # 'data': [ - # {'key': 'foo.bar.baz', 'value': 10}, - # {'key': 'snap.crackle.pop', 'value': 8 } - # ] - # } - # - # Or when sending for another host: - # - # { - # 'host': 'shazaam', - # 'applications': 'silly', - # 'data': [ - # {'key': 'foo.bar.baz', 'value': 10}, - # {'key': 'snap.crackle.pop', 'value': 8 } - # ] - # } - # - # @param [String] line a line of JSON data - def process_line_of_json_in_new_pipe line - begin - json = JSON.parse(line) - rescue JSON::ParserError => e - error("Malformed JSON") - return - end - - data = json.delete('data') - unless data && data.is_a?(Array) - error("A line of JSON input must a have an Array key 'data'") - return - end - - if json.empty? - # If there are no other settings then the daughter will be the - # same as the parent -- so just use 'self'. - daughter_pipe = self - else - # We merge the settings from 'self' with whatever else is - # present in the line. - begin - daughter_pipe = self.class.new(settings.stringify_keys.merge(json)) - rescue Error => e - error(e.message) - return - end - end - - data.each do |point| - key = point['key'] - value = point['value'] - unless key && value - warn("The elements of the 'data' Array must be Hashes with a 'key' and a 'value'") - next - end - - tsv_line = [key, value].map(&:to_s).join("\t") - daughter_pipe.process_line(tsv_line) - end + # :nodoc: + def flush + return end - - # Does the +line+ look like it might be JSON? - # - # @param [String] line - # @return [true, false] - def looks_like_json? line - !!(line =~ /^\s*\{/) - end - - # Send the +value+ for +key+ at the given +timestamp+ to the Zabbix - # server. - # - # If the +key+ doesn't exist for this local agent's host, it will be - # added. - # - # FIXME passing +timestamp+ has no effect at present... - # - # @param [String] key - # @param [String, Fixnum, Float] value - # @param [Time] timestamp - def send_data key, value, timestamp - ensure_item_exists(key, value) unless fast? - command = "#{settings['sender']} --config #{settings['configuration_file']} --zabbix-server #{settings['server']} --host #{settings['host']} --key #{key} --value '#{value}'" - process_zabbix_sender_output(key, `#{command}`) - - # command = "zabbix_sender --config #{configuration_file} --zabbix-server #{server} --input-file - --with-timestamps" - # open(command, 'w') do |zabbix_sender| - # zabbix_sender.write([settings['host'], key, timestamp.to_i, value].map(&:to_s).join("\t")) - # zabbix_sender.close_write - # process_zabbix_sender_output(zabbix_sender.read) - # end - end - - # Create an item for the given +key+ if necessary. - # - # @param [String] key - # @param [String, Fixnum, Float] value - def ensure_item_exists key, value - item = Item.find(:key => key, :host_id => host.id) - unless item - Item.new(:key => key, :host_id => host.id, :applications => applications, :value_type => Item.value_type_from_value(value)).save - - # There is a time lag of about 15-30 seconds between (successfully) - # creating an item on the Zabbix server and having the Zabbix accept - # new data for that item. - # - # If it is crucial that *every single* data point be written, dial - # up this sleep period. The first data point for a new key will put - # the wrapper to sleep for this period of time, in hopes that the - # Zabbix server will catch up and be ready to accept new data - # points. - # - # If you don't care that you're going to lose the first few data - # points you send to Zabbix, then don't worry about it. - sleep settings['create_item_sleep'] - end - end - # Parse the +text+ output by +zabbix_sender+. - # - # @param [String] key - # @param [String] text the output from +zabbix_sender+ - # @return [Fixnum] the number of data points processed - def process_zabbix_sender_output key, text - return unless settings['verbose'] - lines = text.strip.split("\n") - return if lines.size < 1 - status_line = lines.first - status_line =~ /Processed +(\d+) +Failed +(\d+) +Total +(\d+)/ - processed, failed, total = $1, $2, $3 - warn("Failed to write #{failed} values to key '#{key}'") if failed.to_i != 0 - processed - end - end end