lib/google/cloud/logging/async_writer.rb in google-cloud-logging-0.23.0 vs lib/google/cloud/logging/async_writer.rb in google-cloud-logging-0.23.1
- old
+ new
@@ -10,11 +10,13 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+require "set"
+
module Google
module Cloud
module Logging
##
# # AsyncWriter
@@ -50,11 +52,16 @@
# resource: resource,
# labels: labels
#
class AsyncWriter
DEFAULT_MAX_QUEUE_SIZE = 10000
+ CLEANUP_TIMEOUT = 10.0
+ WAIT_INTERVAL = 1.0
+ @cleanup_list = nil
+ @exit_lock = Mutex.new
+
##
# @private Item in the log entries queue.
QueueItem = Struct.new(:entries, :log_name, :resource, :labels) do
def try_combine next_item
if log_name == next_item.log_name &&
@@ -306,11 +313,12 @@
##
# Blocks until this asynchronous writer has been stopped, or the given
# timeout (if present) has elapsed.
#
- # @param [Number] timeout Timeout in seconds, or nil for no timeout.
+ # @param [Number, nil] timeout Timeout in seconds, or `nil` for no
+ # timeout.
#
# @return [Boolean] Returns true if the writer is stopped, or false
# if the timeout expired.
#
def wait_until_stopped timeout = nil
@@ -318,16 +326,41 @@
deadline = timeout ? ::Time.new.to_f + timeout : nil
@lock.synchronize do
until state == :stopped
cur_time = ::Time.new.to_f
return false if deadline && cur_time >= deadline
- @lock_cond.wait(deadline ? deadline - cur_time : nil)
+ interval = deadline ? deadline - cur_time : WAIT_INTERVAL
+ interval = WAIT_INTERVAL if interval > WAIT_INTERVAL
+ @lock_cond.wait interval
end
end
true
end
+ ##
+ # Stop this asynchronous writer and block until it has been stopped.
+ #
+ # @param [Number] timeout Timeout in seconds.
+ # @param [Boolean] force If set to true, and the writer hasn't stopped
+ # within the given timeout, kill it forcibly by terminating the
+ # thread. This should be used with extreme caution, as it can
+ # leave RPCs unfinished. Default is false.
+ #
+ # @return [Symbol] Returns `:stopped` if the AsyncWriter was already
+ # stopped at the time of invocation, `:waited` if it stopped
+ # during the timeout period, `:timeout` if it is still running
+ # after the timeout, or `:forced` if it was forcibly killed.
+ #
+ def stop! timeout, force: false
+ return :stopped unless stop
+ return :waited if wait_until_stopped timeout
+ return :timeout unless force
+ @thread.kill
+ @thread.join
+ :forced
+ end
+
protected
##
# @private Ensures the background thread is running. This is called
# at the start of all public methods, and kicks off the thread lazily.
@@ -339,18 +372,22 @@
if (@thread.nil? || !@thread.alive?) && @state != :stopped
@queue_size = 0
@queue = []
@lock = Monitor.new
@lock_cond = @lock.new_cond
- @thread = Thread.new { run_backgrounder }
+ AsyncWriter.register_for_cleanup self
+ @thread = Thread.new do
+ run_backgrounder
+ AsyncWriter.unregister_for_cleanup self
+ end
end
end
end
##
# @private The background thread implementation, which continuously
- # waits for performs work, and returns only when fully stopped.
+ # waits for and performs work, and returns only when fully stopped.
#
def run_backgrounder
loop do
queue_item = wait_next_item
return unless queue_item
@@ -365,32 +402,77 @@
# Ignore any exceptions thrown from the background thread, but
# keep running to ensure its state behavior remains consistent.
@last_exception = e
end
end
+ ensure
+ # If something drastic happened like the thread was killed, make
+ # sure the state is consistent.
+ @state = :stopped
end
##
# @private Wait for and dequeue the next set of log entries to transmit.
#
- # @return [QueueItem,NilClass] Returns the next set of entries. If
+ # @return [QueueItem, nil] Returns the next set of entries. If
# the writer has been stopped and no more entries are left in the
- # queue, returns nil.
+ # queue, returns `nil`.
#
def wait_next_item
@lock.synchronize do
while state == :suspended ||
(state == :running && @queue.empty?)
@lock_cond.wait
end
+ queue_item = nil
if @queue.empty?
@state = :stopped
- nil
else
queue_item = @queue.shift
@queue_size -= queue_item.entries.size
- @lock_cond.broadcast
- queue_item
+ end
+ @lock_cond.broadcast
+ queue_item
+ end
+ end
+
+ ##
+ # Register the given AsyncWriter for cleanup on VM exit.
+ #
+ # @private
+ #
+ def self.register_for_cleanup async
+ @exit_lock.synchronize do
+ unless @cleanup_list
+ @cleanup_list = ::Set.new
+ at_exit { AsyncWriter.run_cleanup }
+ end
+ @cleanup_list.add async
+ end
+ end
+
+ ##
+ # Unregister the given AsyncWriter for cleanup on VM exit.
+ #
+ # @private
+ #
+ def self.unregister_for_cleanup async
+ @exit_lock.synchronize do
+ @cleanup_list.delete async if @cleanup_list
+ end
+ end
+
+ ##
+ # Exit hook that cleans up any running AsyncWriters.
+ #
+ # @private
+ #
+ def self.run_cleanup
+ @exit_lock.synchronize do
+ if @cleanup_list
+ @cleanup_list.each do |async|
+ async.stop! CLEANUP_TIMEOUT, force: true
+ end
end
end
end
end
end