#--
# Copyright 2011-2013 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#++

##
# +resultsreader.rb+ provides classes to incrementally parse the XML output
# from Splunk search jobs. For most search jobs you will want +ResultsReader+,
# which handles a single results set. However, running a blocking export job
# from the +search/jobs/export+ endpoint sends back a stream of results sets,
# all but the last of which are previews. In this case, you should use the
# +MultiResultsReader+, which will let you iterate over the results sets.
#
# By default, +ResultsReader+ will try to use Nokogiri for XML parsing. If
# Nokogiri isn't available, it will fall back to REXML, which ships with
# Ruby 1.9. See +xml_shim.rb+ for how to alter this behavior.
#

#--
# There are two basic designs we could have used for handling the
# search/jobs/export output. We could either have the user call
# +ResultsReader#each+ multiple times, each time going through the next
# results set, or we could do what we have here and have an outer iterator
# that yields distinct +ResultsReader+ objects for each results set.
#
# The outer iterator is syntactically somewhat clearer, but you must
# invalidate the previous +ResultsReader+ objects before yielding a new one,
# so that code like
#
#     readers = []
#     outer_iter.each do |reader|
#       readers << reader
#     end
#     readers[2].each do |result|
#       puts result
#     end
#
# will throw an error on the second each. The right behavior is to throw an
# exception in the +ResultsReader+'s each if it is invoked out of order.
# This problem doesn't affect the all-in-one design.
#
# However, in the all-in-one design, it is impossible to set the is_preview
# and fields instance variables of the +ResultsReader+ correctly between
# invocations of each, which breaks code with the all-in-one design such as
#
#     while reader.is_preview
#       reader.each do |result|
#         ...
#       end
#     end
#
# If the ... contains a break, then there is no way to set is_preview
# correctly before the next iteration of the while loop. This problem does
# not affect the outer iterator design, and Fred Ross and Yunxin Wu were not
# able to come up with a way to make it work in the all-in-one design, so
# the SDK uses the outer iterator design.
#++

require 'stringio'

require_relative 'xml_shim'
require_relative 'collection/jobs' # To access ExportStream

module Splunk
  ##
  # +ResultsReader+ parses Splunk's XML format for results into Ruby objects.
  #
  # You can use both Nokogiri and REXML. By default, the +ResultsReader+ will
  # try to use Nokogiri, and if it is not available will fall back to REXML.
  # If you want other behavior, see +xml_shim.rb+ for how to set the XML
  # library.
  #
  # +ResultsReader+ is an +Enumerable+, so it has methods such as +each+ and
  # +each_with_index+. However, since it is a stream parser, once you iterate
  # through it once, it will thereafter be empty.
  #
  # Do not use +ResultsReader+ with the results of the +create_export+ or
  # +create_stream+ methods on +Service+ or +Jobs+. These methods use
  # endpoints which return a different set of data structures. Use
  # +MultiResultsReader+ instead for those cases. If you do use
  # +ResultsReader+, it will return a concatenation of all non-preview
  # events in the stream, but that behavior should be considered deprecated,
  # and will result in a warning.
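  #
  # For example, the preferred pattern for an export stream is (a sketch,
  # assuming a connected +service+ as in the example below):
  #
  #     stream = service.jobs.create_export("search index=_internal | head 10")
  #     reader = MultiResultsReader.new(stream).final_results()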
  #
  # The ResultsReader object has two additional methods:
  #
  # * +is_preview?+ returns a Boolean value that indicates whether these
  #   results are a preview from an unfinished search or not
  # * +fields+ returns an array of all the fields that may appear in a result
  #   in this set, in the order they should be displayed (if you're going
  #   to make a table or the like)
  #
  # *Example*:
  #
  #     require 'splunk-sdk-ruby'
  #
  #     service = Splunk::connect(:username => "admin", :password => "changeme")
  #
  #     stream = service.jobs.create_oneshot("search index=_internal | head 10")
  #     reader = ResultsReader.new(stream)
  #     puts reader.is_preview?
  #     # Prints: false
  #     reader.each do |result|
  #       puts result
  #     end
  #     # Prints a sequence of Hashes containing events.
  #
  class ResultsReader
    include Enumerable

    ##
    # Are the results in this reader a preview from an unfinished search?
    #
    # Returns: +true+ or +false+, or +nil+ if the stream is empty.
    #
    def is_preview?
      return @is_preview
    end

    ##
    # An +Array+ of all the fields that may appear in each result.
    #
    # Note that any given result will contain a subset of these fields.
    #
    # Returns: an +Array+ of +Strings+.
    #
    attr_reader :fields

    def initialize(text_or_stream)
      if text_or_stream.nil?
        stream = StringIO.new("")
      elsif text_or_stream.is_a?(ExportStream)
        # The sensible behavior on streams from the export endpoints is to
        # skip all preview results and concatenate all others. The export
        # functions wrap their streams in ExportStream to mark that they
        # need this special handling.
        @is_export = true
        @reader = MultiResultsReader.new(text_or_stream).final_results()
        @is_preview = @reader.is_preview?
        @fields = @reader.fields
        return
      elsif !text_or_stream.respond_to?(:read)
        # Strip because the XML libraries can be pissy.
        stream = StringIO.new(text_or_stream.strip)
      else
        stream = text_or_stream
      end

      if stream.eof?
        @is_preview = nil
        @fields = []
      else
        # We use a SAX parser. +listener+ is the event handler, but a SAX
        # parser won't usually transfer control during parsing.
        # To incrementally return results as we parse, we have to put
        # the parser into a +Fiber+ from which we can yield.
        listener = ResultsListener.new()
        @iteration_fiber = Fiber.new do
          if $splunk_xml_library == :nokogiri
            parser = Nokogiri::XML::SAX::Parser.new(listener)
            parser.parse(stream)
          else # Use REXML
            REXML::Document.parse_stream(stream, listener)
          end
        end

        @is_preview = @iteration_fiber.resume
        @fields = @iteration_fiber.resume
        @reached_end = false
      end
    end

    def each()
      # If we have been passed a stream from an export endpoint, it was
      # already wrapped in a MultiResultsReader in initialize, and we handle
      # it differently.
      if @is_export
        warn "[DEPRECATED] Do not use ResultsReader on the output of the " +
             "export endpoint. Use MultiResultsReader instead."
        enum = @reader.each()
      else
        enum = Enumerator.new() do |yielder|
          if !@iteration_fiber.nil? # Handle the case of empty files
            @reached_end = false
            while true
              result = @iteration_fiber.resume
              break if result.nil? or result == :end_of_results_set
              yielder << result
            end
          end
          @reached_end = true
        end
      end

      if block_given? # Apply the enumerator to a block if we have one
        enum.each() { |e| yield e }
      else
        enum # Otherwise return the enumerator itself
      end
    end
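
    # Since +each+ without a block returns an +Enumerator+, you can feed the
    # results into any +Enumerable+ method. A sketch (+reader+ is assumed to
    # be a +ResultsReader+ over a finished, non-export job, and +_raw+ is
    # Splunk's standard raw-event field):
    #
    #     raw_events = reader.each.map { |result| result["_raw"] }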

    ##
    # Skips the rest of the events in this ResultsReader.
    #
    def skip_remaining_results()
      if !@reached_end
        each() { |result| }
      end
    end
  end

  ##
  # +ResultsListener+ is the SAX event handler for +ResultsReader+.
  #
  # The authors of Nokogiri decided to make their SAX interface
  # slightly incompatible with that of REXML. For example, REXML
  # uses tag_start and passes attributes as a dictionary, while
  # Nokogiri calls the same thing start_element, and passes
  # attributes as an association list.
  #
  # This is a classic finite state machine parser. The `@states` variable
  # contains a hash with the states as its keys. Each state's value is a
  # hash of functions giving the behavior of the state machine in that
  # state. The actual methods on the object dispatch to these functions
  # based upon the current state (as stored in `@state`).
  #
  # The parser initially runs until it has determined whether the results
  # are a preview, then calls +Fiber.yield+ to return it. Then it continues
  # and tries to yield a field order, and then any results. (It will always
  # yield a field order, even if it is empty.) At the end of a results set,
  # it yields +:end_of_results_set+.
  #
  class ResultsListener # :nodoc:
    def initialize()
      # @fields holds the accumulated list of fields from the fieldOrder
      # element. If there has been no accumulation, it is set to
      # :no_fieldOrder_found. For empty results sets, there is often no
      # fieldOrder element, but we still want to yield an empty Array at the
      # right point, so if we reach the end of a results element and @fields
      # is still :no_fieldOrder_found, we yield an empty array at that
      # point.
      @fields = :no_fieldOrder_found
      @concatenate = false
      @is_preview = nil
      @state = :base
      @states = {
        # Toplevel state.
        :base => {
          :start_element => lambda do |name, attributes|
            if name == "results"
              if !@concatenate
                @is_preview = attributes["preview"] == "1"
                Fiber.yield(@is_preview)
              end
            elsif name == "fieldOrder"
              if !@concatenate
                @state = :field_order
                @fields = []
              end
            elsif name == "result"
              @state = :result
              @current_offset = Integer(attributes["offset"])
              @current_result = {}
            end
          end,
          :end_element => lambda do |name|
            if name == "results" and !@concatenate
              Fiber.yield([]) if @fields == :no_fieldOrder_found
              if !@is_preview # Start concatenating events
                @concatenate = true
              else
                # Reset the fieldOrder
                @fields = :no_fieldOrder_found
                Fiber.yield(:end_of_results_set)
              end
            end
          end
        },
        # Inside a `fieldOrder` element. Recognizes only
        # the `field` element, and returns to the `:base` state
        # when it encounters `</fieldOrder>`.
        :field_order => {
          :start_element => lambda do |name, attributes|
            if name == "field"
              @state = :field_order_field
            end
          end,
          :end_element => lambda do |name|
            if name == "fieldOrder"
              @state = :base
              Fiber.yield(@fields)
            end
          end
        },
        # When the parser in `:field_order` state encounters
        # a `field` element, it jumps to this state to record it.
        # When `</field>` is encountered, jumps back to `:field_order`.
        :field_order_field => {
          :characters => lambda do |text|
            @fields << text.strip
          end,
          :end_element => lambda do |name|
            if name == "field"
              @state = :field_order
            end
          end
        },
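        # For reference, a results set being parsed looks roughly like this
        # (a trimmed sketch of Splunk's results XML, not verbatim server
        # output):
        #
        #     <results preview="0">
        #       <meta><fieldOrder><field>series</field></fieldOrder></meta>
        #       <result offset="0">
        #         <field k="series"><value><text>twitter</text></value></field>
        #       </result>
        #     </results>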
        # When the parser has hit the `result` element, it jumps here.
        # When this state hits `</result>`, it calls `Fiber.yield` to
        # send the completed result back, and, when the fiber is
        # resumed, jumps back to the `:base` state.
        :result => {
          :start_element => lambda do |name, attributes|
            if name == "field"
              @current_field = attributes["k"]
              @current_value = nil
            elsif name == "text" || name == "v"
              @state = :field_values
              @current_scratch = ""
            end
          end,
          :end_element => lambda do |name|
            if name == "result"
              Fiber.yield @current_result
              @current_result = nil
              @current_offset = nil
              @state = :base
            elsif name == "field"
              if @current_result.has_key?(@current_field)
                if @current_result[@current_field].is_a?(Array)
                  @current_result[@current_field] << @current_value
                elsif @current_result[@current_field] != nil
                  @current_result[@current_field] =
                    [@current_result[@current_field], @current_value]
                end
              else
                @current_result[@current_field] = @current_value
              end
              @current_field = nil
              @current_value = nil
            end
          end
        },
        # Parse the values inside a results field.
        :field_values => {
          :end_element => lambda do |name|
            if name == "text" || name == "v"
              if @current_value == nil
                @current_value = @current_scratch
              elsif @current_value.is_a?(Array)
                @current_value << @current_scratch
              else
                @current_value = [@current_value, @current_scratch]
              end
              @current_scratch = nil
              @state = :result
            elsif name == "sg"
              # <sg> is emitted to delimit text that should be displayed
              # highlighted. We preserve it in field values.
              @current_scratch << "</sg>"
            end
          end,
          :start_element => lambda do |name, attributes|
            if name == "sg"
              s = ["sg"] + attributes.sort.map do |entry|
                key, value = entry
                "#{key}=\"#{value}\""
              end
              text = "<" + s.join(" ") + ">"
              @current_scratch << text
            end
          end,
          :characters => lambda do |text|
            @current_scratch << text
          end
        }
      }
    end

    # Nokogiri methods - all dispatch to the REXML methods.
    def start_element(name, attributes)
      # attributes is an association list. Turn it into a hash
      # that tag_start can use.
      attribute_dict = {}
      attributes.each do |attribute|
        key = attribute.localname
        value = attribute.value
        attribute_dict[key] = value
      end

      tag_start(name, attribute_dict)
    end

    def start_element_namespace(name, attributes=[], prefix=nil, uri=nil,
                                ns=[])
      start_element(name, attributes)
    end

    def end_element(name)
      tag_end(name)
    end

    def end_element_namespace(name, prefix = nil, uri = nil)
      end_element(name)
    end

    def characters(text)
      text(text)
    end

    # REXML methods - all dispatch is done here
    def tag_start(name, attributes)
      # attributes is a hash.
      if @states[@state].has_key?(:start_element)
        @states[@state][:start_element].call(name, attributes)
      end
    end

    def tag_end(name)
      if @states[@state].has_key?(:end_element)
        @states[@state][:end_element].call(name)
      end
    end

    def text(text)
      if @states[@state].has_key?(:characters)
        @states[@state][:characters].call(text)
      end
    end

    # Unused methods in Nokogiri
    def cdata_block(string) end
    def comment(string) end
    def end_document() end
    def error(string) end
    def start_document() end
    def warning(string) end
    # xmldecl declared in REXML list below.

    # Unused methods in REXML
    def attlistdecl(element_name, attributes, raw_content) end
    def cdata(content) end
    def comment(comment) end
    def doctype(name, pub_sys, long_name, uri) end
    def doctype_end() end
    def elementdecl(content) end
    def entity(content) end
    def entitydecl(content) end
    def instruction(name, instruction) end
    def notationdecl(content) end
    def xmldecl(version, encoding, standalone) end
  end
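
  # For illustration, +ResultsReader#initialize+ drives +ResultsListener+
  # roughly like this (a simplified sketch of the code above, showing only
  # the REXML path):
  #
  #     listener = ResultsListener.new()
  #     fiber = Fiber.new do
  #       REXML::Document.parse_stream(stream, listener)
  #     end
  #     is_preview = fiber.resume # First yield: the preview flag
  #     fields = fiber.resume     # Second yield: the field order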

  ##
  # Version of +ResultsReader+ that accepts an external parsing state.
  #
  # +ResultsReader+ sets up its own Fiber for doing SAX parsing of the XML,
  # but for the +MultiResultsReader+, we want to share a single fiber among
  # all the results readers that we create. +PuppetResultsReader+ takes
  # the fiber, is_preview, and fields information from its constructor
  # and then exposes the same methods as ResultsReader.
  #
  # You should never create an instance of +PuppetResultsReader+ by hand. It
  # will be passed back from iterating over a +MultiResultsReader+.
  #
  class PuppetResultsReader < ResultsReader
    def initialize(fiber, is_preview, fields)
      @valid = true
      @iteration_fiber = fiber
      @is_preview = is_preview
      @fields = fields
    end

    def each()
      if !@valid
        raise StandardError.new("Cannot iterate on ResultsReaders " +
                                "out of order.")
      else
        super()
      end
    end

    def invalidate()
      @valid = false
    end
  end
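
  # For illustration, the out-of-order usage that invalidation guards
  # against looks like this (a sketch; +readers_iter+ is assumed to be a
  # +MultiResultsReader+):
  #
  #     saved = []
  #     readers_iter.each { |reader| saved << reader }
  #     saved[0].each { |result| puts result }
  #     # => StandardError: Cannot iterate on ResultsReaders out of order.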

  ##
  # Parser for the XML results sets returned by blocking export jobs.
  #
  # The methods +create_export+ and +create_stream+ on +Jobs+ and +Service+
  # do not return data in quite the same format as other search jobs in
  # Splunk. They will return a sequence of preview results sets, and then
  # (if they are not real time searches) a final results set.
  #
  # +MultiResultsReader+ takes the stream returned by such a call, and
  # provides iteration over each results set, or access to only the final,
  # non-preview results set.
  #
  # *Examples*:
  #
  #     require 'splunk-sdk-ruby'
  #
  #     service = Splunk::connect(:username => "admin", :password => "changeme")
  #
  #     stream = service.jobs.create_export("search index=_internal | head 10")
  #
  #     readers = MultiResultsReader.new(stream)
  #     readers.each do |reader|
  #       puts "New result set (preview=#{reader.is_preview?})"
  #       reader.each do |result|
  #         puts result
  #       end
  #     end
  #
  #     # Alternately
  #     reader = readers.final_results()
  #     reader.each do |result|
  #       puts result
  #     end
  #
  class MultiResultsReader
    include Enumerable

    def initialize(text_or_stream)
      if text_or_stream.nil?
        stream = StringIO.new("")
      elsif !text_or_stream.respond_to?(:read)
        # Strip because the XML libraries can be pissy.
        stream = StringIO.new(text_or_stream.strip)
      else
        stream = text_or_stream
      end

      listener = ResultsListener.new()
      @iteration_fiber = Fiber.new do
        if $splunk_xml_library == :nokogiri
          parser = Nokogiri::XML::SAX::Parser.new(listener)
          # Nokogiri requires a unique root element, which we are
          # fabricating here, while REXML is fine with multiple root
          # elements in a stream.
          edited_stream = ConcatenatedStream.new(
            StringIO.new("<fake-root-element>"),
            XMLDTDFilter.new(stream),
            StringIO.new("</fake-root-element>")
          )
          parser.parse(edited_stream)
        else # Use REXML
          REXML::Document.parse_stream(stream, listener)
        end
      end
    end

    def each()
      enum = Enumerator.new() do |yielder|
        if !@iteration_fiber.nil? # Handle the case of empty files
          begin
            while true
              is_preview = @iteration_fiber.resume
              fields = @iteration_fiber.resume
              reader = PuppetResultsReader.new(@iteration_fiber,
                                               is_preview, fields)
              yielder << reader
              # Finish extracting any events that the user didn't read.
              # Otherwise the next results reader will start in the middle
              # of the previous results set.
              reader.skip_remaining_results()
              reader.invalidate()
            end
          rescue FiberError
            # After the last results element, the next evaluation of
            # 'is_preview = @iteration_fiber.resume' above will throw a
            # +FiberError+ when the fiber terminates without yielding any
            # additional values. We handle the control flow in this way so
            # that the final code in the fiber to handle cleanup always
            # gets run.
          end
        end
      end

      if block_given? # Apply the enumerator to a block if we have one
        enum.each() { |e| yield e }
      else
        enum # Otherwise return the enumerator itself
      end
    end

    ##
    # Returns a +ResultsReader+ over only the non-preview results.
    #
    # If you run this method against a real time search job, which only
    # ever produces preview results, it will loop forever. If you run it
    # against a non-reporting search (that is, one that filters and
    # extracts fields from events, but doesn't calculate a whole new set
    # of events), you will get only the first few results, since you
    # should be using the normal +ResultsReader+, not +MultiResultsReader+,
    # in that case.
    #
    def final_results()
      each do |reader|
        if reader.is_preview?
          reader.skip_remaining_results()
        else
          return reader
        end
      end
    end
  end

  ##
  # Stream transformer that filters out XML DTD definitions.
  #
  # +XMLDTDFilter+ takes anything between <? and > to be a DTD. It does no
  # escaping of quoted text.
  #
  class XMLDTDFilter < IO
    def initialize(stream)
      @stream = stream
      @peeked_char = nil
    end

    def close()
      @stream.close()
    end

    def read(n=nil)
      response = ""

      while n.nil? or n > 0
        # First use any character we already peeked at.
        if !@peeked_char.nil?
          response << @peeked_char
          @peeked_char = nil
          if !n.nil?
            n -= 1
          end
          next
        end

        c = @stream.read(1)
        if c.nil? # We've reached the end of the stream
          break
        elsif c == "<" # We might have a DTD definition
          d = @stream.read(1) || ""
          if d == "?" # It's a DTD. Skip until we've consumed a >.
            while true
              q = @stream.read(1)
              if q == ">"
                break
              end
            end
          else # It's not a DTD. Push the peeked character into lookahead.
            @peeked_char = d
            response << c
            if !n.nil?
              n -= 1
            end
          end
        else # No special behavior
          response << c
          if !n.nil?
            n -= 1
          end
        end
      end
      return response
    end
  end

  ##
  # Returns a stream which concatenates all the streams passed to it.
  #
  class ConcatenatedStream < IO
    def initialize(*streams)
      @streams = streams
    end

    def close()
      @streams.each do |stream|
        stream.close()
      end
    end

    def read(n=nil)
      response = ""

      while n.nil? or n > 0
        if @streams.empty? # No streams left
          break
        else # We have streams left.
          chunk = @streams[0].read(n) || ""
          if n.nil? or chunk.length() < n
            @streams.shift()
          end
          if !n.nil?
            n -= chunk.length()
          end

          response << chunk
        end
      end

      if response == ""
        return nil
      else
        return response
      end
    end
  end
end
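
# A quick usage sketch of the two stream helpers above, which are used only
# internally by +MultiResultsReader+ (the XML here is made up):
#
#     filtered = Splunk::XMLDTDFilter.new(
#       StringIO.new("<?xml version=\"1.0\"?><doc/>"))
#     filtered.read() # => "<doc/>"
#
#     joined = Splunk::ConcatenatedStream.new(
#       StringIO.new("<a>"), StringIO.new("</a>"))
#     joined.read() # => "<a></a>"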