lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.3.0 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.3.1

- old
+ new

@@ -47,23 +47,39 @@ public Schedule_types = %w(cron every at in) def register @host = Socket.gethostname.force_encoding(Encoding::UTF_8) - @logger.info("Registering http_poller Input", :type => @type, :schedule => @schedule, :timeout => @timeout) - setup_ecs_field! setup_requests! end # @overload def stop + shutdown_scheduler_and_close_client(:wait) + end + + # @overload + def close + shutdown_scheduler_and_close_client + end + + def shutdown_scheduler_and_close_client(opt = nil) + @logger.debug("closing http client", client: client) + begin + client.close # since Manticore 0.9.0 this shuts-down/closes all resources + rescue => e + details = { exception: e.class, message: e.message } + details[:backtrace] = e.backtrace if @logger.debug? + @logger.warn "failed closing http client", details + end if @scheduler - @scheduler.shutdown # on newer Rufus (3.8) this joins on the scheduler thread + @logger.debug("shutting down scheduler", scheduler: @scheduler) + @scheduler.shutdown(opt) # on newer Rufus (3.8) this joins on the scheduler thread end - # TODO implement client.close as we as releasing it's pooled resources! end + private :shutdown_scheduler_and_close_client private def setup_requests! @requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }] end @@ -177,19 +193,22 @@ @scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join end def run_once(queue) @requests.each do |name, request| + # prevent executing a scheduler kick after the plugin has been stop-ed + # this could easily happen as the scheduler shutdown is not immediate + return if stop? request_async(queue, name, request) end - client.execute! + client.execute! unless stop? end private def request_async(queue, name, request) - @logger.debug? && @logger.debug("Fetching URL", :name => name, :url => request) + @logger.debug? && @logger.debug("async queueing fetching url", name: name, url: request) started = Time.now method, *request_opts = request client.async.send(method, *request_opts). on_success {|response| handle_success(queue, name, request, response, Time.now - started) }. @@ -202,10 +221,11 @@ (time_diff * 1000000).to_i end private def handle_success(queue, name, request, response, execution_time) + @logger.debug? && @logger.debug("success fetching url", name: name, url: request) body = response.body # If there is a usable response. HEAD requests are `nil` and empty get # responses come up as "" which will cause the codec to not yield anything if body && body.size > 0 decode_and_flush(@codec, body) do |decoded| @@ -240,9 +260,10 @@ end private # Beware, on old versions of manticore some uncommon failures are not handled def handle_failure(queue, name, request, exception, execution_time) + @logger.debug? && @logger.debug("failed fetching url", name: name, url: request) event = event_factory.new_event event.tag("_http_request_failure") apply_metadata(event, name, request, nil, execution_time) apply_failure_fields(event, name, request, exception, execution_time)