lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.3.0 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.3.1
- old
+ new
@@ -47,23 +47,39 @@
public
Schedule_types = %w(cron every at in)
def register
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
- @logger.info("Registering http_poller Input", :type => @type, :schedule => @schedule, :timeout => @timeout)
-
setup_ecs_field!
setup_requests!
end
# @overload
def stop
+ shutdown_scheduler_and_close_client(:wait)
+ end
+
+ # @overload
+ def close
+ shutdown_scheduler_and_close_client
+ end
+
+ def shutdown_scheduler_and_close_client(opt = nil)
+ @logger.debug("closing http client", client: client)
+ begin
+ client.close # since Manticore 0.9.0 this shuts-down/closes all resources
+ rescue => e
+ details = { exception: e.class, message: e.message }
+ details[:backtrace] = e.backtrace if @logger.debug?
+ @logger.warn "failed closing http client", details
+ end
if @scheduler
- @scheduler.shutdown # on newer Rufus (3.8) this joins on the scheduler thread
+ @logger.debug("shutting down scheduler", scheduler: @scheduler)
+ @scheduler.shutdown(opt) # on newer Rufus (3.8) this joins on the scheduler thread
end
- # TODO implement client.close as we as releasing it's pooled resources!
end
+ private :shutdown_scheduler_and_close_client
private
def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
end
@@ -177,19 +193,22 @@
@scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join
end
def run_once(queue)
@requests.each do |name, request|
+ # prevent executing a scheduler kick after the plugin has been stop-ed
+ # this could easily happen as the scheduler shutdown is not immediate
+ return if stop?
request_async(queue, name, request)
end
- client.execute!
+ client.execute! unless stop?
end
private
def request_async(queue, name, request)
- @logger.debug? && @logger.debug("Fetching URL", :name => name, :url => request)
+ @logger.debug? && @logger.debug("async queueing fetching url", name: name, url: request)
started = Time.now
method, *request_opts = request
client.async.send(method, *request_opts).
on_success {|response| handle_success(queue, name, request, response, Time.now - started) }.
@@ -202,10 +221,11 @@
(time_diff * 1000000).to_i
end
private
def handle_success(queue, name, request, response, execution_time)
+ @logger.debug? && @logger.debug("success fetching url", name: name, url: request)
body = response.body
# If there is a usable response. HEAD requests are `nil` and empty get
# responses come up as "" which will cause the codec to not yield anything
if body && body.size > 0
decode_and_flush(@codec, body) do |decoded|
@@ -240,9 +260,10 @@
end
private
# Beware, on old versions of manticore some uncommon failures are not handled
def handle_failure(queue, name, request, exception, execution_time)
+ @logger.debug? && @logger.debug("failed fetching url", name: name, url: request)
event = event_factory.new_event
event.tag("_http_request_failure")
apply_metadata(event, name, request, nil, execution_time)
apply_failure_fields(event, name, request, exception, execution_time)