lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-2.0.4 vs lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-3.0.0

- old
+ new

@@ -136,18 +136,22 @@ public def run(queue) buffer = FileWatch::BufferedTokenizer.new @logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db) uri = build_uri + @logger.info("Using service uri :", :uri => uri) until stop? begin Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http| request = Net::HTTP::Get.new(uri.request_uri) request.basic_auth(@username, @password.value) if @username && @password http.request request do |response| - raise ArgumentError, "Database not found!" if response.code == "404" + raise ArgumentError, :message => "Server error!", :response_code => response.code if response.code >= "500" + raise ArgumentError, :message => "Authentication error!", :response_code => response.code if response.code == "401" + raise ArgumentError, :message => "Database not found!", :response_code => response.code if response.code == "404" + raise ArgumentError, :message => "Request error!", :response_code => response.code if response.code >= "400" response.read_body do |chunk| buffer.extract(chunk).each do |changes| # Put a "stop" check here. If we stop here, anything we've read, but # not written, will be read again since the @sequence change won't # have been written to the file, ensuring that it will pick up where @@ -188,30 +192,34 @@ URI::HTTP.build(:scheme => @scheme, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options)) end private def reconnect? - Stud.stoppable_sleep(@connect_retry_interval) if @always_reconnect + Stud.stoppable_sleep(@reconnect_delay) if @always_reconnect @always_reconnect end private - def build_event(line) + def build_event(changes) # In lieu of a codec, build the event here - line = LogStash::Json.load(line) - return nil if line.has_key?("last_seq") + data = LogStash::Json.load(changes) + return nil if data.has_key?("last_seq") + if data['doc'].nil? + logger.debug("doc is nil", :data => data) + return nil + end hash = Hash.new - hash['@metadata'] = { '_id' => line['doc']['_id'] } - if line['doc']['_deleted'] + hash['@metadata'] = { '_id' => data['doc']['_id'] } + if data['doc']['_deleted'] hash['@metadata']['action'] = 'delete' else - hash['doc'] = line['doc'] + hash['doc'] = data['doc'] hash['@metadata']['action'] = 'update' hash['doc'].delete('_id') hash['doc_as_upsert'] = true hash['doc'].delete('_rev') unless @keep_revision end - hash['@metadata']['seq'] = line['seq'] + hash['@metadata']['seq'] = data['seq'] event = LogStash::Event.new(hash) @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug? event end end