// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/support/port_platform.h>

#include "src/core/lib/resource_quota/memory_quota.h"

#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <tuple>
#include <utility>

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/utility/utility.h"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/trace.h"

namespace grpc_core {

// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
static constexpr size_t kMaxReplenishBytes = 1024 * 1024;

// Minimum number of bytes an allocator will request from a quota in one step.
static constexpr size_t kMinReplenishBytes = 4096;

//
// Reclaimer
//

ReclamationSweep::~ReclamationSweep() {
  if (memory_quota_ != nullptr) {
    memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
  }
}

//
// ReclaimerQueue
//

struct ReclaimerQueue::QueuedNode
    : public MultiProducerSingleConsumerQueue::Node {
  explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
      : reclaimer_handle(std::move(reclaimer_handle)) {}
  RefCountedPtr<Handle> reclaimer_handle;
};

struct ReclaimerQueue::State {
  Mutex reader_mu;
  MultiProducerSingleConsumerQueue queue;  // reader_mu must be held to pop
  Waker waker ABSL_GUARDED_BY(reader_mu);

  ~State() {
    bool empty = false;
    do {
      delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
    } while (!empty);
  }
};

void ReclaimerQueue::Handle::Orphan() {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(absl::nullopt);
  }
  Unref();
}

void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(std::move(reclamation_sweep));
  }
}

bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
  if (sweep_.load(std::memory_order_relaxed)) {
    new_queue->Enqueue(Ref());
    return true;
  } else {
    return false;
  }
}

void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
  // When we cancel a reclaimer we rotate the elements of the queue once -
  // taking one non-cancelled node from the start and placing it on the end.
  // This ensures that we don't suffer from head-of-line blocking, whereby a
  // non-cancelled reclaimer at the head of the queue, in the absence of memory
  // pressure, prevents the remainder of the queue from being cleaned up.
  MutexLock lock(&state_->reader_mu);
  while (true) {
    bool empty = false;
    std::unique_ptr<QueuedNode> node(
        static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
    if (node == nullptr) break;
    if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
        nullptr) {
      state_->queue.Push(node.release());
      break;
    }
  }
}

ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}

ReclaimerQueue::~ReclaimerQueue() = default;

void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
  if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
    MutexLock lock(&state_->reader_mu);
    state_->waker.Wakeup();
  }
}

Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
  MutexLock lock(&state_->reader_mu);
  bool empty = false;
  // Try to pull from the queue.
  std::unique_ptr<QueuedNode> node(
      static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
  // If we get something, great.
  if (node != nullptr) return std::move(node->reclaimer_handle);
  if (!empty) {
    // If we don't, but the queue is probably not empty, schedule an immediate
    // repoll.
    Activity::current()->ForceImmediateRepoll();
  } else {
    // Otherwise, schedule a wakeup for whenever something is pushed.
    state_->waker = Activity::current()->MakeNonOwningWaker();
  }
  return Pending{};
}

//
// GrpcMemoryAllocatorImpl
//

GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
    std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
    : memory_quota_(memory_quota), name_(std::move(name)) {
  memory_quota_->Take(taken_bytes_);
}

GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
  GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
                 sizeof(GrpcMemoryAllocatorImpl) ==
             taken_bytes_);
  memory_quota_->Return(taken_bytes_);
}

void GrpcMemoryAllocatorImpl::Shutdown() {
  std::shared_ptr<BasicMemoryQuota> memory_quota;
  OrphanablePtr<ReclaimerQueue::Handle>
      reclamation_handles[kNumReclamationPasses];
  {
    MutexLock lock(&memory_quota_mu_);
    GPR_ASSERT(!shutdown_);
    shutdown_ = true;
    memory_quota = memory_quota_;
    for (size_t i = 0; i < kNumReclamationPasses; i++) {
      reclamation_handles[i] = absl::exchange(reclamation_handles_[i], nullptr);
    }
  }
}

size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
  // Validate the request here so we don't bloat the generated code with
  // inlined asserts.
  GPR_ASSERT(request.min() <= request.max());
  GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
  while (true) {
    // Attempt to reserve memory from our pool.
    auto reservation = TryReserve(request);
    if (reservation.has_value()) {
      return *reservation;
    }
    // If that failed, grab more from the quota and retry.
    Replenish();
  }
}

absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
    MemoryRequest request) {
  // How much memory should we request? (see the scaling below)
  size_t scaled_size_over_min = request.max() - request.min();
  // Scale the request down according to memory pressure if we have that
  // flexibility.
  if (scaled_size_over_min != 0) {
    double pressure;
    size_t max_recommended_allocation_size;
    {
      MutexLock lock(&memory_quota_mu_);
      const auto pressure_and_max_recommended_allocation_size =
          memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize();
      pressure = pressure_and_max_recommended_allocation_size.first;
      max_recommended_allocation_size =
          pressure_and_max_recommended_allocation_size.second;
    }
    // Reduce the allocation size proportionally once pressure exceeds 80%
    // usage.
    if (pressure > 0.8) {
      scaled_size_over_min =
          std::min(scaled_size_over_min,
                   static_cast<size_t>((request.max() - request.min()) *
                                       (1.0 - pressure) / 0.2));
    }
    if (max_recommended_allocation_size < request.min()) {
      scaled_size_over_min = 0;
    } else if (request.min() + scaled_size_over_min >
               max_recommended_allocation_size) {
      scaled_size_over_min = max_recommended_allocation_size - request.min();
    }
  }

  // How much do we want to reserve?
  const size_t reserve = request.min() + scaled_size_over_min;
  // See how many bytes are available.
  size_t available = free_bytes_.load(std::memory_order_acquire);
  while (true) {
    // Does the current free pool satisfy the request?
    if (available < reserve) {
      return {};
    }
    // Try to reserve the requested amount.
    // If the amount of free memory changed through this loop, then available
    // will be set to the new value and we'll repeat.
    if (free_bytes_.compare_exchange_weak(available, available - reserve,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      return reserve;
    }
  }
}

void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
  size_t free = free_bytes_.load(std::memory_order_relaxed);
  const size_t kReduceToSize = kMaxQuotaBufferSize / 2;
  while (true) {
    if (free <= kReduceToSize) return;
    size_t ret = free - kReduceToSize;
    if (free_bytes_.compare_exchange_weak(free, kReduceToSize,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO, "[%p|%s] Early return %" PRIdPTR " bytes", this,
                name_.c_str(), ret);
      }
      MutexLock lock(&memory_quota_mu_);
      GPR_ASSERT(taken_bytes_ >= ret);
      taken_bytes_ -= ret;
      memory_quota_->Return(ret);
      return;
    }
  }
}

void GrpcMemoryAllocatorImpl::Replenish() {
  MutexLock lock(&memory_quota_mu_);
  GPR_ASSERT(!shutdown_);
  // Attempt a fairly low-rate exponential growth request size, bounded between
  // the reasonable limits declared at the top of this file.
  auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes);
  // Take the requested amount from the quota.
  memory_quota_->Take(amount);
  // Record that we've taken it.
  taken_bytes_ += amount;
  // Add the taken amount to the free pool.
  free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
  // See if we can add ourselves as a reclaimer.
  MaybeRegisterReclaimerLocked();
}

void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
  MutexLock lock(&memory_quota_mu_);
  MaybeRegisterReclaimerLocked();
}

void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() {
  // If the reclaimer is already registered, then there's nothing to do.
  if (registered_reclaimer_) return;
  if (shutdown_) return;
  // Grab references to the things we'll need.
  auto self = shared_from_this();
  std::weak_ptr<EventEngineMemoryAllocatorImpl> self_weak{self};
  registered_reclaimer_ = true;
  InsertReclaimer(0, [self_weak](absl::optional<ReclamationSweep> sweep) {
    if (!sweep.has_value()) return;
    auto self = self_weak.lock();
    if (self == nullptr) return;
    auto* p = static_cast<GrpcMemoryAllocatorImpl*>(self.get());
    MutexLock lock(&p->memory_quota_mu_);
    p->registered_reclaimer_ = false;
    // Figure out how many bytes we can return to the quota.
    size_t return_bytes =
        p->free_bytes_.exchange(0, std::memory_order_acq_rel);
    if (return_bytes == 0) return;
    // Subtract that from our outstanding balance.
    p->taken_bytes_ -= return_bytes;
    // And return them to the quota.
    p->memory_quota_->Return(return_bytes);
  });
}

void GrpcMemoryAllocatorImpl::Rebind(
    std::shared_ptr<BasicMemoryQuota> memory_quota) {
  MutexLock lock(&memory_quota_mu_);
  GPR_ASSERT(!shutdown_);
  if (memory_quota_ == memory_quota) return;
  // Return memory to the original memory quota.
  memory_quota_->Return(taken_bytes_);
  // Reassign any queued reclaimers.
  for (size_t i = 0; i < kNumReclamationPasses; i++) {
    if (reclamation_handles_[i] != nullptr) {
      reclamation_handles_[i]->Requeue(memory_quota->reclaimer_queue(i));
    }
  }
  // Switch to the new memory quota, leaving the old one in memory_quota so
  // that when we unref it we are outside of the lock.
  memory_quota_.swap(memory_quota);
  // Drop our freed memory down to zero, to avoid needing to ask the new
  // quota for memory we're not currently using.
  taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel);
  // And let the new quota know how much we're already using.
  memory_quota_->Take(taken_bytes_);
}

//
// MemoryOwner
//

void MemoryOwner::Rebind(MemoryQuota* quota) {
  impl()->Rebind(quota->memory_quota_);
}

//
// BasicMemoryQuota
//

class BasicMemoryQuota::WaitForSweepPromise {
 public:
  WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
                      uint64_t token)
      : memory_quota_(std::move(memory_quota)), token_(token) {}

  struct Empty {};

  Poll<Empty> operator()() {
    if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
        token_) {
      return Empty{};
    } else {
      return Pending{};
    }
  }

 private:
  std::shared_ptr<BasicMemoryQuota> memory_quota_;
  uint64_t token_;
};

void BasicMemoryQuota::Start() {
  auto self = shared_from_this();

  // Reclamation loop:
  // basically, wait until we are in overcommit (free_bytes_ < 0), and then:
  //   while (free_bytes_ < 0) reclaim_memory()
  // ... and repeat.
  auto reclamation_loop = Loop(Seq(
      [self]() -> Poll<int> {
        // If there's free memory we no longer need to reclaim memory!
        if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
          return Pending{};
        }
        return 0;
      },
      [self]() {
        // Race biases to the first thing that completes... so this will
        // choose the highest priority/least destructive thing to do that's
        // available.
        auto annotate = [](const char* name) {
          return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
            return std::make_tuple(name, std::move(f));
          };
        };
        return Race(Map(self->reclaimers_[0].Next(), annotate("compact")),
                    Map(self->reclaimers_[1].Next(), annotate("benign")),
                    Map(self->reclaimers_[2].Next(), annotate("idle")),
                    Map(self->reclaimers_[3].Next(), annotate("destructive")));
      },
      [self](std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>>
                 arg) {
        auto reclaimer = std::move(std::get<1>(arg));
        if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
          double free = std::max(intptr_t(0), self->free_bytes_.load());
          size_t quota_size = self->quota_size_.load();
          gpr_log(GPR_INFO,
                  "RQ: %s perform %s reclamation. Available free bytes: %f, "
                  "total quota_size: %zu",
                  self->name_.c_str(), std::get<0>(arg), free, quota_size);
        }
        // One of the reclaimer queues gave us a way to get back memory.
        // Call the reclaimer with a token that contains enough information to
        // wake us up again.
        const uint64_t token =
            self->reclamation_counter_.fetch_add(1,
                                                 std::memory_order_relaxed) +
            1;
        reclaimer->Run(ReclamationSweep(
            self, token, Activity::current()->MakeNonOwningWaker()));
        // Return a promise that will wait for our barrier. This will be
        // awoken by the token above being destroyed. So, once that token is
        // destroyed, we'll be able to proceed.
        return WaitForSweepPromise(self, token);
      },
      []() -> LoopCtl<absl::Status> {
        // Continue the loop!
        return Continue{};
      }));

  reclaimer_activity_ =
      MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
                   [](absl::Status status) {
                     GPR_ASSERT(status.code() == absl::StatusCode::kCancelled);
                   });
}

void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }

void BasicMemoryQuota::SetSize(size_t new_size) {
  size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
  if (old_size < new_size) {
    // We're growing the quota.
    Return(new_size - old_size);
  } else {
    // We're shrinking the quota.
    Take(old_size - new_size);
  }
}

void BasicMemoryQuota::Take(size_t amount) {
  // If there's a request for nothing, then do nothing!
  if (amount == 0) return;
  GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
  // Grab memory from the quota.
  auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
  // If we push into overcommit, awake the reclaimer.
  if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
    if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
  }
}

void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
  uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
  if (current != token) return;
  if (reclamation_counter_.compare_exchange_strong(current, current + 1,
                                                   std::memory_order_relaxed,
                                                   std::memory_order_relaxed)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      double free = std::max(intptr_t(0), free_bytes_.load());
      size_t quota_size = quota_size_.load();
      gpr_log(GPR_INFO,
              "RQ: %s reclamation complete. Available free bytes: %f, "
              "total quota_size: %zu",
              name_.c_str(), free, quota_size);
    }
    waker.Wakeup();
  }
}

void BasicMemoryQuota::Return(size_t amount) {
  free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}

std::pair<double, size_t>
BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() const {
  double free = free_bytes_.load();
  if (free < 0) free = 0;
  size_t quota_size = quota_size_.load();
  double size = quota_size;
  if (size < 1) return std::make_pair(1.0, 1);
  double pressure = (size - free) / size;
  if (pressure < 0.0) pressure = 0.0;
  if (pressure > 1.0) pressure = 1.0;
  return std::make_pair(pressure, quota_size / 16);
}

//
// MemoryQuota
//

MemoryAllocator MemoryQuota::CreateMemoryAllocator(absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
      memory_quota_, absl::StrCat(memory_quota_->name(), "/allocator/", name));
  return MemoryAllocator(std::move(impl));
}

MemoryOwner MemoryQuota::CreateMemoryOwner(absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
      memory_quota_, absl::StrCat(memory_quota_->name(), "/owner/", name));
  return MemoryOwner(std::move(impl));
}

}  // namespace grpc_core
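
// Illustrative usage sketch of the quota surface implemented above, kept as a
// comment so it stays out of the build. The MemoryQuota constructor,
// SetSize(), and MemoryAllocator::Release() are declared in memory_quota.h and
// the EventEngine memory allocator API; their exact spellings here are
// assumptions based on how they are used in this file.
//
//   grpc_core::MemoryQuota quota("example_quota");
//   quota.SetSize(64 * 1024 * 1024);  // 64 MiB shared across allocators.
//   grpc_core::MemoryAllocator allocator =
//       quota.CreateMemoryAllocator("example_channel");
//   // Under low pressure this reserves close to 64 KiB; under high pressure
//   // it scales down towards the 1 KiB minimum (see TryReserve above).
//   size_t reserved =
//       allocator.Reserve(grpc_core::MemoryRequest(1024, 64 * 1024));
//   allocator.Release(reserved);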