lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-0.1.3 vs lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-0.1.4

- old
+ new

@@ -3,17 +3,17 @@ require "logstash/inputs/base" require "logstash/namespace" require "net/http" require "uri" -# This CouchDB input allows you to automatically stream events from the +# This CouchDB input allows you to automatically stream events from the # CouchDB http://guide.couchdb.org/draft/notifications.html[_changes] URI. # Moreover, any "future" changes will automatically be streamed as well making it easy to synchronize # your CouchDB data with any target destination # # ### Upsert and delete -# You can use event metadata to allow for document deletion. +# You can use event metadata to allow for document deletion. # All non-delete operations are treated as upserts # # ### Starting at a Specific Sequence # The CouchDB input stores the last sequence number value in location defined by `sequence_path`. # You can use this fact to start or resume the stream at a particular sequence. @@ -31,62 +31,62 @@ config :db, :validate => :string, :required => true # Connect to CouchDB's _changes feed securely (via https) # Default: false (via http) config :secure, :validate => :boolean, :default => false - + # Path to a CA certificate file, used to validate certificates config :ca_file, :validate => :path - # Username, if authentication is needed to connect to + # Username, if authentication is needed to connect to # CouchDB config :username, :validate => :string, :default => nil - # Password, if authentication is needed to connect to + # Password, if authentication is needed to connect to # CouchDB config :password, :validate => :password, :default => nil - + # Logstash connects to CouchDB's _changes with feed=continuous # The heartbeat is how often (in milliseconds) Logstash will ping - # CouchDB to ensure the connection is maintained. Changing this + # CouchDB to ensure the connection is maintained. Changing this # setting is not recommended unless you know what you are doing. config :heartbeat, :validate => :number, :default => 1000 # File path where the last sequence number in the _changes # stream is stored. If unset it will write to `$HOME/.couchdb_seq` config :sequence_path, :validate => :string # If unspecified, Logstash will attempt to read the last sequence number # from the `sequence_path` file. If that is empty or non-existent, it will # begin with 0 (the beginning). - # - # If you specify this value, it is anticipated that you will + # + # If you specify this value, it is anticipated that you will # only be doing so for an initial read under special circumstances # and that you will unset this value afterwards. config :initial_sequence, :validate => :number - + # Preserve the CouchDB document revision "_rev" value in the # output. config :keep_revision, :validate => :boolean, :default => false - - # Future feature! Until implemented, changing this from the default + + # Future feature! Until implemented, changing this from the default # will not do anything. # # Ignore attachments associated with CouchDB documents. config :ignore_attachments, :validate => :boolean, :default => true - + # Reconnect flag. When true, always try to reconnect after a failure config :always_reconnect, :validate => :boolean, :default => true - + # Reconnect delay: time between reconnect attempts, in seconds. config :reconnect_delay, :validate => :number, :default => 10 - + # Timeout: Number of milliseconds to wait for new data before # terminating the connection. If a timeout is set it will disable # the heartbeat configuration option. config :timeout, :validate => :number - + # Declare these constants here. FEED = 'continuous' INCLUDEDOCS = 'true' public @@ -112,18 +112,12 @@ @scheme = @secure ? 'https' : 'http' @sequence = @initial_sequence ? @initial_sequence : @sequencedb.read - if @username && @password - @userinfo = @username + ':' + @password.value - else - @userinfo = nil - end - end - + module SequenceDB class File def initialize(file) @sequence_path = file end @@ -136,31 +130,33 @@ sequence = 0 if sequence.nil? ::File.write(@sequence_path, sequence.to_s) end end end - + public def run(queue) buffer = FileWatch::BufferedTokenizer.new @logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db) uri = build_uri Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http| + request = Net::HTTP::Get.new(uri.request_uri) + request.basic_auth(@username, @password.value) if @username && @password http.request request do |response| raise ArgumentError, "Database not found!" if response.code == "404" response.read_body do |chunk| buffer.extract(chunk).each do |changes| # If no changes come since the last heartbeat period, a blank line is # sent as a sort of keep-alive. We should ignore those. next if changes.chomp.empty? if event = build_event(changes) - @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug? - decorate(event) - queue << event - @sequence = event['@metadata']['seq'] - @sequencedb.write(@sequence.to_s) + @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug? + decorate(event) + queue << event + @sequence = event['@metadata']['seq'] + @sequencedb.write(@sequence.to_s) end end end end end @@ -173,15 +169,15 @@ retry if reconnect? rescue ArgumentError => e @logger.error("Unable to connect to database", :db => @db, :error => e.to_s) retry if reconnect? end - + private def build_uri options = {:feed => FEED, :include_docs => INCLUDEDOCS, :since => @sequence} options = options.merge(@timeout ? {:timeout => @timeout} : {:heartbeat => @heartbeat}) - URI::HTTP.build(:scheme => @scheme, :userinfo => @userinfo, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options)) + URI::HTTP.build(:scheme => @scheme, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options)) end private def reconnect? sleep(@always_reconnect ? @reconnect_delay : 0)