ext/bindings/consumer.cpp in pulsar-client-2.4.1.pre.beta.4 vs ext/bindings/consumer.cpp in pulsar-client-2.6.1.pre.beta.1

- old
+ new

@@ -14,10 +14,15 @@ unsigned int timeout_ms; pulsar::Message message; pulsar::Result result; } consumer_receive_job; +typedef struct { + pulsar::Consumer& consumer; + pulsar::Result result; +} consumer_close_task; + void* consumer_receive_nogvl(void* jobPtr) { consumer_receive_job& job = *(consumer_receive_job*)jobPtr; if (job.timeout_ms > 0) { job.result = job.consumer.receive(job.message, job.timeout_ms); } else { @@ -44,19 +49,32 @@ void Consumer::negative_acknowledge(const Message& message) { _consumer.negativeAcknowledge(message._msg); } +void* consumer_close_worker(void* taskPtr) { + consumer_close_task& task = *(consumer_close_task*)taskPtr; + task.result = task.consumer.close(); + return nullptr; } +void Consumer::close() { + consumer_close_task task = { _consumer }; + rb_thread_call_without_gvl(&consumer_close_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +} + using namespace Rice; void bind_consumer(Module &module) { define_class_under<pulsar_rb::Consumer>(module, "Consumer") .define_constructor(Constructor<pulsar_rb::Consumer>()) .define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0)) .define_method("acknowledge", &pulsar_rb::Consumer::acknowledge) .define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge) + .define_method("close", &pulsar_rb::Consumer::close) ; define_enum<pulsar::ConsumerType>("ConsumerType", module) .define_value("Exclusive", ConsumerExclusive) .define_value("Shared", ConsumerShared)