# Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require "google/cloud/firestore/v1" require "google/cloud/firestore/convert" require "google/cloud/firestore/watch/enumerator_queue" require "google/cloud/firestore/watch/inventory" require "monitor" module Google module Cloud module Firestore ## # @private module Watch ## # @private class Listener include MonitorMixin def self.for_doc_ref parent, doc_ref, &callback raise ArgumentError if doc_ref.nil? raise ArgumentError if callback.nil? documents = Google::Cloud::Firestore::V1::Target::DocumentsTarget.new( documents: [doc_ref.path] ) init_listen_req = Google::Cloud::Firestore::V1::ListenRequest.new( database: doc_ref.client.path, add_target: Google::Cloud::Firestore::V1::Target.new( documents: documents ) ) new parent, nil, doc_ref, doc_ref.client, init_listen_req, &callback end def self.for_query parent, query, &callback raise ArgumentError if query.nil? raise ArgumentError if callback.nil? init_listen_req = Google::Cloud::Firestore::V1::ListenRequest.new( database: query.client.path, add_target: Google::Cloud::Firestore::V1::Target.new( query: Google::Cloud::Firestore::V1::Target::QueryTarget.new( parent: query.parent_path, structured_query: query.query ) ) ) new parent, query, nil, query.client, init_listen_req, &callback end def initialize parent, query, doc_ref, client, init_listen_req, &callback super() # to init MonitorMixin @parent = parent @query = query @doc_ref = doc_ref @client = client @init_listen_req = init_listen_req @callback = callback end def start synchronize { start_listening! } self end def stop synchronize do @stopped = true @request_queue&.push self end end ## # Whether the client has stopped listening for changes. # # @example # require "google/cloud/firestore" # # firestore = Google::Cloud::Firestore.new # # # Create a query # query = firestore.col(:cities).order(:population, :desc) # # listener = query.listen do |snapshot| # puts "The query snapshot has #{snapshot.docs.count} documents " # puts "and has #{snapshot.changes.count} changes." # end # # # Checks if the listener is stopped. # listener.stopped? #=> false # # # When ready, stop the listen operation and close the stream. # listener.stop # # # Checks if the listener is stopped. # listener.stopped? #=> true # def stopped? synchronize { @stopped } end private def send_callback query_snp @callback.call query_snp rescue StandardError => e @parent.error! e end def start_listening! # create new background thread to handle the stream's enumerator @background_thread = Thread.new { background_run } end # @private class RestartStream < StandardError; end # rubocop:disable all def background_run # Don't allow a stream to restart if already stopped return if synchronize { @stopped } @backoff ||= { current: 0, delay: 1.0, max: 5, mod: 1.3 } # Reuse inventory if one already exists # Even though this uses an @var, no need to synchronize @inventory ||= Inventory.new(@client, @query) @inventory.restart # Send stop if already running synchronize do @request_queue.push self if @request_queue end # Customize the provided initial listen request init_listen_req = @init_listen_req.dup.tap do |req| req.add_target.resume_token = String(@inventory.resume_token) req.add_target.target_id = 0x42 end # Always create a new enum queue synchronize do @request_queue = EnumeratorQueue.new self @request_queue.push init_listen_req end # Not an @var, we get a new enum each time enum = synchronize do @client.service.listen @request_queue.each end loop do # Break loop, close thread if stopped break if synchronize { @stopped } begin # Cannot syncronize the enumerator, causes deadlock response = enum.next case response.response_type when :target_change case response.target_change.target_change_type when :NO_CHANGE # No change has occurred. Used only to send an updated # +resume_token+. @inventory.persist( response.target_change.resume_token, Convert.timestamp_to_time( response.target_change.read_time ) ) if @inventory.current? && @inventory.changes? synchronize do send_callback @inventory.build_query_snapshot end end when :CURRENT # The targets reflect all changes committed before the targets # were added to the stream. # # This will be sent after or with a +read_time+ that is # greater than or equal to the time at which the targets were # added. # # Listeners can wait for this change if read-after-write # semantics are desired. @inventory.persist( response.target_change.resume_token, Convert.timestamp_to_time( response.target_change.read_time ) ) @inventory.current = true when :RESET # The targets have been reset, and a new initial state for the # targets will be returned in subsequent changes. # # After the initial state is complete, +CURRENT+ will be # returned even if the target was previously indicated to be # +CURRENT+. @inventory.reset raise RestartStream # Raise to restart the stream end when :document_change # A {Google::Cloud::Firestore::V1::Document Document} has changed. if response.document_change.removed_target_ids.any? @inventory.delete response.document_change.document.name else @inventory.add response.document_change.document end when :document_delete # A {Google::Cloud::Firestore::V1::Document Document} has been # deleted. @inventory.delete response.document_delete.document when :document_remove # A {Google::Cloud::Firestore::V1::Document Document} has been # removed from a target (because it is no longer relevant to # that target). @inventory.delete response.document_remove.document when :filter # A filter to apply to the set of documents previously returned # for the given target. # # Returned when documents may have been removed from the given # target, but the exact documents are unknown. if response.filter.count != @inventory.count_with_pending @inventory.reset raise RestartStream # Raise to restart the stream end end rescue StopIteration break end # Reset backoff values when completed without an error @backoff[:current] = 0 @backoff[:delay] = 1.0 end # Has the loop broken but we aren't stopped? # Could be GRPC has thrown an internal error, so restart. raise RestartStream unless synchronize { @stopped } # We must be stopped, tell the stream to quit. @request_queue.push self rescue GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal, GRPC::ResourceExhausted, GRPC::Unauthenticated, GRPC::Unavailable, GRPC::Unknown, GRPC::Core::CallError => e # Restart the stream with an incremental back for a retriable error. # Also when GRPC raises the internal CallError. # Raise if retried more than the max if @backoff[:current] > @backoff[:max] @parent.error! e raise e else # Sleep with incremental backoff before restarting sleep @backoff[:delay] # Update increment backoff delay and retry counter @backoff[:delay] *= @backoff[:mod] @backoff[:current] += 1 retry end rescue RestartStream retry rescue StandardError => e @parent.error! e raise e end end end end end end