lib/nats/io/js.rb in nats-pure-2.1.2 vs lib/nats/io/js.rb in nats-pure-2.2.0
- old
+ new
@@ -17,11 +17,10 @@
require_relative 'kv'
require 'time'
require 'base64'
module NATS
-
# JetStream returns a context with a similar API as the NATS::Client
# but with enhanced functions to persist and consume messages from
# the NATS JetStream engine.
#
# @example
@@ -183,11 +182,11 @@
# Heartbeats / FlowControl
config.flow_control = flow_control
if idle_heartbeat or config.idle_heartbeat
idle_heartbeat = config.idle_heartbeat if config.idle_heartbeat
- idle_heartbeat = idle_heartbeat * 1_000_000_000
+ idle_heartbeat = idle_heartbeat * ::NATS::NANOSECONDS
config.idle_heartbeat = idle_heartbeat
end
# Auto create the consumer.
cinfo = add_consumer(stream, config)
@@ -221,11 +220,13 @@
# @option params [String] :stream Name of the Stream to which the consumer belongs.
# @option params [String] :consumer Name of the Consumer to which the PullSubscription will be bound.
# @option params [Hash] :config Configuration for the consumer.
# @return [NATS::JetStream::PullSubscription]
def pull_subscribe(subject, durable, params={})
- raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name") if durable.empty?
+ if durable.empty? && !params[:consumer]
+ raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name")
+ end
params[:consumer] ||= durable
stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream]
begin
consumer_info(stream, params[:consumer])
@@ -328,22 +329,35 @@
config = if not config.is_a?(JetStream::API::ConsumerConfig)
JetStream::API::ConsumerConfig.new(config)
else
config
end
- req_subject = if config[:durable_name]
+
+ req_subject = case
+ when config[:name]
+ # NOTE: Only supported after nats-server v2.9.0
+ if config[:filter_subject] && config[:filter_subject] != ">"
+ "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}"
+ else
+ "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}"
+ end
+ when config[:durable_name]
"#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}"
else
"#{@prefix}.CONSUMER.CREATE.#{stream}"
end
config[:ack_policy] ||= JS::Config::AckExplicit
# Check if have to normalize ack wait so that it is in nanoseconds for Go compat.
if config[:ack_wait]
raise ArgumentError.new("nats: invalid ack wait") unless config[:ack_wait].is_a?(Integer)
- config[:ack_wait] = config[:ack_wait] * 1_000_000_000
+ config[:ack_wait] = config[:ack_wait] * ::NATS::NANOSECONDS
end
+ if config[:inactive_threshold]
+ raise ArgumentError.new("nats: invalid inactive threshold") unless config[:inactive_threshold].is_a?(Integer)
+ config[:inactive_threshold] = config[:inactive_threshold] * ::NATS::NANOSECONDS
+ end
req = {
stream_name: stream,
config: config
}
@@ -395,32 +409,100 @@
raise JetStream::Error::NotFound unless result[:streams]
result[:streams].first
end
- def get_last_msg(stream_name, subject)
- req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}"
- req = {'last_by_subj': subject}
+ # get_msg retrieves a message from the stream.
+ # @param next [Boolean] Fetch the next message for a subject.
+ # @param seq [Integer] Sequence number of a message.
+ # @param subject [String] Subject of the message.
+ # @param direct [Boolean] Use direct mode to for faster access (requires NATS v2.9.0)
+ def get_msg(stream_name, params={})
+ req = {}
+ case
+ when params[:next]
+ req[:seq] = params[:seq]
+ req[:next_by_subj] = params[:subject]
+ when params[:seq]
+ req[:seq] = params[:seq]
+ when params[:subject]
+ req[:last_by_subj] = params[:subject]
+ end
+
data = req.to_json
- resp = api_request(req_subject, data)
- JetStream::API::RawStreamMsg.new(resp[:message])
+ if params[:direct]
+ if params[:subject] and not params[:seq]
+ # last_by_subject type request requires no payload.
+ data = ''
+ req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}.#{params[:subject]}"
+ else
+ req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}"
+ end
+ else
+ req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}"
+ end
+ resp = api_request(req_subject, data, direct: params[:direct])
+ msg = if params[:direct]
+ _lift_msg_to_raw_msg(resp)
+ else
+ JetStream::API::RawStreamMsg.new(resp[:message])
+ end
+
+ msg
end
+ def get_last_msg(stream_name, subject, params={})
+ params[:subject] = subject
+ get_msg(stream_name, params)
+ end
+
+ def account_info
+ api_request("#{@prefix}.INFO")
+ end
+
private
def api_request(req_subject, req="", params={})
params[:timeout] ||= @opts[:timeout]
- result = begin
- msg = @nc.request(req_subject, req, **params)
+ msg = begin
+ @nc.request(req_subject, req, **params)
+ rescue NATS::IO::NoRespondersError
+ raise JetStream::Error::ServiceUnavailable
+ end
+
+ result = if params[:direct]
+ msg
+ else
JSON.parse(msg.data, symbolize_names: true)
- rescue NATS::IO::NoRespondersError
- raise JetStream::Error::ServiceUnavailable
end
- raise JS.from_error(result[:error]) if result[:error]
+ if result.is_a?(Hash) and result[:error]
+ raise JS.from_error(result[:error])
+ end
result
end
+
+ def _lift_msg_to_raw_msg(msg)
+ if msg.header and msg.header['Status']
+ status = msg.header['Status']
+ if status == '404'
+ raise ::NATS::JetStream::Error::NotFound.new
+ else
+ raise JS.from_msg(msg)
+ end
+ end
+ subject = msg.header['Nats-Subject']
+ seq = msg.header['Nats-Sequence']
+ raw_msg = JetStream::API::RawStreamMsg.new(
+ subject: subject,
+ seq: seq,
+ headers: msg.header,
+ )
+ raw_msg.data = msg.data
+
+ raw_msg
+ end
end
# PushSubscription is included into NATS::Subscription so that it
#
# @example Create a push subscription using JetStream context.
@@ -1105,11 +1187,11 @@
keyword_init: true) do
def initialize(opts={})
opts[:created] = Time.parse(opts[:created])
opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
opts[:delivered] = SequenceInfo.new(opts[:delivered])
- opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000
+ opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS
opts[:config] = ConsumerConfig.new(opts[:config])
opts.delete(:cluster)
# Filter unrecognized fields just in case.
rem = opts.keys - members
opts.delete_if { |k| rem.include?(k) }
@@ -1134,11 +1216,11 @@
# @return [String]
# @!attribute max_waiting
# @return [Integer]
# @!attribute max_ack_pending
# @return [Integer]
- ConsumerConfig = Struct.new(:durable_name, :description,
+ ConsumerConfig = Struct.new(:name, :durable_name, :description,
:deliver_policy, :opt_start_seq, :opt_start_time,
:ack_policy, :ack_wait, :max_deliver, :backoff,
:filter_subject, :replay_policy, :rate_limit_bps,
:sample_freq, :max_waiting, :max_ack_pending,
:flow_control, :idle_heartbeat, :headers_only,
@@ -1151,11 +1233,11 @@
:inactive_threshold,
# Generally inherited by parent stream and other markers,
# now can be configured directly.
:num_replicas,
# Force memory storage
- :memory_storage,
+ :mem_storage,
keyword_init: true) do
def initialize(opts={})
# Filter unrecognized fields just in case.
rem = opts.keys - members
opts.delete_if { |k| rem.include?(k) }
@@ -1205,15 +1287,36 @@
# @return [String]
# @!attribute num_replicas
# @return [Integer]
# @!attribute duplicate_window
# @return [Integer]
- StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
- :max_msgs, :max_bytes, :discard, :max_age,
- :max_msgs_per_subject, :max_msg_size,
- :storage, :num_replicas, :no_ack, :duplicate_window,
- :placement, :allow_direct,
+ StreamConfig = Struct.new(
+ :name,
+ :description,
+ :subjects,
+ :retention,
+ :max_consumers,
+ :max_msgs,
+ :max_bytes,
+ :discard,
+ :max_age,
+ :max_msgs_per_subject,
+ :max_msg_size,
+ :storage,
+ :num_replicas,
+ :no_ack,
+ :duplicate_window,
+ :placement,
+ :mirror,
+ :sources,
+ :sealed,
+ :deny_delete,
+ :deny_purge,
+ :allow_rollup_hdrs,
+ :republish,
+ :allow_direct,
+ :mirror_direct,
keyword_init: true) do
def initialize(opts={})
# Filter unrecognized fields just in case.
rem = opts.keys - members
opts.delete_if { |k| rem.include?(k) }
@@ -1242,10 +1345,10 @@
StreamInfo = Struct.new(:type, :config, :created, :state, :domain,
keyword_init: true) do
def initialize(opts={})
opts[:config] = StreamConfig.new(opts[:config])
opts[:state] = StreamState.new(opts[:state])
- opts[:created] = Time.parse(opts[:created])
+ opts[:created] = ::Time.parse(opts[:created])
# Filter fields and freeze.
rem = opts.keys - members
opts.delete_if { |k| rem.include?(k) }
super(opts)