ext/bindings/consumer.cpp in pulsar-client-2.4.1.pre.beta.1 vs ext/bindings/consumer.cpp in pulsar-client-2.4.1.pre.beta.2

- old
+ new

@@ -9,29 +9,34 @@ namespace pulsar_rb { typedef struct { pulsar::Consumer& consumer; + unsigned int timeout_ms; pulsar::Message message; pulsar::Result result; } consumer_receive_job; void* consumer_receive_nogvl(void* jobPtr) { consumer_receive_job& job = *(consumer_receive_job*)jobPtr; - job.result = job.consumer.receive(job.message); + if (job.timeout_ms > 0) { + job.result = job.consumer.receive(job.message, job.timeout_ms); + } else { + job.result = job.consumer.receive(job.message); + } return nullptr; } -pulsar::Message consumer_receive(pulsar::Consumer& consumer) { - consumer_receive_job job = { consumer }; +pulsar::Message consumer_receive(pulsar::Consumer& consumer, unsigned int timeout_ms) { + consumer_receive_job job = { consumer, timeout_ms }; rb_thread_call_without_gvl(&consumer_receive_nogvl, &job, RUBY_UBF_IO, nullptr); CheckResult(job.result); return job.message; } -Message::ptr Consumer::receive() { - pulsar::Message message = consumer_receive(_consumer); +Message::ptr Consumer::receive(unsigned int timeout_ms) { + pulsar::Message message = consumer_receive(_consumer, timeout_ms); return Message::ptr(new Message(message)); } void Consumer::acknowledge(const Message& message) { _consumer.acknowledgeAsync(message._msg, nullptr); @@ -46,10 +51,10 @@ using namespace Rice; void bind_consumer(Module &module) { define_class_under<pulsar_rb::Consumer>(module, "Consumer") .define_constructor(Constructor<pulsar_rb::Consumer>()) - .define_method("receive", &pulsar_rb::Consumer::receive) + .define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0)) .define_method("acknowledge", &pulsar_rb::Consumer::acknowledge) .define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge) ; define_enum<pulsar::ConsumerType>("ConsumerType", module)