lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-1.0.0 vs lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-2.0.0
- old
+ new
@@ -136,42 +136,51 @@
public
def run(queue)
buffer = FileWatch::BufferedTokenizer.new
@logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db)
uri = build_uri
- Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|
+ until stop?
+ begin
+ Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|
- request = Net::HTTP::Get.new(uri.request_uri)
- request.basic_auth(@username, @password.value) if @username && @password
- http.request request do |response|
- raise ArgumentError, "Database not found!" if response.code == "404"
- response.read_body do |chunk|
- buffer.extract(chunk).each do |changes|
- # If no changes come since the last heartbeat period, a blank line is
- # sent as a sort of keep-alive. We should ignore those.
- next if changes.chomp.empty?
- if event = build_event(changes)
- @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
- decorate(event)
- queue << event
- @sequence = event['@metadata']['seq']
- @sequencedb.write(@sequence.to_s)
+ request = Net::HTTP::Get.new(uri.request_uri)
+ request.basic_auth(@username, @password.value) if @username && @password
+ http.request request do |response|
+ raise ArgumentError, "Database not found!" if response.code == "404"
+ response.read_body do |chunk|
+ buffer.extract(chunk).each do |changes|
+ # Put a "stop" check here. If we stop here, anything we've read, but
+ # not written, will be read again since the @sequence change won't
+ # have been written to the file, ensuring that it will pick up where
+ # it left off.
+ break if stop?
+ # If no changes come since the last heartbeat period, a blank line is
+ # sent as a sort of keep-alive. We should ignore those.
+ next if changes.chomp.empty?
+ if event = build_event(changes)
+ @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
+ decorate(event)
+ queue << event
+ @sequence = event['@metadata']['seq']
+ @sequencedb.write(@sequence.to_s)
+ end
+ end
end
end
end
+ rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED,
+ Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
+ @logger.error("Connection problem encountered: Retrying connection in 10 seconds...", :error => e.to_s)
+ retry if reconnect?
+ rescue Errno::EBADF => e
+ @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s)
+ retry if reconnect?
+ rescue ArgumentError => e
+ @logger.error("Unable to connect to database", :db => @db, :error => e.to_s)
+ retry if reconnect?
end
end
- rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED,
- Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
- @logger.error("Connection problem encountered: Retrying connection in 10 seconds...", :error => e.to_s)
- retry if reconnect?
- rescue Errno::EBADF => e
- @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s)
- retry if reconnect?
- rescue ArgumentError => e
- @logger.error("Unable to connect to database", :db => @db, :error => e.to_s)
- retry if reconnect?
end
private
def build_uri
options = {:feed => FEED, :include_docs => INCLUDEDOCS, :since => @sequence}
@@ -179,10 +188,10 @@
URI::HTTP.build(:scheme => @scheme, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options))
end
private
def reconnect?
- sleep(@always_reconnect ? @reconnect_delay : 0)
+ Stud.stoppable_sleep(@connect_retry_interval) if @always_reconnect
@always_reconnect
end
private
def build_event(line)