lib/nats/io/js.rb in nats-pure-2.0.0 vs lib/nats/io/js.rb in nats-pure-2.1.0

- old
+ new

@@ -282,10 +282,11 @@ else config end stream = config[:name] raise ArgumentError.new(":name is required to create streams") unless stream + raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/ req_subject = "#{@prefix}.STREAM.CREATE.#{stream}" result = api_request(req_subject, config.to_json, params) JetStream::API::StreamCreateResponse.new(result) end @@ -343,10 +344,11 @@ req = { stream_name: stream, config: config } + result = api_request(req_subject, req.to_json, params) JetStream::API::ConsumerInfo.new(result).freeze end # consumer_info retrieves the current status of a consumer. @@ -500,10 +502,11 @@ # Check if there is any pending message in the queue that is # ready to be consumed. synchronize do unless @pending_queue.empty? msg = @pending_queue.pop + @pending_size -= msg.data.size # Check for a no msgs response status. if JS.is_status_msg(msg) case msg.header["Status"] when JS::Status::NoMsgs msg = nil @@ -525,12 +528,17 @@ @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject) # Wait for result of fetch or timeout. synchronize { wait_for_msgs_cond.wait(timeout) } - msgs << @pending_queue.pop unless @pending_queue.empty? + unless @pending_queue.empty? + msg = @pending_queue.pop + @pending_size -= msg.data.size + msgs << msg + end + duration = MonotonicTime.since(t) if duration > timeout raise ::NATS::Timeout.new("nats: fetch timeout") end @@ -554,10 +562,11 @@ # Check if there already enough in the pending buffer. synchronize do if batch <= @pending_queue.size batch.times do msg = @pending_queue.pop + @pending_size -= msg.data.size # Check for a no msgs response status. if JS.is_status_msg(msg) case msg.header[JS::Header::Status] when JS::Status::NoMsgs, JS::Status::RequestTimeout @@ -580,15 +589,20 @@ @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject) # Not receiving even one is a timeout. start_time = MonotonicTime.now msg = nil - synchronize { + + synchronize do wait_for_msgs_cond.wait(timeout) - msg = @pending_queue.pop unless @pending_queue.empty? - } + unless @pending_queue.empty? + msg = @pending_queue.pop + @pending_size -= msg.data.size + end + end + # Check if the first message was a response saying that # there are no messages. if !msg.nil? && JS.is_status_msg(msg) case msg.header[JS::Header::Status] when JS::Status::NoMsgs @@ -628,9 +642,10 @@ if msgs.empty? && @pending_queue.empty? and duration > timeout raise NATS::Timeout.new("nats: fetch timeout") end else msg = @pending_queue.pop + @pending_size -= msg.data.size if JS.is_status_msg(msg) case msg.header[JS::Header::Status] when JS::Status::NoMsgs, JS::Status::RequestTimeout duration = MonotonicTime.since(start_time)