lib/google/cloud/logging/async_writer.rb in google-cloud-logging-1.5.7 vs lib/google/cloud/logging/async_writer.rb in google-cloud-logging-1.6.0
- old
+ new
@@ -11,25 +11,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-require "set"
-require "stackdriver/core/async_actor"
+require "monitor"
+require "concurrent"
+require "google/cloud/logging/errors"
module Google
module Cloud
module Logging
##
# # AsyncWriter
#
- # An object that batches and transmits log entries asynchronously.
+ # AsyncWriter buffers, batches, and transmits log entries efficiently.
+ # Writing log entries is asynchronous and will not block.
#
- # Use this object to transmit log entries efficiently. It keeps a queue
- # of log entries, and runs a background thread that transmits them to
- # the logging service in batches. Generally, adding to the queue will
- # not block.
+ # Batches that cannot be delivered immediately are queued. When the queue
+ # is full, new batch requests raise errors that can be consumed using
+ # the {#on_error} callback. This provides back pressure in case the writer
+ # cannot keep up with requests.
#
# This object is thread-safe; it may accept write requests from
# multiple threads simultaneously, and will serialize them when
# executing in the background thread.
#
@@ -52,62 +54,41 @@
# log_name: "my_app_log",
# resource: resource,
# labels: labels
#
class AsyncWriter
- include Stackdriver::Core::AsyncActor
+ include MonitorMixin
- DEFAULT_MAX_QUEUE_SIZE = 10000
- CLEANUP_TIMEOUT = Stackdriver::Core::AsyncActor::CLEANUP_TIMEOUT
- WAIT_INTERVAL = Stackdriver::Core::AsyncActor::WAIT_INTERVAL
-
##
- # @private Item in the log entries queue.
- QueueItem = Struct.new(:entries, :log_name, :resource, :labels) do
- def try_combine next_item
- if log_name == next_item.log_name &&
- resource == next_item.resource &&
- labels == next_item.labels
- entries.concat(next_item.entries)
- true
- else
- false
- end
- end
- end
+ # @private Implementation accessors
+ attr_reader :logging, :max_bytes, :max_count, :interval,
+ :threads, :max_queue, :partial_success
##
- # @private The logging object.
- attr_accessor :logging
+ # @private Creates a new AsyncWriter instance.
+ def initialize logging, max_count: 10000, max_bytes: 10000000,
+ max_queue: 100, interval: 5, threads: 10,
+ partial_success: false
+ @logging = logging
- ##
- # @private The maximum size of the entries queue, or nil if not set.
- attr_accessor :max_queue_size
+ @max_count = max_count
+ @max_bytes = max_bytes
+ @max_queue = max_queue
+ @interval = interval
+ @threads = threads
- ##
- # The current state. Either :running, :suspended, :stopping, or :stopped
- #
- # DEPRECATED. Use #async_state instead.
- alias state async_state
+ @partial_success = partial_success
- ##
- # The last exception thrown by the background thread, or nil if nothing
- # has been thrown.
- attr_reader :last_exception
+ @error_callbacks = []
- ##
- # @private Creates a new AsyncWriter instance.
- def initialize logging, max_queue_size = DEFAULT_MAX_QUEUE_SIZE,
- partial_success = false
- super()
+ @cond = new_cond
- @logging = logging
- @max_queue_size = max_queue_size
- @partial_success = partial_success
- @queue_resource = new_cond
- @queue = []
- @queue_size = 0
+ # Make sure all buffered messages are sent when the process exits.
+ at_exit { stop }
+
+ # init MonitorMixin
+ super()
end
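
A minimal construction sketch for the new batching knobs. The initializer is marked @private, so applications normally obtain a writer from `logging.async_writer`; the direct call and the limit values below are purely illustrative.

    require "google/cloud/logging"

    logging = Google::Cloud::Logging.new

    # Batch at most 500 entries or ~1 MB per request, publish at least every
    # 2 seconds, and surface errors through the #on_error callback once 50
    # batches are queued behind the worker threads.
    async = Google::Cloud::Logging::AsyncWriter.new logging,
                                                    max_count: 500,
                                                    max_bytes: 1_000_000,
                                                    max_queue: 50,
                                                    interval: 2,
                                                    threads: 4
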
##
# Asynchronously write one or more log entries to the Stackdriver
# Logging service.
@@ -147,23 +128,39 @@
# entry.resource.labels[:version_id] = "20150925t173233"
#
# async.write_entries entry
#
def write_entries entries, log_name: nil, resource: nil, labels: nil
- ensure_thread
- entries = Array(entries)
synchronize do
- raise "AsyncWriter has been stopped" unless writable?
- queue_item = QueueItem.new entries, log_name, resource, labels
- if @queue.empty? || !@queue.last.try_combine(queue_item)
- @queue.push queue_item
+ raise "AsyncWriter has been stopped" if @stopped
+
+ Array(entries).each do |entry|
+ # Update the entry to have all the data directly on it
+ entry.log_name ||= log_name
+ if entry.resource.nil? || entry.resource.empty?
+ entry.resource = resource
+ end
+ entry.labels = labels if entry.labels.nil? || entry.labels.empty?
+
+ # Add the entry to the batch
+ @batch ||= Batch.new self
+ next if @batch.try_add entry
+
+ # If we can't add to the batch, publish and create a new batch
+ publish_batch!
+ @batch = Batch.new self
+ @batch.add entry
end
- @queue_size += entries.size
- @queue_resource.broadcast
- while @max_queue_size && @queue_size > @max_queue_size
- @queue_resource.wait
- end
+
+ @thread_pool ||= \
+ Concurrent::CachedThreadPool.new max_threads: @threads,
+ max_queue: @max_queue
+ @thread ||= Thread.new { run_background }
+
+ publish_batch! if @batch.ready?
+
+ @cond.broadcast
end
self
end
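
A small usage sketch for the defaulting behavior above. The entry helpers and names follow the examples in the class documentation and are illustrative.

    entry1 = logging.entry payload: "Job started."
    entry2 = logging.entry payload: "Job finished."

    resource = logging.resource "gae_app",
                                module_id: "1",
                                version_id: "20150925t173233"

    # log_name, resource, and labels given here are copied onto each entry
    # that does not already carry its own values before it is batched.
    async.write_entries [entry1, entry2],
                        log_name: "my_app_log",
                        resource: resource,
                        labels: { env: :production }
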
##
@@ -199,185 +196,309 @@
def logger log_name, resource, labels = {}
Logger.new self, log_name, resource, labels
end
##
- # Stops this asynchronous writer.
+ # Begins the process of stopping the writer. Entries already in the
+ # queue will be published, but no new entries can be added. Use {#wait!}
+ # to block until the writer is fully stopped and all pending entries
+ # have been published.
#
- # After this call succeeds, the state will change to :stopping, and
- # you may not issue any additional write_entries calls. Any previously
- # issued writes will complete. Once any existing backlog has been
- # cleared, the state will change to :stopped.
+ # @return [AsyncWriter] returns self so calls can be chained.
+ def stop
+ synchronize do
+ break if @stopped
+
+ @stopped = true
+ publish_batch!
+ @cond.broadcast
+ @thread_pool.shutdown if @thread_pool
+ end
+
+ self
+ end
+
+ ##
+ # Blocks until the writer is fully stopped, all pending entries have
+ # been published, and all callbacks have completed. Does not stop the
+ # writer. To stop the writer, first call {#stop} and then call {#wait!}
+ # to block until the writer is stopped.
#
- # DEPRECATED. Use #async_stop instead.
- #
- # @return [Boolean] Returns true if the writer was running, or false
- # if the writer had already been stopped.
- #
- alias stop async_stop
+ # @return [AsyncWriter] returns self so calls can be chained.
+ def wait! timeout = nil
+ synchronize do
+ if @thread_pool
+ @thread_pool.shutdown
+ @thread_pool.wait_for_termination timeout
+ end
+ end
+ self
+ end
+
##
- # Suspends this asynchronous writer.
+ # Stop this asynchronous writer and block until it has been stopped.
#
- # After this call succeeds, the state will change to :suspended, and
- # the writer will stop sending RPCs until resumed.
+ # @param [Number, nil] timeout Timeout in seconds, or `nil` for no timeout.
+ # @param [Boolean] force If set to true, and the writer hasn't stopped
+ # within the given timeout, kill it forcibly by terminating the
+ # thread. This should be used with extreme caution, as it can
+ # leave RPCs unfinished. Default is false.
#
- # DEPRECATED. Use #async_suspend instead.
+ # @return [Symbol] Returns `:stopped` if the AsyncWriter was already
+ # stopped at the time of invocation, `:waited` if it stopped
+ # during the timeout period, `:timeout` if it is still running
+ # after the timeout, or `:forced` if it was forcibly killed.
#
- # @return [Boolean] Returns true if the writer had been running and was
- # suspended, otherwise false.
+ def stop! timeout = nil, force: nil
+ return :stopped if stopped?
+
+ stop
+ wait! timeout
+
+ if synchronize { @thread_pool.shutdown? }
+ return :waited if timeout
+ elsif force
+ @thread_pool.kill
+ return :forced
+ end
+ :timeout
+ end
+ alias async_stop! stop!
+
+ ##
+ # Forces all entries in the current batch to be published
+ # immediately.
#
- alias suspend async_suspend
+ # @return [AsyncWriter] returns self so calls can be chained.
+ def flush
+ synchronize do
+ publish_batch!
+ @cond.broadcast
+ end
+ self
+ end
+
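
A shutdown sketch using the lifecycle methods above; the timeout is illustrative.

    # Push out whatever is currently batched without stopping the writer.
    async.flush

    # Graceful shutdown: refuse new entries, publish the backlog, then block
    # for up to 10 seconds while the background thread pool drains.
    async.stop
    async.wait! 10

    # Or combine the two; returns :stopped, :waited, :timeout, or :forced.
    async.stop! 10, force: true
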
##
- # Resumes this suspended asynchronous writer.
+ # Whether the writer has been started.
#
- # After this call succeeds, the state will change to :running, and
- # the writer will resume sending RPCs.
- #
- # DEPRECATED. Use #async_resume instead.
- #
- # @return [Boolean] Returns true if the writer had been suspended and
- # is now running, otherwise false.
- #
- alias resume async_resume
+ # @return [boolean] `true` when started, `false` otherwise.
+ def started?
+ !stopped?
+ end
##
- # Returns true if this writer is running.
+ # Whether the writer has been stopped.
#
- # DEPRECATED. Use #async_running? instead.
- #
- # @return [Boolean] Returns true if the writer is currently running.
- #
- alias running? async_running?
+ # @return [boolean] `true` when stopped, `false` otherwise.
+ def stopped?
+ synchronize { @stopped }
+ end
##
- # Returns true if this writer is suspended.
+ # Register to be notified of errors when raised.
#
- # DEPRECATED. Use #async_suspended? instead.
+ # If an unhandled error has occurred, the writer will attempt to
+ # recover from the error and resume buffering, batching, and
+ # transmitting log entries.
#
- # @return [Boolean] Returns true if the writer is currently suspended.
+ # Multiple error handlers can be added.
#
- alias suspended? async_suspended?
-
- ##
- # Returns true if this writer is still accepting writes. This means
- # it is either running or suspended.
+ # @yield [callback] The block to be called when an error is raised.
+ # @yieldparam [Exception] error The error raised.
#
- # DEPRECATED. Use #async_working? instead.
+ # @example
+ # require "google/cloud/logging"
+ # require "google/cloud/error_reporting"
#
- # @return [Boolean] Returns true if the writer is accepting writes.
+ # logging = Google::Cloud::Logging.new
#
- alias writable? async_working?
-
- ##
- # Returns true if this writer is fully stopped.
+ # resource = logging.resource "gae_app",
+ # module_id: "1",
+ # version_id: "20150925t173233"
#
- # DEPRECATED. Use #async_stopped? instead.
+ # async = logging.async_writer
#
- # @return [Boolean] Returns true if the writer is fully stopped.
+ # # Register to be notified when unhandled errors occur.
+ # async.on_error do |error|
+ # # error can be an AsyncWriterError or AsyncWriteEntriesError
+ # Google::Cloud::ErrorReporting.report error
+ # end
#
- alias stopped? async_stopped?
+ # logger = async.logger "my_app_log", resource, env: :production
+ # logger.info "Job started."
+ #
+ def on_error &block
+ synchronize do
+ @error_callbacks << block
+ end
+ end
##
- # Blocks until this asynchronous writer has been stopped, or the given
- # timeout (if present) has elapsed.
+ # The most recent unhandled error to occur while transmitting log
+ # entries.
#
- # DEPRECATED. Use #wait_until_async_stopped instead.
+ # If an unhandled error has occurred, the writer will attempt to
+ # recover from the error and resume buffering, batching, and
+ # transmitting log entries.
#
- # @param [Number, nil] timeout Timeout in seconds, or `nil` for no
- # timeout.
+ # @return [Exception, nil] error The most recent error raised.
#
- # @return [Boolean] Returns true if the writer is stopped, or false
- # if the timeout expired.
+ # @example
+ # require "google/cloud/logging"
#
- alias wait_until_stopped wait_until_async_stopped
-
- ##
- # Stop this asynchronous writer and block until it has been stopped.
+ # logging = Google::Cloud::Logging.new
#
- # DEPRECATED. Use #async_stop! instead.
+ # resource = logging.resource "gae_app",
+ # module_id: "1",
+ # version_id: "20150925t173233"
#
- # @param [Number] timeout Timeout in seconds.
- # @param [Boolean] force If set to true, and the writer hasn't stopped
- # within the given timeout, kill it forcibly by terminating the
- # thread. This should be used with extreme caution, as it can
- # leave RPCs unfinished. Default is false.
+ # async = logging.async_writer
#
- # @return [Symbol] Returns `:stopped` if the AsyncWriter was already
- # stopped at the time of invocation, `:waited` if it stopped
- # during the timeout period, `:timeout` if it is still running
- # after the timeout, or `:forced` if it was forcibly killed.
+ # logger = async.logger "my_app_log", resource, env: :production
+ # logger.info "Job started."
#
- def stop! timeout, force: false
- @cleanup_options[:timeout] = timeout unless timeout.nil?
- @cleanup_options[:force] = force unless force.nil?
-
- async_stop!
+ # # If an error was raised, it can be retrieved here:
+ # async.last_error #=> nil
+ #
+ def last_error
+ synchronize { @last_error }
end
+ alias last_exception last_error
- ##
- # @private Callback function when the async actor thread state changes
- def on_async_state_change
+ protected
+
+ def run_background
synchronize do
- @queue_resource.broadcast
+ until @stopped
+ if @batch.nil?
+ @cond.wait
+ next
+ end
+
+ if @batch.ready?
+ # interval met, publish the batch...
+ publish_batch!
+ @cond.wait
+ else
+ # still waiting for the interval to publish the batch...
+ @cond.wait(@batch.publish_wait)
+ end
+ end
end
end
- protected
+ def publish_batch!
+ return unless @batch
- ##
- # @private The background thread implementation, which continuously
- # waits for and performs work, and returns only when fully stopped.
- #
- def run_backgrounder
- queue_item = wait_next_item
- return unless queue_item
- begin
- logging.write_entries(
- queue_item.entries,
- log_name: queue_item.log_name,
- resource: queue_item.resource,
- labels: queue_item.labels,
- partial_success: @partial_success
- )
- rescue StandardError => e
- # Ignore any exceptions thrown from the background thread, but
- # keep running to ensure its state behavior remains consistent.
- @last_exception = e
+ batch_to_be_published = @batch
+ @batch = nil
+ publish_batch_async batch_to_be_published
+ end
+
+ # Sets the last_error and calls all error callbacks.
+ def error! error
+ error_callbacks = synchronize do
+ @last_error = error
+ @error_callbacks
end
+ error_callbacks = default_error_callbacks if error_callbacks.empty?
+ error_callbacks.each { |error_callback| error_callback.call error }
end
- ##
- # @private Wait for and dequeue the next set of log entries to transmit.
- #
- # @return [QueueItem, nil] Returns the next set of entries. If
- # the writer has been stopped and no more entries are left in the
- # queue, returns `nil`.
- #
- def wait_next_item
- synchronize do
- while state == :suspended ||
- (state == :running && @queue.empty?)
- @queue_resource.wait
+ def default_error_callbacks
+ # This is memoized to reduce calls to the configuration.
+ @default_error_callbacks ||= begin
+ error_callback = Google::Cloud::Logging.configuration.on_error
+ error_callback ||= Google::Cloud.configure.on_error
+ if error_callback
+ [error_callback]
+ else
+ []
end
- queue_item = nil
- unless @queue.empty?
- queue_item = @queue.shift
- @queue_size -= queue_item.entries.size
- end
- @queue_resource.broadcast
- queue_item
end
end
+ def publish_batch_async batch
+ Concurrent::Promises.future_on(
+ @thread_pool, batch.entries
+ ) do |entries|
+ write_entries_with entries
+ end
+ rescue Concurrent::RejectedExecutionError => e
+ async_error = AsyncWriterError.new(
+ "Error writing entries: #{e.message}",
+ batch.entries
+ )
+ # Manually set backtrace so we don't have to raise
+ async_error.set_backtrace caller
+ error! async_error
+ end
+
+ def write_entries_with entries
+ logging.write_entries entries, partial_success: partial_success
+ rescue StandardError => e
+ write_error = AsyncWriteEntriesError.new(
+ "Error writing entries: #{e.message}",
+ entries
+ )
+ # Manually set backtrace so we don't have to raise
+ write_error.set_backtrace caller
+ error! write_error
+ end
+
##
- # @private Override the #backgrounder_stoppable? method from AsyncActor
- # module. The actor can be gracefully stopped when queue is
- # empty.
- def backgrounder_stoppable?
- synchronize do
- @queue.empty?
+ # @private
+ class Batch
+ attr_reader :created_at, :entries
+
+ def initialize writer
+ @writer = writer
+ @entries = []
+ @entries_bytes = 2 # initial size w/ partial_success
+ @created_at = nil
+ end
+
+ def add entry, addl_bytes: nil
+ addl_bytes ||= addl_bytes_for entry
+ @entries << entry
+ @entries_bytes += addl_bytes
+ @created_at ||= Time.now
+ nil
+ end
+
+ def try_add entry
+ addl_bytes = addl_bytes_for entry
+ new_message_count = @entries.count + 1
+ new_message_bytes = @entries_bytes + addl_bytes
+ if new_message_count > @writer.max_count ||
+ new_message_bytes >= @writer.max_bytes
+ return false
+ end
+ add entry, addl_bytes: addl_bytes
+ true
+ end
+
+ def ready?
+ @entries.count >= @writer.max_count ||
+ @entries_bytes >= @writer.max_bytes ||
+ (@created_at.nil? || (publish_at < Time.now))
+ end
+
+ def publish_at
+ return nil if @created_at.nil?
+ @created_at + @writer.interval
+ end
+
+ def publish_wait
+ publish_wait = publish_at - Time.now
+ return 0 if publish_wait < 0
+ publish_wait
+ end
+
+ def addl_bytes_for entry
+ entry.to_grpc.to_proto.bytesize + 2
end
end
end
end
end