lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.4.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.5.0

- old
+ new

@@ -1,9 +1,10 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" require "logstash/json" +require "logstash/util/safe_uri" require "base64" # .Compatibility Note # [NOTE] @@ -67,10 +68,15 @@ # List of elasticsearch hosts to use for querying. # Each host can be either IP, HOST, IP:port or HOST:port. # Port defaults to 9200 config :hosts, :validate => :array + # Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used. + # + # For more info, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[Logstash-to-Cloud documentation] + config :cloud_id, :validate => :string + # The index or alias to search. config :index, :validate => :string, :default => "logstash-*" # The query to be executed. Read the Elasticsearch query DSL documentation # for more info @@ -132,10 +138,15 @@ config :user, :validate => :string # Basic Auth - password config :password, :validate => :password + # Cloud authentication string ("<username>:<password>" format) is an alternative for the `user`/`password` configuration. + # + # For more info, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_auth[Logstash-to-Cloud documentation] + config :cloud_auth, :validate => :password + # SSL config :ssl, :validate => :boolean, :default => false # SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary config :ca_file, :validate => :path @@ -163,16 +174,21 @@ @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") end transport_options = {} + fill_user_password_from_cloud_auth + if @user && @password token = Base64.strict_encode64("#{@user}:#{@password.value}") transport_options[:headers] = { :Authorization => "Basic #{token}" } end - hosts = if @ssl then + fill_hosts_from_cloud_id + @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s + + hosts = if @ssl @hosts.map do |h| host, port = h.split(":") { :host => host, :scheme => 'https', :port => port } end else @@ -273,12 +289,72 @@ output_queue << event end def scroll_request scroll_id - @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll) + client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll) end def search_request(options) - @client.search(options) + client.search(options) end + + attr_reader :client + + def hosts_default?(hosts) + hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? ) + end + + def fill_hosts_from_cloud_id + return unless @cloud_id + + if @hosts && !hosts_default?(@hosts) + raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.' + end + @hosts = parse_host_uri_from_cloud_id(@cloud_id) + end + + def fill_user_password_from_cloud_auth + return unless @cloud_auth + + if @user || @password + raise LogStash::ConfigurationError, 'Both cloud_auth and user/password specified, please only use one.' + end + @user, @password = parse_user_password_from_cloud_auth(@cloud_auth) + params['user'], params['password'] = @user, @password + end + + def parse_host_uri_from_cloud_id(cloud_id) + begin # might not be available on older LS + require 'logstash/util/cloud_setting_id' + rescue LoadError + raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' + + 'please upgrade your installation (or set hosts instead).' + end + + begin + cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host + rescue ArgumentError => e + raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id') + end + cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}" + LogStash::Util::SafeURI.new(cloud_uri) + end + + def parse_user_password_from_cloud_auth(cloud_auth) + begin # might not be available on older LS + require 'logstash/util/cloud_setting_auth' + rescue LoadError + raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' + + 'please upgrade your installation (or set user/password instead).' + end + + cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password) + begin + cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth) + rescue ArgumentError => e + raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth') + end + [ cloud_auth.username, cloud_auth.password ] + end + end