//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/client_channel/dynamic_filters.h"

#include <stddef.h>

#include <new>
#include <utility>

#include "absl/status/statusor.h"

#include <grpc/support/log.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"

// Conversion between call and call stack.
#define CALL_TO_CALL_STACK(call)                                     \
  (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                                         sizeof(DynamicFilters::Call)))
#define CALL_STACK_TO_CALL(callstack)                     \
  (DynamicFilters::Call*)(((char*)(call_stack)) -         \
                          GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                              sizeof(DynamicFilters::Call)))

namespace grpc_core {

//
// DynamicFilters::Call
//

DynamicFilters::Call::Call(Args args, grpc_error_handle* error)
    : channel_stack_(std::move(args.channel_stack)) {
  grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
  const grpc_call_element_args call_args = {
      call_stack,         // call_stack
      nullptr,            // server_transport_data
      args.context,       // context
      args.path,          // path
      args.start_time,    // start_time
      args.deadline,      // deadline
      args.arena,         // arena
      args.call_combiner  // call_combiner
  };
  *error = grpc_call_stack_init(channel_stack_->channel_stack_.get(), 1,
                                Destroy, this, &call_args);
  if (GPR_UNLIKELY(!error->ok())) {
    gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str());
    return;
  }
  grpc_call_stack_set_pollset_or_pollset_set(call_stack, args.pollent);
}

void DynamicFilters::Call::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
  grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
  GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
  top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
}

void DynamicFilters::Call::SetAfterCallStackDestroy(grpc_closure* closure) {
  GPR_ASSERT(after_call_stack_destroy_ == nullptr);
  GPR_ASSERT(closure != nullptr);
  after_call_stack_destroy_ = closure;
}

RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref() {
  IncrementRefCount();
  return RefCountedPtr<DynamicFilters::Call>(this);
}

RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref(
    const DebugLocation& location, const char* reason) {
  IncrementRefCount(location, reason);
  return RefCountedPtr<DynamicFilters::Call>(this);
}

void DynamicFilters::Call::Unref() {
  GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), "dynamic-filters-unref");
}

void DynamicFilters::Call::Unref(const DebugLocation& /*location*/,
                                 const char* reason) {
  GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), reason);
}

void DynamicFilters::Call::Destroy(void* arg, grpc_error_handle /*error*/) {
  DynamicFilters::Call* self = static_cast<DynamicFilters::Call*>(arg);
  // Keep some members before destroying the subchannel call.
  grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
  RefCountedPtr<DynamicFilters> channel_stack = std::move(self->channel_stack_);
  // Destroy the subchannel call.
  self->~Call();
  // Destroy the call stack. This should be after destroying the call, because
  // call->after_call_stack_destroy(), if not null, will free the call arena.
  grpc_call_stack_destroy(CALL_TO_CALL_STACK(self), nullptr,
                          after_call_stack_destroy);
  // Automatically reset channel_stack. This should be after destroying the call
  // stack, because destroying call stack needs access to the channel stack.
}

void DynamicFilters::Call::IncrementRefCount() {
  GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), "");
}

void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/,
                                             const char* reason) {
  GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), reason);
}

//
// DynamicFilters
//

namespace {

absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
    const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
  ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args);
  for (auto filter : filters) {
    builder.AppendFilter(filter);
  }
  return builder.Build();
}

}  // namespace

RefCountedPtr<DynamicFilters> DynamicFilters::Create(
    const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
  // Attempt to create channel stack from requested filters.
  auto p = CreateChannelStack(args, std::move(filters));
  if (!p.ok()) {
    // Channel stack creation failed with requested filters.
    // Create with lame filter instead.
    auto error = p.status();
    p = CreateChannelStack(args.Set(MakeLameClientErrorArg(&error)),
                           {&LameClientFilter::kFilter});
  }
  return MakeRefCounted<DynamicFilters>(std::move(p.value()));
}

RefCountedPtr<DynamicFilters::Call> DynamicFilters::CreateCall(
    DynamicFilters::Call::Args args, grpc_error_handle* error) {
  size_t allocation_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Call)) +
                           channel_stack_->call_stack_size;
  Call* call = static_cast<Call*>(args.arena->Alloc(allocation_size));
  new (call) Call(std::move(args), error);
  return RefCountedPtr<DynamicFilters::Call>(call);
}

}  // namespace grpc_core