ext/bindings/consumer.cpp in pulsar-client-2.4.1.pre.beta.1 vs ext/bindings/consumer.cpp in pulsar-client-2.4.1.pre.beta.2
- old
+ new
@@ -9,29 +9,34 @@
namespace pulsar_rb {
typedef struct {
pulsar::Consumer& consumer;
+ unsigned int timeout_ms;
pulsar::Message message;
pulsar::Result result;
} consumer_receive_job;
void* consumer_receive_nogvl(void* jobPtr) {
consumer_receive_job& job = *(consumer_receive_job*)jobPtr;
- job.result = job.consumer.receive(job.message);
+ if (job.timeout_ms > 0) {
+ job.result = job.consumer.receive(job.message, job.timeout_ms);
+ } else {
+ job.result = job.consumer.receive(job.message);
+ }
return nullptr;
}
-pulsar::Message consumer_receive(pulsar::Consumer& consumer) {
- consumer_receive_job job = { consumer };
+pulsar::Message consumer_receive(pulsar::Consumer& consumer, unsigned int timeout_ms) {
+ consumer_receive_job job = { consumer, timeout_ms };
rb_thread_call_without_gvl(&consumer_receive_nogvl, &job, RUBY_UBF_IO, nullptr);
CheckResult(job.result);
return job.message;
}
-Message::ptr Consumer::receive() {
- pulsar::Message message = consumer_receive(_consumer);
+Message::ptr Consumer::receive(unsigned int timeout_ms) {
+ pulsar::Message message = consumer_receive(_consumer, timeout_ms);
return Message::ptr(new Message(message));
}
void Consumer::acknowledge(const Message& message) {
_consumer.acknowledgeAsync(message._msg, nullptr);
@@ -46,10 +51,10 @@
using namespace Rice;
void bind_consumer(Module &module) {
define_class_under<pulsar_rb::Consumer>(module, "Consumer")
.define_constructor(Constructor<pulsar_rb::Consumer>())
- .define_method("receive", &pulsar_rb::Consumer::receive)
+ .define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0))
.define_method("acknowledge", &pulsar_rb::Consumer::acknowledge)
.define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge)
;
define_enum<pulsar::ConsumerType>("ConsumerType", module)