/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "src/core/lib/transport/byte_stream.h" #include #include #include #include #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_refcount.h" namespace grpc_core { // // SliceBufferByteStream // SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags) : ByteStream(static_cast(slice_buffer->length), flags) { GPR_ASSERT(slice_buffer->length <= UINT32_MAX); grpc_slice_buffer_init(&backing_buffer_); grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); if (backing_buffer_.count == 0) { grpc_slice_buffer_add_indexed(&backing_buffer_, grpc_empty_slice()); GPR_ASSERT(backing_buffer_.count > 0); } } SliceBufferByteStream::~SliceBufferByteStream() {} void SliceBufferByteStream::Orphan() { grpc_slice_buffer_destroy_internal(&backing_buffer_); GRPC_ERROR_UNREF(shutdown_error_); shutdown_error_ = GRPC_ERROR_NONE; // Note: We do not actually delete the object here, since // SliceBufferByteStream is usually allocated as part of a larger // object and has an OrphanablePtr of itself passed down through the // filter stack. } bool SliceBufferByteStream::Next(size_t /*max_size_hint*/, grpc_closure* /*on_complete*/) { GPR_DEBUG_ASSERT(backing_buffer_.count > 0); return true; } grpc_error_handle SliceBufferByteStream::Pull(grpc_slice* slice) { if (GPR_UNLIKELY(shutdown_error_ != GRPC_ERROR_NONE)) { return GRPC_ERROR_REF(shutdown_error_); } *slice = grpc_slice_buffer_take_first(&backing_buffer_); return GRPC_ERROR_NONE; } void SliceBufferByteStream::Shutdown(grpc_error_handle error) { GRPC_ERROR_UNREF(shutdown_error_); shutdown_error_ = error; } // // ByteStreamCache // ByteStreamCache::ByteStreamCache(OrphanablePtr underlying_stream) : underlying_stream_(std::move(underlying_stream)), length_(underlying_stream_->length()), flags_(underlying_stream_->flags()) { grpc_slice_buffer_init(&cache_buffer_); } ByteStreamCache::~ByteStreamCache() { Destroy(); } void ByteStreamCache::Destroy() { underlying_stream_.reset(); if (cache_buffer_.length > 0) { grpc_slice_buffer_destroy_internal(&cache_buffer_); } } // // ByteStreamCache::CachingByteStream // ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) : ByteStream(cache->length_, cache->flags_), cache_(cache) {} ByteStreamCache::CachingByteStream::~CachingByteStream() {} void ByteStreamCache::CachingByteStream::Orphan() { GRPC_ERROR_UNREF(shutdown_error_); shutdown_error_ = GRPC_ERROR_NONE; // Note: We do not actually delete the object here, since // CachingByteStream is usually allocated as part of a larger // object and has an OrphanablePtr of itself passed down through the // filter stack. } bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, grpc_closure* on_complete) { if (shutdown_error_ != GRPC_ERROR_NONE) return true; if (cursor_ < cache_->cache_buffer_.count) return true; GPR_ASSERT(cache_->underlying_stream_ != nullptr); return cache_->underlying_stream_->Next(max_size_hint, on_complete); } grpc_error_handle ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { if (shutdown_error_ != GRPC_ERROR_NONE) { return GRPC_ERROR_REF(shutdown_error_); } if (cursor_ < cache_->cache_buffer_.count) { *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); ++cursor_; offset_ += GRPC_SLICE_LENGTH(*slice); return GRPC_ERROR_NONE; } GPR_ASSERT(cache_->underlying_stream_ != nullptr); grpc_error_handle error = cache_->underlying_stream_->Pull(slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&cache_->cache_buffer_, grpc_slice_ref_internal(*slice)); ++cursor_; offset_ += GRPC_SLICE_LENGTH(*slice); // Orphan the underlying stream if it's been drained. if (offset_ == cache_->underlying_stream_->length()) { cache_->underlying_stream_.reset(); } } return error; } void ByteStreamCache::CachingByteStream::Shutdown(grpc_error_handle error) { GRPC_ERROR_UNREF(shutdown_error_); shutdown_error_ = GRPC_ERROR_REF(error); if (cache_->underlying_stream_ != nullptr) { cache_->underlying_stream_->Shutdown(error); } } void ByteStreamCache::CachingByteStream::Reset() { cursor_ = 0; offset_ = 0; } } // namespace grpc_core