lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-0.1.3 vs lib/logstash/inputs/couchdb_changes.rb in logstash-input-couchdb_changes-0.1.4
- old
+ new
@@ -3,17 +3,17 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "net/http"
require "uri"
-# This CouchDB input allows you to automatically stream events from the
+# This CouchDB input allows you to automatically stream events from the
# CouchDB http://guide.couchdb.org/draft/notifications.html[_changes] URI.
# Moreover, any "future" changes will automatically be streamed as well making it easy to synchronize
# your CouchDB data with any target destination
#
# ### Upsert and delete
-# You can use event metadata to allow for document deletion.
+# You can use event metadata to allow for document deletion.
# All non-delete operations are treated as upserts
#
# ### Starting at a Specific Sequence
# The CouchDB input stores the last sequence number value in location defined by `sequence_path`.
# You can use this fact to start or resume the stream at a particular sequence.
@@ -31,62 +31,62 @@
config :db, :validate => :string, :required => true
# Connect to CouchDB's _changes feed securely (via https)
# Default: false (via http)
config :secure, :validate => :boolean, :default => false
-
+
# Path to a CA certificate file, used to validate certificates
config :ca_file, :validate => :path
- # Username, if authentication is needed to connect to
+ # Username, if authentication is needed to connect to
# CouchDB
config :username, :validate => :string, :default => nil
- # Password, if authentication is needed to connect to
+ # Password, if authentication is needed to connect to
# CouchDB
config :password, :validate => :password, :default => nil
-
+
# Logstash connects to CouchDB's _changes with feed=continuous
# The heartbeat is how often (in milliseconds) Logstash will ping
- # CouchDB to ensure the connection is maintained. Changing this
+ # CouchDB to ensure the connection is maintained. Changing this
# setting is not recommended unless you know what you are doing.
config :heartbeat, :validate => :number, :default => 1000
# File path where the last sequence number in the _changes
# stream is stored. If unset it will write to `$HOME/.couchdb_seq`
config :sequence_path, :validate => :string
# If unspecified, Logstash will attempt to read the last sequence number
# from the `sequence_path` file. If that is empty or non-existent, it will
# begin with 0 (the beginning).
- #
- # If you specify this value, it is anticipated that you will
+ #
+ # If you specify this value, it is anticipated that you will
# only be doing so for an initial read under special circumstances
# and that you will unset this value afterwards.
config :initial_sequence, :validate => :number
-
+
# Preserve the CouchDB document revision "_rev" value in the
# output.
config :keep_revision, :validate => :boolean, :default => false
-
- # Future feature! Until implemented, changing this from the default
+
+ # Future feature! Until implemented, changing this from the default
# will not do anything.
#
# Ignore attachments associated with CouchDB documents.
config :ignore_attachments, :validate => :boolean, :default => true
-
+
# Reconnect flag. When true, always try to reconnect after a failure
config :always_reconnect, :validate => :boolean, :default => true
-
+
# Reconnect delay: time between reconnect attempts, in seconds.
config :reconnect_delay, :validate => :number, :default => 10
-
+
# Timeout: Number of milliseconds to wait for new data before
# terminating the connection. If a timeout is set it will disable
# the heartbeat configuration option.
config :timeout, :validate => :number
-
+
# Declare these constants here.
FEED = 'continuous'
INCLUDEDOCS = 'true'
public
@@ -112,18 +112,12 @@
@scheme = @secure ? 'https' : 'http'
@sequence = @initial_sequence ? @initial_sequence : @sequencedb.read
- if @username && @password
- @userinfo = @username + ':' + @password.value
- else
- @userinfo = nil
- end
-
end
-
+
module SequenceDB
class File
def initialize(file)
@sequence_path = file
end
@@ -136,31 +130,33 @@
sequence = 0 if sequence.nil?
::File.write(@sequence_path, sequence.to_s)
end
end
end
-
+
public
def run(queue)
buffer = FileWatch::BufferedTokenizer.new
@logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db)
uri = build_uri
Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|
+
request = Net::HTTP::Get.new(uri.request_uri)
+ request.basic_auth(@username, @password.value) if @username && @password
http.request request do |response|
raise ArgumentError, "Database not found!" if response.code == "404"
response.read_body do |chunk|
buffer.extract(chunk).each do |changes|
# If no changes come since the last heartbeat period, a blank line is
# sent as a sort of keep-alive. We should ignore those.
next if changes.chomp.empty?
if event = build_event(changes)
- @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
- decorate(event)
- queue << event
- @sequence = event['@metadata']['seq']
- @sequencedb.write(@sequence.to_s)
+ @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
+ decorate(event)
+ queue << event
+ @sequence = event['@metadata']['seq']
+ @sequencedb.write(@sequence.to_s)
end
end
end
end
end
@@ -173,15 +169,15 @@
retry if reconnect?
rescue ArgumentError => e
@logger.error("Unable to connect to database", :db => @db, :error => e.to_s)
retry if reconnect?
end
-
+
private
def build_uri
options = {:feed => FEED, :include_docs => INCLUDEDOCS, :since => @sequence}
options = options.merge(@timeout ? {:timeout => @timeout} : {:heartbeat => @heartbeat})
- URI::HTTP.build(:scheme => @scheme, :userinfo => @userinfo, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options))
+ URI::HTTP.build(:scheme => @scheme, :host => @host, :port => @port, :path => @path, :query => URI.encode_www_form(options))
end
private
def reconnect?
sleep(@always_reconnect ? @reconnect_delay : 0)