# Copyright 2017 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require "monitor" require "concurrent" module Google module Cloud module Pubsub class Subscriber ## # @private # # AsyncStreamPusher # class AsyncStreamPusher include MonitorMixin attr_reader :batch attr_reader :max_bytes, :interval def initialize stream, max_bytes: 10000000, interval: 1.0 @stream = stream @max_bytes = max_bytes @interval = interval @cond = new_cond # init MonitorMixin super() end def acknowledge ack_ids return true if ack_ids.empty? synchronize do ack_ids.each do |ack_id| if @batch.nil? @batch = Batch.new max_bytes: @max_bytes @batch.ack ack_id else unless @batch.try_ack ack_id push_batch_request! @batch = Batch.new max_bytes: @max_bytes @batch.ack ack_id end end @batch_created_at ||= Time.now @background_thread ||= Thread.new { run_background } push_batch_request! if @batch.ready? end @cond.signal end nil end def delay deadline, ack_ids return true if ack_ids.empty? synchronize do ack_ids.each do |ack_id| if @batch.nil? @batch = Batch.new max_bytes: @max_bytes @batch.delay deadline, ack_id else unless @batch.try_delay deadline, ack_id push_batch_request! @batch = Batch.new max_bytes: @max_bytes @batch.delay deadline, ack_id end end @batch_created_at ||= Time.now @background_thread ||= Thread.new { run_background } push_batch_request! if @batch.ready? end @cond.signal end nil end def stop synchronize do @stopped = true # Stop any background activity, clean up happens in wait! @background_thread.kill if @background_thread end push_batch_request! end def started? !stopped? end def stopped? synchronize { @stopped } end protected def run_background synchronize do until @stopped if @batch.nil? @cond.wait next end time_since_batch_creation = Time.now - @batch_created_at if time_since_batch_creation > @interval # interval met, publish the batch... push_batch_request! @cond.wait else # still waiting for the interval to publish the batch... @cond.wait(@interval - time_since_batch_creation) end end end end def push_batch_request! return unless @batch @stream.push @batch.request @batch = nil @batch_created_at = nil end class Batch attr_reader :max_bytes, :request def initialize max_bytes: 10000000 @max_bytes = max_bytes @request = Google::Pubsub::V1::StreamingPullRequest.new @total_message_bytes = 0 end def ack ack_id @request.ack_ids << ack_id @total_message_bytes += addl_ack_bytes ack_id end def try_ack ack_id addl_bytes = addl_ack_bytes ack_id return false if total_message_bytes + addl_bytes >= @max_bytes ack ack_id true end def addl_ack_bytes ack_id ack_id.bytesize + 2 end def delay deadline, ack_id @request.modify_deadline_seconds << deadline @request.modify_deadline_ack_ids << ack_id @total_message_bytes += addl_delay_bytes deadline, ack_id end def try_delay deadline, ack_id addl_bytes = addl_delay_bytes deadline, ack_id return false if total_message_bytes + addl_bytes >= @max_bytes delay deadline, ack_id true end def addl_delay_bytes deadline, ack_id bytes_for_int(deadline) + ack_id.bytesize + 4 end def bytes_for_int num # Ruby 2.0 does not have Integer#bit_length return [num].pack("s").bytesize unless num.respond_to? :bit_length (num.bit_length / 8.0).ceil end def ready? total_message_bytes >= @max_bytes end def total_message_bytes @total_message_bytes end end end end end end end