lib/nats/io/js.rb in nats-pure-2.0.0 vs lib/nats/io/js.rb in nats-pure-2.1.0
- old
+ new
@@ -282,10 +282,11 @@
else
config
end
stream = config[:name]
raise ArgumentError.new(":name is required to create streams") unless stream
+ raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/
req_subject = "#{@prefix}.STREAM.CREATE.#{stream}"
result = api_request(req_subject, config.to_json, params)
JetStream::API::StreamCreateResponse.new(result)
end
@@ -343,10 +344,11 @@
req = {
stream_name: stream,
config: config
}
+
result = api_request(req_subject, req.to_json, params)
JetStream::API::ConsumerInfo.new(result).freeze
end
# consumer_info retrieves the current status of a consumer.
@@ -500,10 +502,11 @@
# Check if there is any pending message in the queue that is
# ready to be consumed.
synchronize do
unless @pending_queue.empty?
msg = @pending_queue.pop
+ @pending_size -= msg.data.size
# Check for a no msgs response status.
if JS.is_status_msg(msg)
case msg.header["Status"]
when JS::Status::NoMsgs
msg = nil
@@ -525,12 +528,17 @@
@nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject)
# Wait for result of fetch or timeout.
synchronize { wait_for_msgs_cond.wait(timeout) }
- msgs << @pending_queue.pop unless @pending_queue.empty?
+ unless @pending_queue.empty?
+ msg = @pending_queue.pop
+ @pending_size -= msg.data.size
+ msgs << msg
+ end
+
duration = MonotonicTime.since(t)
if duration > timeout
raise ::NATS::Timeout.new("nats: fetch timeout")
end
@@ -554,10 +562,11 @@
# Check if there already enough in the pending buffer.
synchronize do
if batch <= @pending_queue.size
batch.times do
msg = @pending_queue.pop
+ @pending_size -= msg.data.size
# Check for a no msgs response status.
if JS.is_status_msg(msg)
case msg.header[JS::Header::Status]
when JS::Status::NoMsgs, JS::Status::RequestTimeout
@@ -580,15 +589,20 @@
@nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject)
# Not receiving even one is a timeout.
start_time = MonotonicTime.now
msg = nil
- synchronize {
+
+ synchronize do
wait_for_msgs_cond.wait(timeout)
- msg = @pending_queue.pop unless @pending_queue.empty?
- }
+ unless @pending_queue.empty?
+ msg = @pending_queue.pop
+ @pending_size -= msg.data.size
+ end
+ end
+
# Check if the first message was a response saying that
# there are no messages.
if !msg.nil? && JS.is_status_msg(msg)
case msg.header[JS::Header::Status]
when JS::Status::NoMsgs
@@ -628,9 +642,10 @@
if msgs.empty? && @pending_queue.empty? and duration > timeout
raise NATS::Timeout.new("nats: fetch timeout")
end
else
msg = @pending_queue.pop
+ @pending_size -= msg.data.size
if JS.is_status_msg(msg)
case msg.header[JS::Header::Status]
when JS::Status::NoMsgs, JS::Status::RequestTimeout
duration = MonotonicTime.since(start_time)