lib/nats/io/js.rb in nats-pure-2.1.2 vs lib/nats/io/js.rb in nats-pure-2.2.0

- old
+ new

@@ -17,11 +17,10 @@ require_relative 'kv' require 'time' require 'base64' module NATS - # JetStream returns a context with a similar API as the NATS::Client # but with enhanced functions to persist and consume messages from # the NATS JetStream engine. # # @example @@ -183,11 +182,11 @@ # Heartbeats / FlowControl config.flow_control = flow_control if idle_heartbeat or config.idle_heartbeat idle_heartbeat = config.idle_heartbeat if config.idle_heartbeat - idle_heartbeat = idle_heartbeat * 1_000_000_000 + idle_heartbeat = idle_heartbeat * ::NATS::NANOSECONDS config.idle_heartbeat = idle_heartbeat end # Auto create the consumer. cinfo = add_consumer(stream, config) @@ -221,11 +220,13 @@ # @option params [String] :stream Name of the Stream to which the consumer belongs. # @option params [String] :consumer Name of the Consumer to which the PullSubscription will be bound. # @option params [Hash] :config Configuration for the consumer. # @return [NATS::JetStream::PullSubscription] def pull_subscribe(subject, durable, params={}) - raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name") if durable.empty? + if durable.empty? && !params[:consumer] + raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name") + end params[:consumer] ||= durable stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream] begin consumer_info(stream, params[:consumer]) @@ -328,22 +329,35 @@ config = if not config.is_a?(JetStream::API::ConsumerConfig) JetStream::API::ConsumerConfig.new(config) else config end - req_subject = if config[:durable_name] + + req_subject = case + when config[:name] + # NOTE: Only supported after nats-server v2.9.0 + if config[:filter_subject] && config[:filter_subject] != ">" + "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}" + else + "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}" + end + when config[:durable_name] "#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}" else "#{@prefix}.CONSUMER.CREATE.#{stream}" end config[:ack_policy] ||= JS::Config::AckExplicit # Check if have to normalize ack wait so that it is in nanoseconds for Go compat. if config[:ack_wait] raise ArgumentError.new("nats: invalid ack wait") unless config[:ack_wait].is_a?(Integer) - config[:ack_wait] = config[:ack_wait] * 1_000_000_000 + config[:ack_wait] = config[:ack_wait] * ::NATS::NANOSECONDS end + if config[:inactive_threshold] + raise ArgumentError.new("nats: invalid inactive threshold") unless config[:inactive_threshold].is_a?(Integer) + config[:inactive_threshold] = config[:inactive_threshold] * ::NATS::NANOSECONDS + end req = { stream_name: stream, config: config } @@ -395,32 +409,100 @@ raise JetStream::Error::NotFound unless result[:streams] result[:streams].first end - def get_last_msg(stream_name, subject) - req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}" - req = {'last_by_subj': subject} + # get_msg retrieves a message from the stream. + # @param next [Boolean] Fetch the next message for a subject. + # @param seq [Integer] Sequence number of a message. + # @param subject [String] Subject of the message. + # @param direct [Boolean] Use direct mode to for faster access (requires NATS v2.9.0) + def get_msg(stream_name, params={}) + req = {} + case + when params[:next] + req[:seq] = params[:seq] + req[:next_by_subj] = params[:subject] + when params[:seq] + req[:seq] = params[:seq] + when params[:subject] + req[:last_by_subj] = params[:subject] + end + data = req.to_json - resp = api_request(req_subject, data) - JetStream::API::RawStreamMsg.new(resp[:message]) + if params[:direct] + if params[:subject] and not params[:seq] + # last_by_subject type request requires no payload. + data = '' + req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}.#{params[:subject]}" + else + req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}" + end + else + req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}" + end + resp = api_request(req_subject, data, direct: params[:direct]) + msg = if params[:direct] + _lift_msg_to_raw_msg(resp) + else + JetStream::API::RawStreamMsg.new(resp[:message]) + end + + msg end + def get_last_msg(stream_name, subject, params={}) + params[:subject] = subject + get_msg(stream_name, params) + end + + def account_info + api_request("#{@prefix}.INFO") + end + private def api_request(req_subject, req="", params={}) params[:timeout] ||= @opts[:timeout] - result = begin - msg = @nc.request(req_subject, req, **params) + msg = begin + @nc.request(req_subject, req, **params) + rescue NATS::IO::NoRespondersError + raise JetStream::Error::ServiceUnavailable + end + + result = if params[:direct] + msg + else JSON.parse(msg.data, symbolize_names: true) - rescue NATS::IO::NoRespondersError - raise JetStream::Error::ServiceUnavailable end - raise JS.from_error(result[:error]) if result[:error] + if result.is_a?(Hash) and result[:error] + raise JS.from_error(result[:error]) + end result end + + def _lift_msg_to_raw_msg(msg) + if msg.header and msg.header['Status'] + status = msg.header['Status'] + if status == '404' + raise ::NATS::JetStream::Error::NotFound.new + else + raise JS.from_msg(msg) + end + end + subject = msg.header['Nats-Subject'] + seq = msg.header['Nats-Sequence'] + raw_msg = JetStream::API::RawStreamMsg.new( + subject: subject, + seq: seq, + headers: msg.header, + ) + raw_msg.data = msg.data + + raw_msg + end end # PushSubscription is included into NATS::Subscription so that it # # @example Create a push subscription using JetStream context. @@ -1105,11 +1187,11 @@ keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) - opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 + opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } @@ -1134,11 +1216,11 @@ # @return [String] # @!attribute max_waiting # @return [Integer] # @!attribute max_ack_pending # @return [Integer] - ConsumerConfig = Struct.new(:durable_name, :description, + ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, @@ -1151,11 +1233,11 @@ :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage - :memory_storage, + :mem_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } @@ -1205,15 +1287,36 @@ # @return [String] # @!attribute num_replicas # @return [Integer] # @!attribute duplicate_window # @return [Integer] - StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, - :max_msgs, :max_bytes, :discard, :max_age, - :max_msgs_per_subject, :max_msg_size, - :storage, :num_replicas, :no_ack, :duplicate_window, - :placement, :allow_direct, + StreamConfig = Struct.new( + :name, + :description, + :subjects, + :retention, + :max_consumers, + :max_msgs, + :max_bytes, + :discard, + :max_age, + :max_msgs_per_subject, + :max_msg_size, + :storage, + :num_replicas, + :no_ack, + :duplicate_window, + :placement, + :mirror, + :sources, + :sealed, + :deny_delete, + :deny_purge, + :allow_rollup_hdrs, + :republish, + :allow_direct, + :mirror_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } @@ -1242,10 +1345,10 @@ StreamInfo = Struct.new(:type, :config, :created, :state, :domain, keyword_init: true) do def initialize(opts={}) opts[:config] = StreamConfig.new(opts[:config]) opts[:state] = StreamState.new(opts[:state]) - opts[:created] = Time.parse(opts[:created]) + opts[:created] = ::Time.parse(opts[:created]) # Filter fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts)