lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.4.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.5.0
- old
+ new
@@ -1,9 +1,10 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/json"
+require "logstash/util/safe_uri"
require "base64"
# .Compatibility Note
# [NOTE]
@@ -67,10 +68,15 @@
# List of elasticsearch hosts to use for querying.
# Each host can be either IP, HOST, IP:port or HOST:port.
# Port defaults to 9200
config :hosts, :validate => :array
+ # Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used.
+ #
+ # For more info, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[Logstash-to-Cloud documentation]
+ config :cloud_id, :validate => :string
+
# The index or alias to search.
config :index, :validate => :string, :default => "logstash-*"
# The query to be executed. Read the Elasticsearch query DSL documentation
# for more info
@@ -132,10 +138,15 @@
config :user, :validate => :string
# Basic Auth - password
config :password, :validate => :password
+ # Cloud authentication string ("<username>:<password>" format) is an alternative for the `user`/`password` configuration.
+ #
+ # For more info, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_auth[Logstash-to-Cloud documentation]
+ config :cloud_auth, :validate => :password
+
# SSL
config :ssl, :validate => :boolean, :default => false
# SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary
config :ca_file, :validate => :path
@@ -163,16 +174,21 @@
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
transport_options = {}
+ fill_user_password_from_cloud_auth
+
if @user && @password
token = Base64.strict_encode64("#{@user}:#{@password.value}")
transport_options[:headers] = { :Authorization => "Basic #{token}" }
end
- hosts = if @ssl then
+ fill_hosts_from_cloud_id
+ @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s
+
+ hosts = if @ssl
@hosts.map do |h|
host, port = h.split(":")
{ :host => host, :scheme => 'https', :port => port }
end
else
@@ -273,12 +289,72 @@
output_queue << event
end
def scroll_request scroll_id
- @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
+ client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
def search_request(options)
- @client.search(options)
+ client.search(options)
end
+
+ attr_reader :client
+
+ def hosts_default?(hosts)
+ hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
+ end
+
+ def fill_hosts_from_cloud_id
+ return unless @cloud_id
+
+ if @hosts && !hosts_default?(@hosts)
+ raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
+ end
+ @hosts = parse_host_uri_from_cloud_id(@cloud_id)
+ end
+
+ def fill_user_password_from_cloud_auth
+ return unless @cloud_auth
+
+ if @user || @password
+ raise LogStash::ConfigurationError, 'Both cloud_auth and user/password specified, please only use one.'
+ end
+ @user, @password = parse_user_password_from_cloud_auth(@cloud_auth)
+ params['user'], params['password'] = @user, @password
+ end
+
+ def parse_host_uri_from_cloud_id(cloud_id)
+ begin # might not be available on older LS
+ require 'logstash/util/cloud_setting_id'
+ rescue LoadError
+ raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
+ 'please upgrade your installation (or set hosts instead).'
+ end
+
+ begin
+ cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host
+ rescue ArgumentError => e
+ raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
+ end
+ cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}"
+ LogStash::Util::SafeURI.new(cloud_uri)
+ end
+
+ def parse_user_password_from_cloud_auth(cloud_auth)
+ begin # might not be available on older LS
+ require 'logstash/util/cloud_setting_auth'
+ rescue LoadError
+ raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
+ 'please upgrade your installation (or set user/password instead).'
+ end
+
+ cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password)
+ begin
+ cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth)
+ rescue ArgumentError => e
+ raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
+ end
+ [ cloud_auth.username, cloud_auth.password ]
+ end
+
end