# Copyright 2021 The NATS Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # require_relative 'msg' require_relative 'client' require_relative 'errors' require_relative 'kv' require 'time' require 'base64' module NATS # JetStream returns a context with a similar API as the NATS::Client # but with enhanced functions to persist and consume messages from # the NATS JetStream engine. # # @example # nc = NATS.connect("demo.nats.io") # js = nc.jetstream() # class JetStream # Create a new JetStream context for a NATS connection. # # @param conn [NATS::Client] # @param params [Hash] Options to customize JetStream context. # @option params [String] :prefix JetStream API prefix to use for the requests. # @option params [String] :domain JetStream Domain to use for the requests. # @option params [Float] :timeout Default timeout to use for JS requests. def initialize(conn, params={}) @nc = conn @prefix = if params[:prefix] params[:prefix] elsif params[:domain] "$JS.#{params[:domain]}.API" else JS::DefaultAPIPrefix end @opts = params @opts[:timeout] ||= 5 # seconds params[:prefix] = @prefix # Include JetStream::Manager extend Manager extend KeyValue::Manager end # PubAck is the API response from a successfully published message. # # @!attribute [stream] stream # @return [String] Name of the stream that processed the published message. # @!attribute [seq] seq # @return [Fixnum] Sequence of the message in the stream. # @!attribute [duplicate] duplicate # @return [Boolean] Indicates whether the published message is a duplicate. # @!attribute [domain] domain # @return [String] JetStream Domain that processed the ack response. PubAck = Struct.new(:stream, :seq, :duplicate, :domain, keyword_init: true) # publish produces a message for JetStream. # # @param subject [String] The subject from a stream where the message will be sent. # @param payload [String] The payload of the message. # @param params [Hash] Options to customize the publish message request. # @option params [Float] :timeout Time to wait for an PubAck response or an error. # @option params [Hash] :header NATS Headers to use for the message. # @option params [String] :stream Expected Stream to which the message is being published. # @raise [NATS::Timeout] When it takes too long to receive an ack response. # @return [PubAck] The pub ack response. def publish(subject, payload="", **params) params[:timeout] ||= @opts[:timeout] if params[:stream] params[:header] ||= {} params[:header][JS::Header::ExpectedStream] = params[:stream] end # Send message with headers. msg = NATS::Msg.new(subject: subject, data: payload, header: params[:header]) begin resp = @nc.request_msg(msg, **params) result = JSON.parse(resp.data, symbolize_names: true) rescue ::NATS::IO::NoRespondersError raise JetStream::Error::NoStreamResponse.new("nats: no response from stream") end raise JS.from_error(result[:error]) if result[:error] PubAck.new(result) end # subscribe binds or creates a push subscription to a JetStream pull consumer. # # @param subject [String] Subject from which the messages will be fetched. # @param params [Hash] Options to customize the PushSubscription. # @option params [String] :stream Name of the Stream to which the consumer belongs. # @option params [String] :consumer Name of the Consumer to which the PushSubscription will be bound. # @option params [String] :durable Consumer durable name from where the messages will be fetched. # @option params [Hash] :config Configuration for the consumer. # @return [NATS::JetStream::PushSubscription] def subscribe(subject, params={}, &cb) params[:consumer] ||= params[:durable] stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream] queue = params[:queue] durable = params[:durable] flow_control = params[:flow_control] manual_ack = params[:manual_ack] idle_heartbeat = params[:idle_heartbeat] flow_control = params[:flow_control] config = params[:config] if queue if durable and durable != queue raise NATS::JetStream::Error.new("nats: cannot create queue subscription '#{queue}' to consumer '#{durable}'") else durable = queue end end cinfo = nil consumer_found = false should_create = false if not durable should_create = true else begin cinfo = consumer_info(stream, durable) config = cinfo.config consumer_found = true consumer = durable rescue NATS::JetStream::Error::NotFound should_create = true consumer_found = false end end if consumer_found if not config.deliver_group if queue raise NATS::JetStream::Error.new("nats: cannot create a queue subscription for a consumer without a deliver group") elsif cinfo.push_bound raise NATS::JetStream::Error.new("nats: consumer is already bound to a subscription") end else if not queue raise NATS::JetStream::Error.new("nats: cannot create a subscription for a consumer with a deliver group #{config.deliver_group}") elsif queue != config.deliver_group raise NATS::JetStream::Error.new("nats: cannot create a queue subscription #{queue} for a consumer with a deliver group #{config.deliver_group}") end end elsif should_create # Auto-create consumer if none found. if config.nil? # Defaults config = JetStream::API::ConsumerConfig.new({ack_policy: "explicit"}) elsif config.is_a?(Hash) config = JetStream::API::ConsumerConfig.new(config) elsif !config.is_a?(JetStream::API::ConsumerConfig) raise NATS::JetStream::Error.new("nats: invalid ConsumerConfig") end config.durable_name = durable if not config.durable_name config.deliver_group = queue if not config.deliver_group # Create inbox for push consumer. deliver = @nc.new_inbox config.deliver_subject = deliver # Auto created consumers use the filter subject. config.filter_subject = subject # Heartbeats / FlowControl config.flow_control = flow_control if idle_heartbeat or config.idle_heartbeat idle_heartbeat = config.idle_heartbeat if config.idle_heartbeat idle_heartbeat = idle_heartbeat * ::NATS::NANOSECONDS config.idle_heartbeat = idle_heartbeat end # Auto create the consumer. cinfo = add_consumer(stream, config) consumer = cinfo.name end # Enable auto acking for async callbacks unless disabled. if cb and not manual_ack ocb = cb new_cb = proc do |msg| ocb.call(msg) msg.ack rescue JetStream::Error::MsgAlreadyAckd end cb = new_cb end sub = @nc.subscribe(config.deliver_subject, queue: config.deliver_group, &cb) sub.extend(PushSubscription) sub.jsi = JS::Sub.new( js: self, stream: stream, consumer: consumer, ) sub end # pull_subscribe binds or creates a subscription to a JetStream pull consumer. # # @param subject [String] Subject from which the messages will be fetched. # @param durable [String] Consumer durable name from where the messages will be fetched. # @param params [Hash] Options to customize the PullSubscription. # @option params [String] :stream Name of the Stream to which the consumer belongs. # @option params [String] :consumer Name of the Consumer to which the PullSubscription will be bound. # @option params [Hash] :config Configuration for the consumer. # @return [NATS::JetStream::PullSubscription] def pull_subscribe(subject, durable, params={}) if durable.empty? && !params[:consumer] raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name") end params[:consumer] ||= durable stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream] begin consumer_info(stream, params[:consumer]) rescue NATS::JetStream::Error::NotFound => e # If attempting to bind, then this is a hard error. raise e if params[:stream] config = if not params[:config] JetStream::API::ConsumerConfig.new elsif params[:config].is_a?(JetStream::API::ConsumerConfig) params[:config] else JetStream::API::ConsumerConfig.new(params[:config]) end config[:durable_name] = durable config[:ack_policy] ||= JS::Config::AckExplicit add_consumer(stream, config) end deliver = @nc.new_inbox sub = @nc.subscribe(deliver) sub.extend(PullSubscription) consumer = params[:consumer] subject = "#{@prefix}.CONSUMER.MSG.NEXT.#{stream}.#{consumer}" sub.jsi = JS::Sub.new( js: self, stream: stream, consumer: params[:consumer], nms: subject ) sub end # A JetStream::Manager can be used to make requests to the JetStream API. # # @example # require 'nats/client' # # nc = NATS.connect("demo.nats.io") # # config = JetStream::API::StreamConfig.new() # nc.jsm.add_stream(config) # # module Manager # add_stream creates a stream with a given config. # @param config [JetStream::API::StreamConfig] Configuration of the stream to create. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [JetStream::API::StreamCreateResponse] The result of creating a Stream. def add_stream(config, params={}) config = if not config.is_a?(JetStream::API::StreamConfig) JetStream::API::StreamConfig.new(config) else config end stream = config[:name] raise ArgumentError.new(":name is required to create streams") unless stream raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/ req_subject = "#{@prefix}.STREAM.CREATE.#{stream}" result = api_request(req_subject, config.to_json, params) JetStream::API::StreamCreateResponse.new(result) end # stream_info retrieves the current status of a stream. # @param stream [String] Name of the stream. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [JetStream::API::StreamInfo] The latest StreamInfo of the stream. def stream_info(stream, params={}) raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty? req_subject = "#{@prefix}.STREAM.INFO.#{stream}" result = api_request(req_subject, '', params) JetStream::API::StreamInfo.new(result) end # delete_stream deletes a stream. # @param stream [String] Name of the stream. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [Boolean] def delete_stream(stream, params={}) raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty? req_subject = "#{@prefix}.STREAM.DELETE.#{stream}" result = api_request(req_subject, '', params) result[:success] end # add_consumer creates a consumer with a given config. # @param stream [String] Name of the stream. # @param config [JetStream::API::ConsumerConfig] Configuration of the consumer to create. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [JetStream::API::ConsumerInfo] The result of creating a Consumer. def add_consumer(stream, config, params={}) raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty? config = if not config.is_a?(JetStream::API::ConsumerConfig) JetStream::API::ConsumerConfig.new(config) else config end req_subject = case when config[:name] # NOTE: Only supported after nats-server v2.9.0 if config[:filter_subject] && config[:filter_subject] != ">" "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}" else "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}" end when config[:durable_name] "#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}" else "#{@prefix}.CONSUMER.CREATE.#{stream}" end config[:ack_policy] ||= JS::Config::AckExplicit # Check if have to normalize ack wait so that it is in nanoseconds for Go compat. if config[:ack_wait] raise ArgumentError.new("nats: invalid ack wait") unless config[:ack_wait].is_a?(Integer) config[:ack_wait] = config[:ack_wait] * ::NATS::NANOSECONDS end if config[:inactive_threshold] raise ArgumentError.new("nats: invalid inactive threshold") unless config[:inactive_threshold].is_a?(Integer) config[:inactive_threshold] = config[:inactive_threshold] * ::NATS::NANOSECONDS end req = { stream_name: stream, config: config } result = api_request(req_subject, req.to_json, params) JetStream::API::ConsumerInfo.new(result).freeze end # consumer_info retrieves the current status of a consumer. # @param stream [String] Name of the stream. # @param consumer [String] Name of the consumer. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [JetStream::API::ConsumerInfo] The latest ConsumerInfo of the consumer. def consumer_info(stream, consumer, params={}) raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty? raise JetStream::Error::InvalidConsumerName.new("nats: invalid consumer name") if consumer.nil? or consumer.empty? req_subject = "#{@prefix}.CONSUMER.INFO.#{stream}.#{consumer}" result = api_request(req_subject, '', params) JetStream::API::ConsumerInfo.new(result) end # delete_consumer deletes a consumer. # @param stream [String] Name of the stream. # @param consumer [String] Name of the consumer. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [Boolean] def delete_consumer(stream, consumer, params={}) raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty? raise JetStream::Error::InvalidConsumerName.new("nats: invalid consumer name") if consumer.nil? or consumer.empty? req_subject = "#{@prefix}.CONSUMER.DELETE.#{stream}.#{consumer}" result = api_request(req_subject, '', params) result[:success] end # find_stream_name_by_subject does a lookup for the stream to which # the subject belongs. # @param subject [String] The subject that belongs to a stream. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [String] The name of the JetStream stream for the subject. def find_stream_name_by_subject(subject, params={}) req_subject = "#{@prefix}.STREAM.NAMES" req = { subject: subject } result = api_request(req_subject, req.to_json, params) raise JetStream::Error::NotFound unless result[:streams] result[:streams].first end # get_msg retrieves a message from the stream. # @param next [Boolean] Fetch the next message for a subject. # @param seq [Integer] Sequence number of a message. # @param subject [String] Subject of the message. # @param direct [Boolean] Use direct mode to for faster access (requires NATS v2.9.0) def get_msg(stream_name, params={}) req = {} case when params[:next] req[:seq] = params[:seq] req[:next_by_subj] = params[:subject] when params[:seq] req[:seq] = params[:seq] when params[:subject] req[:last_by_subj] = params[:subject] end data = req.to_json if params[:direct] if params[:subject] and not params[:seq] # last_by_subject type request requires no payload. data = '' req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}.#{params[:subject]}" else req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}" end else req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}" end resp = api_request(req_subject, data, direct: params[:direct]) msg = if params[:direct] _lift_msg_to_raw_msg(resp) else JetStream::API::RawStreamMsg.new(resp[:message]) end msg end def get_last_msg(stream_name, subject, params={}) params[:subject] = subject get_msg(stream_name, params) end def account_info api_request("#{@prefix}.INFO") end private def api_request(req_subject, req="", params={}) params[:timeout] ||= @opts[:timeout] msg = begin @nc.request(req_subject, req, **params) rescue NATS::IO::NoRespondersError raise JetStream::Error::ServiceUnavailable end result = if params[:direct] msg else JSON.parse(msg.data, symbolize_names: true) end if result.is_a?(Hash) and result[:error] raise JS.from_error(result[:error]) end result end def _lift_msg_to_raw_msg(msg) if msg.header and msg.header['Status'] status = msg.header['Status'] if status == '404' raise ::NATS::JetStream::Error::NotFound.new else raise JS.from_msg(msg) end end subject = msg.header['Nats-Subject'] seq = msg.header['Nats-Sequence'] raw_msg = JetStream::API::RawStreamMsg.new( subject: subject, seq: seq, headers: msg.header, ) raw_msg.data = msg.data raw_msg end end # PushSubscription is included into NATS::Subscription so that it # # @example Create a push subscription using JetStream context. # # require 'nats/client' # # nc = NATS.connect # js = nc.jetstream # sub = js.subscribe("foo", "bar") # msg = sub.next_msg # msg.ack # sub.unsubscribe # # @!visibility public module PushSubscription # consumer_info retrieves the current status of the pull subscription consumer. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [JetStream::API::ConsumerInfo] The latest ConsumerInfo of the consumer. def consumer_info(params={}) @jsi.js.consumer_info(@jsi.stream, @jsi.consumer, params) end end private_constant :PushSubscription # PullSubscription is included into NATS::Subscription so that it # can be used to fetch messages from a pull based consumer from # JetStream. # # @example Create a pull subscription using JetStream context. # # require 'nats/client' # # nc = NATS.connect # js = nc.jetstream # psub = js.pull_subscribe("foo", "bar") # # loop do # msgs = psub.fetch(5) # msgs.each do |msg| # msg.ack # end # end # # @!visibility public module PullSubscription # next_msg is not available for pull based subscriptions. # @raise [NATS::JetStream::Error] def next_msg(params={}) raise ::NATS::JetStream::Error.new("nats: pull subscription cannot use next_msg") end # fetch makes a request to be delivered more messages from a pull consumer. # # @param batch [Fixnum] Number of messages to pull from the stream. # @param params [Hash] Options to customize the fetch request. # @option params [Float] :timeout Duration of the fetch request before it expires. # @return [Array] def fetch(batch=1, params={}) if batch < 1 raise ::NATS::JetStream::Error.new("nats: invalid batch size") end t = MonotonicTime.now timeout = params[:timeout] ||= 5 expires = (timeout * 1_000_000_000) - 100_000 next_req = { batch: batch } msgs = [] case when batch < 1 raise ::NATS::JetStream::Error.new("nats: invalid batch size") when batch == 1 #################################################### # Fetch (1) # #################################################### # Check if there is any pending message in the queue that is # ready to be consumed. synchronize do unless @pending_queue.empty? msg = @pending_queue.pop @pending_size -= msg.data.size # Check for a no msgs response status. if JS.is_status_msg(msg) case msg.header["Status"] when JS::Status::NoMsgs msg = nil when JS::Status::RequestTimeout # Skip else raise JS.from_msg(msg) end else msgs << msg end end end # Make lingering request with expiration. next_req[:expires] = expires if msgs.empty? # Make publish request and wait for response. @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject) # Wait for result of fetch or timeout. synchronize { wait_for_msgs_cond.wait(timeout) } unless @pending_queue.empty? msg = @pending_queue.pop @pending_size -= msg.data.size msgs << msg end duration = MonotonicTime.since(t) if duration > timeout raise ::NATS::Timeout.new("nats: fetch timeout") end # Should have received at least a message at this point, # if that is not the case then error already. if JS.is_status_msg(msgs.first) msg = msgs.first case msg.header[JS::Header::Status] when JS::Status::RequestTimeout raise NATS::Timeout.new("nats: fetch request timeout") else raise JS.from_msg(msgs.first) end end end when batch > 1 #################################################### # Fetch (n) # #################################################### # Check if there already enough in the pending buffer. synchronize do if batch <= @pending_queue.size batch.times do msg = @pending_queue.pop @pending_size -= msg.data.size # Check for a no msgs response status. if JS.is_status_msg(msg) case msg.header[JS::Header::Status] when JS::Status::NoMsgs, JS::Status::RequestTimeout # Skip these next else raise JS.from_msg(msg) end else msgs << msg end end return msgs end end # Make publish request and wait any response. next_req[:no_wait] = true @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject) # Not receiving even one is a timeout. start_time = MonotonicTime.now msg = nil synchronize do wait_for_msgs_cond.wait(timeout) unless @pending_queue.empty? msg = @pending_queue.pop @pending_size -= msg.data.size end end # Check if the first message was a response saying that # there are no messages. if !msg.nil? && JS.is_status_msg(msg) case msg.header[JS::Header::Status] when JS::Status::NoMsgs # Make another request that does wait. next_req[:expires] = expires next_req.delete(:no_wait) @nc.publish(@jsi.nms, JS.next_req_to_json(next_req), @subject) when JS::Status::RequestTimeout raise NATS::Timeout.new("nats: fetch request timeout") else raise JS.from_msg(msg) end else msgs << msg unless msg.nil? end # Check if have not received yet a single message. duration = MonotonicTime.since(start_time) if msgs.empty? and duration > timeout raise NATS::Timeout.new("nats: fetch timeout") end needed = batch - msgs.count while needed > 0 and MonotonicTime.since(start_time) < timeout duration = MonotonicTime.since(start_time) # Wait for the rest of the messages. synchronize do # Wait until there is a message delivered. if @pending_queue.empty? deadline = timeout - duration wait_for_msgs_cond.wait(deadline) if deadline > 0 duration = MonotonicTime.since(start_time) if msgs.empty? && @pending_queue.empty? and duration > timeout raise NATS::Timeout.new("nats: fetch timeout") end else msg = @pending_queue.pop @pending_size -= msg.data.size if JS.is_status_msg(msg) case msg.header[JS::Header::Status] when JS::Status::NoMsgs, JS::Status::RequestTimeout duration = MonotonicTime.since(start_time) if duration > timeout # Only received a subset of the messages. if !msgs.empty? return msgs else raise NATS::Timeout.new("nats: fetch timeout") end end else raise JS.from_msg(msg) end else # Add to the set of messages that will be returned. msgs << msg needed -= 1 end end end # :end: synchronize end end msgs end # consumer_info retrieves the current status of the pull subscription consumer. # @param params [Hash] Options to customize API request. # @option params [Float] :timeout Time to wait for response. # @return [JetStream::API::ConsumerInfo] The latest ConsumerInfo of the consumer. def consumer_info(params={}) @jsi.js.consumer_info(@jsi.stream, @jsi.consumer, params) end end private_constant :PullSubscription ####################################### # # # JetStream Message and Ack Methods # # # ####################################### # JetStream::Msg module includes the methods so that a regular NATS::Msg # can be enhanced with JetStream features like acking and metadata. module Msg module Ack # Ack types Ack = ("+ACK".freeze) Nak = ("-NAK".freeze) Progress = ("+WPI".freeze) Term = ("+TERM".freeze) Empty = (''.freeze) DotSep = ('.'.freeze) NoDomainName = ('_'.freeze) # Position Prefix0 = ('$JS'.freeze) Prefix1 = ('ACK'.freeze) Domain = 2 AccHash = 3 Stream = 4 Consumer = 5 NumDelivered = 6 StreamSeq = 7 ConsumerSeq = 8 Timestamp = 9 NumPending = 10 # Subject without domain: # $JS.ACK....... # V1TokenCounts = 9 # Subject with domain: # $JS.ACK.......... # V2TokenCounts = 12 SequencePair = Struct.new(:stream, :consumer) end private_constant :Ack class Metadata attr_reader :sequence, :num_delivered, :num_pending, :timestamp, :stream, :consumer, :domain def initialize(opts) @sequence = Ack::SequencePair.new(opts[Ack::StreamSeq].to_i, opts[Ack::ConsumerSeq].to_i) @domain = opts[Ack::Domain] @num_delivered = opts[Ack::NumDelivered].to_i @num_pending = opts[Ack::NumPending].to_i @timestamp = Time.at((opts[Ack::Timestamp].to_i / 1_000_000_000.0)) @stream = opts[Ack::Stream] @consumer = opts[Ack::Consumer] # TODO: Not exposed in Go client either right now. # account = opts[Ack::AccHash] end end module AckMethods def ack(**params) ensure_is_acked_once! resp = if params[:timeout] @nc.request(@reply, Ack::Ack, **params) else @nc.publish(@reply, Ack::Ack) end @sub.synchronize { @ackd = true } resp end def ack_sync(**params) ensure_is_acked_once! params[:timeout] ||= 0.5 resp = @nc.request(@reply, Ack::Ack, **params) @sub.synchronize { @ackd = true } resp end def nak(**params) ensure_is_acked_once! resp = if params[:timeout] @nc.request(@reply, Ack::Nak, **params) else @nc.publish(@reply, Ack::Nak) end @sub.synchronize { @ackd = true } resp end def term(**params) ensure_is_acked_once! resp = if params[:timeout] @nc.request(@reply, Ack::Term, **params) else @nc.publish(@reply, Ack::Term) end @sub.synchronize { @ackd = true } resp end def in_progress(**params) params[:timeout] ? @nc.request(@reply, Ack::Progress, **params) : @nc.publish(@reply, Ack::Progress) end def metadata @meta ||= parse_metadata(reply) end private def ensure_is_acked_once! @sub.synchronize do if @ackd raise JetStream::Error::MsgAlreadyAckd.new("nats: message was already acknowledged: #{self}") end end end def parse_metadata(reply) tokens = reply.split(Ack::DotSep) n = tokens.count case when n < Ack::V1TokenCounts || (n > Ack::V1TokenCounts and n < Ack::V2TokenCounts) raise NotJSMessage.new("nats: not a jetstream message") when tokens[0] != Ack::Prefix0 || tokens[1] != Ack::Prefix1 raise NotJSMessage.new("nats: not a jetstream message") when n == Ack::V1TokenCounts tokens.insert(Ack::Domain, Ack::Empty) tokens.insert(Ack::AccHash, Ack::Empty) when tokens[Ack::Domain] == Ack::NoDomainName tokens[Ack::Domain] = Ack::Empty end Metadata.new(tokens) end end end #################################### # # # JetStream Configuration Options # # # #################################### # Misc internal functions to support JS API. # @private module JS DefaultAPIPrefix = ("$JS.API".freeze) module Status CtrlMsg = ("100".freeze) NoMsgs = ("404".freeze) NotFound = ("404".freeze) RequestTimeout = ("408".freeze) ServiceUnavailable = ("503".freeze) end module Header Status = ("Status".freeze) Desc = ("Description".freeze) MsgID = ("Nats-Msg-Id".freeze) ExpectedStream = ("Nats-Expected-Stream".freeze) ExpectedLastSeq = ("Nats-Expected-Last-Sequence".freeze) ExpectedLastSubjSeq = ("Nats-Expected-Last-Subject-Sequence".freeze) ExpectedLastMsgID = ("Nats-Expected-Last-Msg-Id".freeze) LastConsumerSeq = ("Nats-Last-Consumer".freeze) LastStreamSeq = ("Nats-Last-Stream".freeze) end module Config # AckPolicy AckExplicit = ("explicit".freeze) AckAll = ("all".freeze) AckNone = ("none".freeze) end class Sub attr_reader :js, :stream, :consumer, :nms def initialize(opts={}) @js = opts[:js] @stream = opts[:stream] @consumer = opts[:consumer] @nms = opts[:nms] end end class << self def next_req_to_json(next_req) req = {} req[:batch] = next_req[:batch] req[:expires] = next_req[:expires].to_i if next_req[:expires] req[:no_wait] = next_req[:no_wait] if next_req[:no_wait] req.to_json end def is_status_msg(msg) return (!msg.nil? and (!msg.header.nil? and msg.header[Header::Status])) end # check_503_error raises exception when a NATS::Msg has a 503 status header. # @param msg [NATS::Msg] The message with status headers. # @raise [NATS::JetStream::Error::ServiceUnavailable] def check_503_error(msg) return if msg.nil? or msg.header.nil? if msg.header[Header::Status] == Status::ServiceUnavailable raise ::NATS::JetStream::Error::ServiceUnavailable end end # from_msg takes a plain NATS::Msg and checks its headers to confirm # if it was an error: # # msg.header={"Status"=>"503"}) # msg.header={"Status"=>"408", "Description"=>"Request Timeout"}) # # @param msg [NATS::Msg] The message with status headers. # @return [NATS::JetStream::API::Error] def from_msg(msg) check_503_error(msg) code = msg.header[JS::Header::Status] desc = msg.header[JS::Header::Desc] return ::NATS::JetStream::API::Error.new({code: code, description: desc}) end # from_error takes an API response that errored and maps the error # into a JetStream error type based on the status and error code. def from_error(err) return unless err case err[:code] when 503 ::NATS::JetStream::Error::ServiceUnavailable.new(err) when 500 ::NATS::JetStream::Error::ServerError.new(err) when 404 case err[:err_code] when 10059 ::NATS::JetStream::Error::StreamNotFound.new(err) when 10014 ::NATS::JetStream::Error::ConsumerNotFound.new(err) else ::NATS::JetStream::Error::NotFound.new(err) end when 400 ::NATS::JetStream::Error::BadRequest.new(err) else ::NATS::JetStream::API::Error.new(err) end end end end private_constant :JS ##################### # # # JetStream Errors # # # ##################### # Error is any error that may arise when interacting with JetStream. class Error < Error # When there is a NATS::IO::NoResponders error after making a publish request. class NoStreamResponse < Error; end # When an invalid durable or consumer name was attempted to be used. class InvalidDurableName < Error; end # When an ack not longer valid. class InvalidJSAck < Error; end # When an ack has already been acked. class MsgAlreadyAckd < Error; end # When the delivered message does not behave as a message delivered by JetStream, # for example when the ack reply has unrecognizable fields. class NotJSMessage < Error; end # When the stream name is invalid. class InvalidStreamName < Error; end # When the consumer name is invalid. class InvalidConsumerName < Error; end # When the server responds with an error from the JetStream API. class APIError < Error attr_reader :code, :err_code, :description, :stream, :seq def initialize(params={}) @code = params[:code] @err_code = params[:err_code] @description = params[:description] @stream = params[:stream] @seq = params[:seq] end def to_s "#{@description} (status_code=#{@code}, err_code=#{@err_code})" end end # When JetStream is not currently available, this could be due to JetStream # not being enabled or temporarily unavailable due to a leader election when # running in cluster mode. # This condition is represented with a message that has 503 status code header. class ServiceUnavailable < APIError def initialize(params={}) super(params) @code ||= 503 end end # When there is a hard failure in the JetStream. # This condition is represented with a message that has 500 status code header. class ServerError < APIError def initialize(params={}) super(params) @code ||= 500 end end # When a JetStream object was not found. # This condition is represented with a message that has 404 status code header. class NotFound < APIError def initialize(params={}) super(params) @code ||= 404 end end # When the stream is not found. class StreamNotFound < NotFound; end # When the consumer or durable is not found by name. class ConsumerNotFound < NotFound; end # When the JetStream client makes an invalid request. # This condition is represented with a message that has 400 status code header. class BadRequest < APIError def initialize(params={}) super(params) @code ||= 400 end end end ####################### # # # JetStream API Types # # # ####################### # JetStream::API are the types used to interact with the JetStream API. module API # When the server responds with an error from the JetStream API. Error = ::NATS::JetStream::Error::APIError # SequenceInfo is a pair of consumer and stream sequence and last activity. # @!attribute consumer_seq # @return [Integer] The consumer sequence. # @!attribute stream_seq # @return [Integer] The stream sequence. SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end # ConsumerInfo is the current status of a JetStream consumer. # # @!attribute stream_name # @return [String] name of the stream to which the consumer belongs. # @!attribute name # @return [String] name of the consumer. # @!attribute created # @return [String] time when the consumer was created. # @!attribute config # @return [ConsumerConfig] consumer configuration. # @!attribute delivered # @return [SequenceInfo] # @!attribute ack_floor # @return [SequenceInfo] # @!attribute num_ack_pending # @return [Integer] # @!attribute num_redelivered # @return [Integer] # @!attribute num_waiting # @return [Integer] # @!attribute num_pending # @return [Integer] # @!attribute cluster # @return [Hash] ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end # ConsumerConfig is the consumer configuration. # # @!attribute durable_name # @return [String] # @!attribute deliver_policy # @return [String] # @!attribute ack_policy # @return [String] # @!attribute ack_wait # @return [Integer] # @!attribute max_deliver # @return [Integer] # @!attribute replay_policy # @return [String] # @!attribute max_waiting # @return [Integer] # @!attribute max_ack_pending # @return [Integer] ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end # StreamConfig represents the configuration of a stream from JetStream. # # @!attribute type # @return [String] # @!attribute config # @return [Hash] # @!attribute created # @return [String] # @!attribute state # @return [StreamState] # @!attribute did_create # @return [Boolean] # @!attribute name # @return [String] # @!attribute subjects # @return [Array] # @!attribute retention # @return [String] # @!attribute max_consumers # @return [Integer] # @!attribute max_msgs # @return [Integer] # @!attribute max_bytes # @return [Integer] # @!attribute max_age # @return [Integer] # @!attribute max_msgs_per_subject # @return [Integer] # @!attribute max_msg_size # @return [Integer] # @!attribute discard # @return [String] # @!attribute storage # @return [String] # @!attribute num_replicas # @return [Integer] # @!attribute duplicate_window # @return [Integer] StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end # StreamInfo is the info about a stream from JetStream. # # @!attribute type # @return [String] # @!attribute config # @return [Hash] # @!attribute created # @return [String] # @!attribute state # @return [Hash] # @!attribute domain # @return [String] StreamInfo = Struct.new(:type, :config, :created, :state, :domain, keyword_init: true) do def initialize(opts={}) opts[:config] = StreamConfig.new(opts[:config]) opts[:state] = StreamState.new(opts[:state]) opts[:created] = ::Time.parse(opts[:created]) # Filter fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end # StreamState is the state of a stream. # # @!attribute messages # @return [Integer] # @!attribute bytes # @return [Integer] # @!attribute first_seq # @return [Integer] # @!attribute last_seq # @return [Integer] # @!attribute consumer_count # @return [Integer] StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end # StreamCreateResponse is the response from the JetStream $JS.API.STREAM.CREATE API. # # @!attribute type # @return [String] # @!attribute config # @return [StreamConfig] # @!attribute created # @return [String] # @!attribute state # @return [StreamState] # @!attribute did_create # @return [Boolean] StreamCreateResponse = Struct.new(:type, :config, :created, :state, :did_create, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } opts[:config] = StreamConfig.new(opts[:config]) opts[:state] = StreamState.new(opts[:state]) super(opts) freeze end end RawStreamMsg = Struct.new(:subject, :seq, :data, :headers, keyword_init: true) do def initialize(opts) opts[:data] = Base64.decode64(opts[:data]) if opts[:data] if opts[:hdrs] header = Base64.decode64(opts[:hdrs]) hdr = {} lines = header.lines lines.slice(1, header.size).each do |line| line.rstrip! next if line.empty? key, value = line.strip.split(/\s*:\s*/, 2) hdr[key] = value end opts[:headers] = hdr end # Filter out members not present. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def sequence self.seq end end end end end