lib/google/cloud/logging/async_writer.rb in google-cloud-logging-1.5.7 vs lib/google/cloud/logging/async_writer.rb in google-cloud-logging-1.6.0

- old
+ new

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require "monitor"
require "concurrent"
require "google/cloud/logging/errors"

module Google
  module Cloud
    module Logging
      ##
      # # AsyncWriter
      #
      # AsyncWriter buffers, batches, and transmits log entries efficiently.
      # Writing log entries is asynchronous and will not block.
      #
      # Batches that cannot be delivered immediately are queued. When the queue
      # is full new batch requests will raise errors that can be consumed using
      # the {#on_error} callback. This provides back pressure in case the writer
      # cannot keep up with requests.
      #
      # This object is thread-safe; it may accept write requests from
      # multiple threads simultaneously, and will serialize them when
      # executing in the background thread.
      #
      # @example
      #   require "google/cloud/logging"
      #
      #   logging = Google::Cloud::Logging.new
      #
      #   resource = logging.resource "gae_app",
      #                               module_id: "1",
      #                               version_id: "20150925t173233"
      #
      #   async = logging.async_writer
      #
      #   logger = async.logger "my_app_log", resource, env: :production
      #   logger.info "Job started."
      #
      class AsyncWriter
        include MonitorMixin

        ##
        # @private Implementation accessors
        attr_reader :logging, :max_bytes, :max_count, :interval,
                    :threads, :max_queue, :partial_success

        ##
        # @private Creates a new AsyncWriter instance.
        #
        # @param logging The object whose `write_entries` method is used to
        #   transmit each batch.
        # @param [Integer] max_count Maximum number of entries in a batch.
        # @param [Integer] max_bytes Upper bound on the estimated serialized
        #   size of a batch, in bytes.
        # @param [Integer] max_queue Maximum number of batches waiting for a
        #   worker thread before new batches are rejected.
        # @param [Numeric] interval Seconds after which a non-empty batch is
        #   published even when it is not full.
        # @param [Integer] threads Maximum number of worker threads.
        # @param [Boolean] partial_success Passed through to
        #   `logging.write_entries` for every batch.
        def initialize logging, max_count: 10000, max_bytes: 10000000,
                       max_queue: 100, interval: 5, threads: 10,
                       partial_success: false
          # Initialize MonitorMixin first, so that #new_cond and #synchronize
          # always operate on a fully set-up monitor.
          super()

          @logging = logging

          @max_count = max_count
          @max_bytes = max_bytes
          @max_queue = max_queue
          @interval = interval
          @threads = threads

          @partial_success = partial_success

          @error_callbacks = []

          # Explicit false so that #stopped? returns a real boolean.
          @stopped = false

          @cond = new_cond

          # Publish whatever is still buffered when the process exits.
          at_exit { stop }
        end

        ##
        # Asynchronously write one or more log entries to the Stackdriver
        # Logging service.
        #
        # Each entry that lacks a log name, resource, or labels is updated in
        # place with the corresponding default given here. Entries are added
        # to the current batch; a batch that is full or whose interval has
        # elapsed is handed to the thread pool for transmission.
        #
        # @param [Object, Array] entries One or more entry objects to write.
        #   `nil` or an empty array is accepted and writes nothing.
        # @param [String, nil] log_name Default log name for entries that
        #   have none.
        # @param resource Default resource for entries whose resource is nil
        #   or empty.
        # @param [Hash, nil] labels Default labels for entries whose labels
        #   are nil or empty.
        #
        # @return [AsyncWriter] returns self so calls can be chained.
        #
        # @raise [RuntimeError] if the writer has been stopped.
        #
        def write_entries entries, log_name: nil, resource: nil, labels: nil
          synchronize do
            raise "AsyncWriter has been stopped" if @stopped

            # Start the workers before batching: a batch that fills up inside
            # the loop below is published immediately and needs the pool.
            # ThreadPoolExecutor is used because CachedThreadPool overrides
            # max_threads/max_queue, which would disable back pressure.
            @thread_pool ||= Concurrent::ThreadPoolExecutor.new(
              max_threads: @threads, max_queue: @max_queue
            )
            @thread ||= Thread.new { run_background }

            Array(entries).each do |entry|
              # Update the entry to have all the data directly on it
              entry.log_name ||= log_name
              if entry.resource.nil? || entry.resource.empty?
                entry.resource = resource
              end
              entry.labels = labels if entry.labels.nil? || entry.labels.empty?

              # Add the entry to the batch
              @batch ||= Batch.new self
              next if @batch.try_add entry

              # If we can't add to the batch, publish and create a new batch
              publish_batch!
              @batch = Batch.new self
              @batch.add entry
            end

            # @batch is nil when no entries were given and nothing is pending.
            publish_batch! if @batch && @batch.ready?

            @cond.broadcast
          end
          self
        end

        ##
        # Creates a logger that sends its entries through this writer.
        #
        # @param log_name Passed to `Logger.new` as the log name.
        # @param resource Passed to `Logger.new` as the resource.
        # @param [Hash] labels Passed to `Logger.new` as the labels.
        #
        # @return The object returned by `Logger.new`.
        #
        def logger log_name, resource, labels = {}
          Logger.new self, log_name, resource, labels
        end

        ##
        # Begins the process of stopping the writer. Entries already in the
        # queue will be published, but no new entries can be added. Use {#wait!}
        # to block until the writer is fully stopped and all pending entries
        # have been published.
        #
        # @return [AsyncWriter] returns self so calls can be chained.
        def stop
          synchronize do
            break if @stopped

            @stopped = true
            publish_batch!
            @cond.broadcast
            @thread_pool.shutdown if @thread_pool
          end

          self
        end

        ##
        # Blocks until the thread pool has terminated, i.e. all pending
        # batches have been handed to `logging.write_entries` and all
        # callbacks have completed. Does not mark the writer as stopped. To
        # stop the writer, first call {#stop} and then call {#wait!}.
        #
        # @param [Number, nil] timeout Maximum number of seconds to wait, or
        #   `nil` to wait without a limit.
        #
        # @return [AsyncWriter] returns self so calls can be chained.
        def wait! timeout = nil
          pool = synchronize { @thread_pool }
          if pool
            pool.shutdown
            # Wait outside the monitor: worker threads enter it from #error!,
            # so holding it here could keep the pool from ever terminating.
            pool.wait_for_termination timeout
          end

          self
        end

        ##
        # Stop this asynchronous writer and block until it has been stopped.
        #
        # @param [Number] timeout Timeout in seconds.
        # @param [Boolean] force If set to true, and the writer hasn't stopped
        #   within the given timeout, kill it forcibly by terminating the
        #   thread. This should be used with extreme caution, as it can
        #   leave RPCs unfinished. Default is false.
        #
        # @return [Symbol] Returns `:stopped` if the AsyncWriter was already
        #   stopped at the time of invocation, `:waited` if it stopped
        #   during the timeout period, `:timeout` if it is still running
        #   after the timeout, or `:forced` if it was forcibly killed.
        #
        def stop! timeout = nil, force: nil
          return :stopped if stopped?

          stop
          wait! timeout

          # No pool means nothing was ever written, so nothing is running.
          pool = synchronize { @thread_pool }
          return :waited if pool.nil? || pool.shutdown?

          if force
            pool.kill
            return :forced
          end
          :timeout
        end
        alias async_stop! stop!

        ##
        # Forces all entries in the current batch to be published
        # immediately.
        #
        # @return [AsyncWriter] returns self so calls can be chained.
        def flush
          synchronize do
            publish_batch!
            @cond.broadcast
          end

          self
        end

        ##
        # Whether the writer accepts entries, i.e. {#stop} has not been
        # called yet.
        #
        # @return [boolean] `true` when started, `false` otherwise.
        def started?
          !stopped?
        end

        ##
        # Whether the writer has been stopped.
        #
        # @return [boolean] `true` when stopped, `false` otherwise.
        def stopped?
          synchronize { @stopped }
        end

        ##
        # Register to be notified of errors when raised.
        #
        # If an unhandled error has occurred the writer will attempt to
        # recover from the error and resume buffering, batching, and
        # transmitting log entries
        #
        # Multiple error handlers can be added.
        #
        # @yield [callback] The block to be called when an error is raised.
        # @yieldparam [Exception] error The error raised.
        #
        # @example
        #   require "google/cloud/logging"
        #   require "google/cloud/error_reporting"
        #
        #   logging = Google::Cloud::Logging.new
        #
        #   resource = logging.resource "gae_app",
        #                               module_id: "1",
        #                               version_id: "20150925t173233"
        #
        #   async = logging.async_writer
        #
        #   # Register to be notified when unhandled errors occur.
        #   async.on_error do |error|
        #     # error can be a AsyncWriterError or AsyncWriteEntriesError
        #     Google::Cloud::ErrorReporting.report error
        #   end
        #
        #   logger = async.logger "my_app_log", resource, env: :production
        #   logger.info "Job started."
        #
        def on_error &block
          synchronize do
            @error_callbacks << block
          end
        end

        ##
        # The most recent unhandled error to occur while transmitting log
        # entries.
        #
        # If an unhandled error has occurred the writer will attempt to
        # recover from the error and resume buffering, batching, and
        # transmitting log entries.
        #
        # @return [Exception, nil] error The most recent error raised.
        #
        # @example
        #   require "google/cloud/logging"
        #
        #   logging = Google::Cloud::Logging.new
        #
        #   resource = logging.resource "gae_app",
        #                               module_id: "1",
        #                               version_id: "20150925t173233"
        #
        #   async = logging.async_writer
        #
        #   logger = async.logger "my_app_log", resource, env: :production
        #   logger.info "Job started."
        #
        #   # If an error was raised, it can be retrieved here:
        #   async.last_error #=> nil
        #
        def last_error
          synchronize { @last_error }
        end
        alias last_exception last_error

        protected

        ##
        # @private Body of the background thread. Publishes the current batch
        # once it is ready, and otherwise sleeps on the condition variable
        # until it is signalled or the batch's interval elapses. Returns once
        # the writer has been stopped.
        def run_background
          synchronize do
            until @stopped
              if @batch.nil?
                @cond.wait
                next
              end

              if @batch.ready?
                # interval met, publish the batch...
                publish_batch!
                @cond.wait
              else
                # still waiting for the interval to publish the batch...
                @cond.wait(@batch.publish_wait)
              end
            end
          end
        end

        ##
        # @private Detaches the current batch, if any, and submits it to the
        # thread pool. Callers are expected to hold the monitor.
        def publish_batch!
          return unless @batch

          batch_to_be_published = @batch
          @batch = nil
          publish_batch_async batch_to_be_published
        end

        ##
        # @private Sets the last_error and calls all error callbacks. Falls
        # back to the configured default callbacks when none are registered.
        def error! error
          error_callbacks = synchronize do
            @last_error = error
            # Copy, so that iteration below is unaffected by a concurrent
            # #on_error registration.
            @error_callbacks.dup
          end
          error_callbacks = default_error_callbacks if error_callbacks.empty?
          error_callbacks.each { |error_callback| error_callback.call error }
        end

        ##
        # @private Error callbacks taken from the library configuration.
        #
        # @return [Array] An array holding the configured callback, or an
        #   empty array when none is configured.
        def default_error_callbacks
          # This is memoized to reduce calls to the configuration.
          @default_error_callbacks ||= begin
            error_callback = Google::Cloud::Logging.configuration.on_error
            error_callback ||= Google::Cloud.configure.on_error
            if error_callback
              [error_callback]
            else
              []
            end
          end
        end

        ##
        # @private Schedules transmission of the batch's entries on the
        # thread pool. A rejection by the pool is reported through {#error!}
        # as an AsyncWriterError carrying the rejected entries.
        def publish_batch_async batch
          Concurrent::Promises.future_on(
            @thread_pool, batch.entries
          ) do |entries|
            write_entries_with entries
          end
        rescue Concurrent::RejectedExecutionError => e
          async_error = AsyncWriterError.new(
            "Error writing entries: #{e.message}",
            batch.entries
          )
          # Manually set backtrace so we don't have to raise
          async_error.set_backtrace caller
          error! async_error
        end

        ##
        # @private Transmits the entries via `logging.write_entries`. Any
        # StandardError is reported through {#error!} as an
        # AsyncWriteEntriesError carrying the entries, instead of being
        # raised.
        def write_entries_with entries
          logging.write_entries entries, partial_success: partial_success
        rescue StandardError => e
          write_error = AsyncWriteEntriesError.new(
            "Error writing entries: #{e.message}",
            entries
          )
          # Manually set backtrace so we don't have to raise
          write_error.set_backtrace caller
          error! write_error
        end

        ##
        # @private Accumulates entries until the writer's count, byte, or
        # time limit is reached.
        class Batch
          attr_reader :created_at, :entries

          # @param writer The object supplying `max_count`, `max_bytes` and
          #   `interval`.
          def initialize writer
            @writer = writer
            @entries = []
            @entries_bytes = 2 # initial size w/ partial_success
            @created_at = nil
          end

          # Appends the entry unconditionally. The batch's creation time is
          # set when the first entry is added.
          def add entry, addl_bytes: nil
            addl_bytes ||= addl_bytes_for entry
            @entries << entry
            @entries_bytes += addl_bytes
            @created_at ||= Time.now
            nil
          end

          # Appends the entry only if the batch stays within the writer's
          # limits.
          #
          # @return [Boolean] `true` if the entry was added, `false` if it
          #   did not fit.
          def try_add entry
            addl_bytes = addl_bytes_for entry
            new_message_count = @entries.count + 1
            new_message_bytes = @entries_bytes + addl_bytes
            if new_message_count > @writer.max_count ||
               new_message_bytes >= @writer.max_bytes
              return false
            end
            add entry, addl_bytes: addl_bytes
            true
          end

          # Whether the batch should be published now: it is full by count
          # or bytes, it has no creation time, or its interval has elapsed.
          def ready?
            @entries.count >= @writer.max_count ||
              @entries_bytes >= @writer.max_bytes ||
              (@created_at.nil? || (publish_at < Time.now))
          end

          # @return [Time, nil] The time at which the interval elapses, or
          #   `nil` while the batch is empty.
          def publish_at
            return nil if @created_at.nil?
            @created_at + @writer.interval
          end

          # @return [Numeric] Seconds left until {#publish_at}, never
          #   negative; 0 while the batch is empty.
          def publish_wait
            deadline = publish_at
            return 0 if deadline.nil?

            publish_wait = deadline - Time.now
            return 0 if publish_wait < 0
            publish_wait
          end

          # Estimated number of bytes the entry adds to the batch: the size
          # of its serialized proto plus 2.
          def addl_bytes_for entry
            entry.to_grpc.to_proto.bytesize + 2
          end
        end
      end
    end
  end
end