lib/google/cloud/logging/async_writer.rb in google-cloud-logging-0.23.0 vs lib/google/cloud/logging/async_writer.rb in google-cloud-logging-0.23.1

- old
+ new

@@ -10,11 +10,13 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +require "set" + module Google module Cloud module Logging ## # # AsyncWriter @@ -50,11 +52,16 @@ # resource: resource, # labels: labels # class AsyncWriter DEFAULT_MAX_QUEUE_SIZE = 10000 + CLEANUP_TIMEOUT = 10.0 + WAIT_INTERVAL = 1.0 + @cleanup_list = nil + @exit_lock = Mutex.new + ## # @private Item in the log entries queue. QueueItem = Struct.new(:entries, :log_name, :resource, :labels) do def try_combine next_item if log_name == next_item.log_name && @@ -306,11 +313,12 @@ ## # Blocks until this asynchronous writer has been stopped, or the given # timeout (if present) has elapsed. # - # @param [Number] timeout Timeout in seconds, or nil for no timeout. + # @param [Number, nil] timeout Timeout in seconds, or `nil` for no + # timeout. # # @return [Boolean] Returns true if the writer is stopped, or false # if the timeout expired. # def wait_until_stopped timeout = nil @@ -318,16 +326,41 @@ deadline = timeout ? ::Time.new.to_f + timeout : nil @lock.synchronize do until state == :stopped cur_time = ::Time.new.to_f return false if deadline && cur_time >= deadline - @lock_cond.wait(deadline ? deadline - cur_time : nil) + interval = deadline ? deadline - cur_time : WAIT_INTERVAL + interval = WAIT_INTERVAL if interval > WAIT_INTERVAL + @lock_cond.wait interval end end true end + ## + # Stop this asynchronous writer and block until it has been stopped. + # + # @param [Number] timeout Timeout in seconds. + # @param [Boolean] force If set to true, and the writer hasn't stopped + # within the given timeout, kill it forcibly by terminating the + # thread. This should be used with extreme caution, as it can + # leave RPCs unfinished. Default is false. + # + # @return [Symbol] Returns `:stopped` if the AsyncWriter was already + # stopped at the time of invocation, `:waited` if it stopped + # during the timeout period, `:timeout` if it is still running + # after the timeout, or `:forced` if it was forcibly killed. + # + def stop! timeout, force: false + return :stopped unless stop + return :waited if wait_until_stopped timeout + return :timeout unless force + @thread.kill + @thread.join + :forced + end + protected ## # @private Ensures the background thread is running. This is called # at the start of all public methods, and kicks off the thread lazily. @@ -339,18 +372,22 @@ if (@thread.nil? || !@thread.alive?) && @state != :stopped @queue_size = 0 @queue = [] @lock = Monitor.new @lock_cond = @lock.new_cond - @thread = Thread.new { run_backgrounder } + AsyncWriter.register_for_cleanup self + @thread = Thread.new do + run_backgrounder + AsyncWriter.unregister_for_cleanup self + end end end end ## # @private The background thread implementation, which continuously - # waits for performs work, and returns only when fully stopped. + # waits for and performs work, and returns only when fully stopped. # def run_backgrounder loop do queue_item = wait_next_item return unless queue_item @@ -365,32 +402,77 @@ # Ignore any exceptions thrown from the background thread, but # keep running to ensure its state behavior remains consistent. @last_exception = e end end + ensure + # If something drastic happened like the thread was killed, make + # sure the state is consistent. + @state = :stopped end ## # @private Wait for and dequeue the next set of log entries to transmit. # - # @return [QueueItem,NilClass] Returns the next set of entries. If + # @return [QueueItem, nil] Returns the next set of entries. If # the writer has been stopped and no more entries are left in the - # queue, returns nil. + # queue, returns `nil`. # def wait_next_item @lock.synchronize do while state == :suspended || (state == :running && @queue.empty?) @lock_cond.wait end + queue_item = nil if @queue.empty? @state = :stopped - nil else queue_item = @queue.shift @queue_size -= queue_item.entries.size - @lock_cond.broadcast - queue_item + end + @lock_cond.broadcast + queue_item + end + end + + ## + # Register the given AsyncWriter for cleanup on VM exit. + # + # @private + # + def self.register_for_cleanup async + @exit_lock.synchronize do + unless @cleanup_list + @cleanup_list = ::Set.new + at_exit { AsyncWriter.run_cleanup } + end + @cleanup_list.add async + end + end + + ## + # Unregister the given AsyncWriter for cleanup on VM exit. + # + # @private + # + def self.unregister_for_cleanup async + @exit_lock.synchronize do + @cleanup_list.delete async if @cleanup_list + end + end + + ## + # Exit hook that cleans up any running AsyncWriters. + # + # @private + # + def self.run_cleanup + @exit_lock.synchronize do + if @cleanup_list + @cleanup_list.each do |async| + async.stop! CLEANUP_TIMEOUT, force: true + end end end end end end