lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-1.0.0 vs lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-2.0.0

- old
+ new

@@ -136,42 +136,51 @@ public def run(queue) buffer = FileWatch::BufferedTokenizer.new @logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db) uri = build_uri - Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http| + until stop? + begin + Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http| - request = Net::HTTP::Get.new(uri.request_uri) - request.basic_auth(@username, @password.value) if @username && @password - http.request request do |response| - raise ArgumentError, "Database not found!" if response.code == "404" - response.read_body do |chunk| - buffer.extract(chunk).each do |changes| - # If no changes come since the last heartbeat period, a blank line is - # sent as a sort of keep-alive. We should ignore those. - next if changes.chomp.empty? - if event = build_event(changes) - @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug? - decorate(event) - queue << event - @sequence = event['@metadata']['seq'] - @sequencedb.write(@sequence.to_s) + request = Net::HTTP::Get.new(uri.request_uri) + request.basic_auth(@username, @password.value) if @username && @password + http.request request do |response| + raise ArgumentError, "Database not found!" if response.code == "404" + response.read_body do |chunk| + buffer.extract(chunk).each do |changes| + # Put a "stop" check here. If we stop here, anything we've read, but + # not written, will be read again since the @sequence change won't + # have been written to the file, ensuring that it will pick up where + # it left off. + break if stop? + # If no changes come since the last heartbeat period, a blank line is + # sent as a sort of keep-alive. We should ignore those. + next if changes.chomp.empty? + if event = build_event(changes) + @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug? + decorate(event) + queue << event + @sequence = event['@metadata']['seq'] + @sequencedb.write(@sequence.to_s) + end + end end end end + rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED, + Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e + @logger.error("Connection problem encountered: Retrying connection in 10 seconds...", :error => e.to_s) + retry if reconnect? + rescue Errno::EBADF => e + @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s) + retry if reconnect? + rescue ArgumentError => e + @logger.error("Unable to connect to database", :db => @db, :error => e.to_s) + retry if reconnect? end end - rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED, - Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e - @logger.error("Connection problem encountered: Retrying connection in 10 seconds...", :error => e.to_s) - retry if reconnect? - rescue Errno::EBADF => e - @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s) - retry if reconnect? - rescue ArgumentError => e - @logger.error("Unable to connect to database", :db => @db, :error => e.to_s) - retry if reconnect? end private def build_uri options = {:feed => FEED, :include_docs => INCLUDEDOCS, :since => @sequence} @@ -179,10 +188,10 @@ URI::HTTP.build(:scheme => @scheme, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options)) end private def reconnect? - sleep(@always_reconnect ? @reconnect_delay : 0) + Stud.stoppable_sleep(@connect_retry_interval) if @always_reconnect @always_reconnect end private def build_event(line)