lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-2.0.4 vs lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-3.0.0
- old
+ new
@@ -136,18 +136,22 @@
public
def run(queue)
buffer = FileWatch::BufferedTokenizer.new
@logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db)
uri = build_uri
+ @logger.info("Using service uri :", :uri => uri)
until stop?
begin
Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|
request = Net::HTTP::Get.new(uri.request_uri)
request.basic_auth(@username, @password.value) if @username && @password
http.request request do |response|
- raise ArgumentError, "Database not found!" if response.code == "404"
+ raise ArgumentError, :message => "Server error!", :response_code => response.code if response.code >= "500"
+ raise ArgumentError, :message => "Authentication error!", :response_code => response.code if response.code == "401"
+ raise ArgumentError, :message => "Database not found!", :response_code => response.code if response.code == "404"
+ raise ArgumentError, :message => "Request error!", :response_code => response.code if response.code >= "400"
response.read_body do |chunk|
buffer.extract(chunk).each do |changes|
# Put a "stop" check here. If we stop here, anything we've read, but
# not written, will be read again since the @sequence change won't
# have been written to the file, ensuring that it will pick up where
@@ -188,30 +192,34 @@
URI::HTTP.build(:scheme => @scheme, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options))
end
private
def reconnect?
- Stud.stoppable_sleep(@connect_retry_interval) if @always_reconnect
+ Stud.stoppable_sleep(@reconnect_delay) if @always_reconnect
@always_reconnect
end
private
- def build_event(line)
+ def build_event(changes)
# In lieu of a codec, build the event here
- line = LogStash::Json.load(line)
- return nil if line.has_key?("last_seq")
+ data = LogStash::Json.load(changes)
+ return nil if data.has_key?("last_seq")
+ if data['doc'].nil?
+ logger.debug("doc is nil", :data => data)
+ return nil
+ end
hash = Hash.new
- hash['@metadata'] = { '_id' => line['doc']['_id'] }
- if line['doc']['_deleted']
+ hash['@metadata'] = { '_id' => data['doc']['_id'] }
+ if data['doc']['_deleted']
hash['@metadata']['action'] = 'delete'
else
- hash['doc'] = line['doc']
+ hash['doc'] = data['doc']
hash['@metadata']['action'] = 'update'
hash['doc'].delete('_id')
hash['doc_as_upsert'] = true
hash['doc'].delete('_rev') unless @keep_revision
end
- hash['@metadata']['seq'] = line['seq']
+ hash['@metadata']['seq'] = data['seq']
event = LogStash::Event.new(hash)
@logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
event
end
end