ext/bindings/consumer.cpp in pulsar-client-2.4.1.pre.beta.4 vs ext/bindings/consumer.cpp in pulsar-client-2.6.1.pre.beta.1
- old
+ new
@@ -14,10 +14,15 @@
unsigned int timeout_ms;
pulsar::Message message;
pulsar::Result result;
} consumer_receive_job;
+typedef struct {
+ pulsar::Consumer& consumer;
+ pulsar::Result result;
+} consumer_close_task;
+
void* consumer_receive_nogvl(void* jobPtr) {
consumer_receive_job& job = *(consumer_receive_job*)jobPtr;
if (job.timeout_ms > 0) {
job.result = job.consumer.receive(job.message, job.timeout_ms);
} else {
@@ -44,19 +49,32 @@
void Consumer::negative_acknowledge(const Message& message) {
_consumer.negativeAcknowledge(message._msg);
}
+void* consumer_close_worker(void* taskPtr) {
+ consumer_close_task& task = *(consumer_close_task*)taskPtr;
+ task.result = task.consumer.close();
+ return nullptr;
}
+void Consumer::close() {
+ consumer_close_task task = { _consumer };
+ rb_thread_call_without_gvl(&consumer_close_worker, &task, RUBY_UBF_IO, nullptr);
+ CheckResult(task.result);
+}
+
+}
+
using namespace Rice;
void bind_consumer(Module &module) {
define_class_under<pulsar_rb::Consumer>(module, "Consumer")
.define_constructor(Constructor<pulsar_rb::Consumer>())
.define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0))
.define_method("acknowledge", &pulsar_rb::Consumer::acknowledge)
.define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge)
+ .define_method("close", &pulsar_rb::Consumer::close)
;
define_enum<pulsar::ConsumerType>("ConsumerType", module)
.define_value("Exclusive", ConsumerExclusive)
.define_value("Shared", ConsumerShared)