lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.7.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.8.0
- old
+ new
@@ -133,10 +133,19 @@
config :user, :validate => :string
# Basic Auth - password
config :password, :validate => :password
+ # Connection Timeout, in Seconds
+ config :connect_timeout_seconds, :validate => :positive_whole_number, :default => 10
+
+ # Request Timeout, in Seconds
+ config :request_timeout_seconds, :validate => :positive_whole_number, :default => 60
+
+ # Socket Timeout, in Seconds
+ config :socket_timeout_seconds, :validate => :positive_whole_number, :default => 60
+
# Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used.
#
# For more info, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[Logstash-to-Cloud documentation]
config :cloud_id, :validate => :string
@@ -187,10 +196,13 @@
transport_options = {:headers => {}}
transport_options[:headers].merge!(setup_basic_auth(user, password))
transport_options[:headers].merge!(setup_api_key(api_key))
+ transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
+ transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
+ transport_options[:socket_timeout] = @socket_timeout_seconds unless @socket_timeout_seconds.nil?
hosts = setup_hosts
ssl_options = setup_ssl
@logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
@@ -203,27 +215,12 @@
:transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
:ssl => ssl_options
)
end
- ##
- # @override to handle proxy => '' as if none was set
- # @param value [Array<Object>]
- # @param validator [nil,Array,Symbol]
- # @return [Array(true,Object)]: if validation is a success, a tuple containing `true` and the coerced value
- # @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.
- def self.validate_value(value, validator)
- return super unless validator == :uri_or_empty
- value = deep_replace(value)
- value = hash_or_array(value)
- return true, value.first if value.size == 1 && value.first.empty?
-
- return super(value, :uri)
- end
-
def run(output_queue)
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@scheduler.cron @schedule do
do_run(output_queue)
@@ -437,6 +434,44 @@
raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
end
[ cloud_auth.username, cloud_auth.password ]
end
+ module URIOrEmptyValidator
+ ##
+ # @override to provide :uri_or_empty validator
+ # @param value [Array<Object>]
+ # @param validator [nil,Array,Symbol]
+ # @return [Array(true,Object)]: if validation is a success, a tuple containing `true` and the coerced value
+ # @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.
+ def validate_value(value, validator)
+ return super unless validator == :uri_or_empty
+
+ value = deep_replace(value)
+ value = hash_or_array(value)
+
+ return true, value.first if value.size == 1 && value.first.empty?
+
+ return super(value, :uri)
+ end
+ end
+ extend(URIOrEmptyValidator)
+
+ module PositiveWholeNumberValidator
+ ##
+ # @override to provide :positive_whole_number validator
+ # @param value [Array<Object>]
+ # @param validator [nil,Array,Symbol]
+ # @return [Array(true,Object)]: if validation is a success, a tuple containing `true` and the coerced value
+ # @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.
+ def validate_value(value, validator)
+ return super unless validator == :positive_whole_number
+
+ is_number, coerced_number = super(value, :number)
+
+ return [true, coerced_number.to_i] if is_number && coerced_number.denominator == 1 && coerced_number > 0
+
+ return [false, "Expected positive whole number, got `#{value.inspect}`"]
+ end
+ end
+ extend(PositiveWholeNumberValidator)
end