#include "rice/Data_Type.hpp"
#include "rice/Enum.hpp"
#include "rice/Constructor.hpp"
#include <pulsar/Client.h>
#include <ruby/thread.h>

#include "consumer.hpp"
#include "util.hpp"

namespace pulsar_rb {

// Argument/result bundle handed to consumer_receive_nogvl through a void*.
// Member order matters: consumer_receive brace-initializes the first two
// members positionally and leaves the rest value-initialized.
struct consumer_receive_job {
  // Consumer the worker calls receive() on.
  pulsar::Consumer& consumer;
  // Timeout forwarded to receive(); 0 selects the overload without a timeout.
  unsigned int timeout_ms;
  // Output: filled in by the worker's receive() call.
  pulsar::Message message;
  // Output: result code returned by receive().
  pulsar::Result result;
};

// Argument/result bundle handed to consumer_close_worker through a void*.
// Consumer::close brace-initializes only the first member; result is left
// value-initialized until the worker overwrites it.
struct consumer_close_task {
  // Consumer the worker calls close() on.
  pulsar::Consumer& consumer;
  // Output: result code returned by close().
  pulsar::Result result;
};

void* consumer_receive_nogvl(void* jobPtr) {
  consumer_receive_job& job = *(consumer_receive_job*)jobPtr;
  if (job.timeout_ms > 0) {
    job.result = job.consumer.receive(job.message, job.timeout_ms);
  } else {
    job.result = job.consumer.receive(job.message);
  }
  return nullptr;
}

// Receives one message from `consumer`, running the receive call through
// rb_thread_call_without_gvl so it executes without holding Ruby's GVL.
//
// timeout_ms: forwarded to the worker; 0 selects receive() without a timeout.
// Returns the message the worker stored in the request.
// The result code is passed to CheckResult (declared outside this file;
// presumably raises on a non-OK result -- confirm in util.hpp).
pulsar::Message consumer_receive(pulsar::Consumer& consumer, unsigned int timeout_ms) {
  // Only the inputs are listed; message and result are value-initialized.
  consumer_receive_job request = { consumer, timeout_ms };
  rb_thread_call_without_gvl(consumer_receive_nogvl, static_cast<void*>(&request), RUBY_UBF_IO, nullptr);
  CheckResult(request.result);
  return request.message;
}

// Receives the next message from the wrapped consumer and returns it as a
// newly allocated pulsar_rb Message held by a Message::ptr.
// timeout_ms is forwarded unchanged to consumer_receive.
Message::ptr Consumer::receive(unsigned int timeout_ms) {
  pulsar::Message received = consumer_receive(_consumer, timeout_ms);
  Message::ptr wrapped(new Message(received));
  return wrapped;
}

// Acknowledges `message` on the wrapped consumer using the asynchronous API.
// A null callback is supplied, so the outcome of the acknowledgement is not
// reported back to the caller.
void Consumer::acknowledge(const Message& message) {
  const auto& underlying = message._msg;
  _consumer.acknowledgeAsync(underlying, nullptr);
}

// Negatively acknowledges `message` on the wrapped consumer by forwarding its
// underlying message object to negativeAcknowledge().
void Consumer::negative_acknowledge(const Message& message) {
  const auto& underlying = message._msg;
  _consumer.negativeAcknowledge(underlying);
}

void* consumer_close_worker(void* taskPtr) {
  consumer_close_task& task = *(consumer_close_task*)taskPtr;
  task.result = task.consumer.close();
  return nullptr;
}

void Consumer::close() {
  consumer_close_task task = { _consumer };
  rb_thread_call_without_gvl(&consumer_close_worker, &task, RUBY_UBF_IO, nullptr);
  CheckResult(task.result);
}

}

using namespace Rice;

// Registers the consumer-related bindings on `module`:
//   * class "Consumer"              -> pulsar_rb::Consumer
//   * enum  "ConsumerType"          -> pulsar::ConsumerType
//   * class "ConsumerConfiguration" -> pulsar_rb::ConsumerConfiguration
// Definitions are issued in the order listed here.
void bind_consumer(Module &module) {
  // Consumer: receive takes an optional timeout_ms argument defaulting to 0.
  auto consumer_class = define_class_under<pulsar_rb::Consumer>(module, "Consumer");
  consumer_class.define_constructor(Constructor<pulsar_rb::Consumer>());
  consumer_class.define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0));
  consumer_class.define_method("acknowledge", &pulsar_rb::Consumer::acknowledge);
  consumer_class.define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge);
  consumer_class.define_method("close", &pulsar_rb::Consumer::close);

  // ConsumerType: the subscription type values exposed to Ruby.
  auto consumer_type_enum = define_enum<pulsar::ConsumerType>("ConsumerType", module);
  consumer_type_enum.define_value("Exclusive", ConsumerExclusive);
  consumer_type_enum.define_value("Shared", ConsumerShared);
  consumer_type_enum.define_value("Failover", ConsumerFailover);
  consumer_type_enum.define_value("KeyShared", ConsumerKeyShared);

  // ConsumerConfiguration: getter/setter pairs, plus [] / []= for properties.
  auto config_class = define_class_under<pulsar_rb::ConsumerConfiguration>(module, "ConsumerConfiguration");
  config_class.define_constructor(Constructor<pulsar_rb::ConsumerConfiguration>());
  config_class.define_method("consumer_type", &ConsumerConfiguration::getConsumerType);
  config_class.define_method("consumer_type=", &ConsumerConfiguration::setConsumerType);
  // TODO .define_method("schema", &ConsumerConfiguration::getSchema)
  // TODO .define_method("schema=", &ConsumerConfiguration::setSchema)
  // TODO .define_method("message_listener", &ConsumerConfiguration_setMessageListener)
  config_class.define_method("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize);
  config_class.define_method("receiver_queue_size=", &ConsumerConfiguration::setReceiverQueueSize);
  config_class.define_method("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions);
  config_class.define_method("max_total_receiver_queue_size_across_partitions=", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions);
  config_class.define_method("consumer_name", &ConsumerConfiguration::getConsumerName);
  config_class.define_method("consumer_name=", &ConsumerConfiguration::setConsumerName);
  config_class.define_method("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs);
  config_class.define_method("unacked_messages_timeout_ms=", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs);
  config_class.define_method("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs);
  config_class.define_method("negative_ack_redelivery_delay_ms=", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs);
  config_class.define_method("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs);
  config_class.define_method("broker_consumer_stats_cache_time_ms=", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs);
  config_class.define_method("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod);
  config_class.define_method("pattern_auto_discovery_period=", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod);
  config_class.define_method("read_compacted?", &ConsumerConfiguration::isReadCompacted);
  config_class.define_method("read_compacted=", &ConsumerConfiguration::setReadCompacted);
  config_class.define_method("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition);
  config_class.define_method("subscription_initial_position=", &ConsumerConfiguration::setSubscriptionInitialPosition);
  config_class.define_method("[]", &ConsumerConfiguration::getProperty);
  config_class.define_method("[]=", &ConsumerConfiguration::setProperty);
}