lib/ldclient-rb/stream.rb in ldclient-rb-2.5.0 vs lib/ldclient-rb/stream.rb in ldclient-rb-3.0.0
- old
+ new
@@ -8,15 +8,20 @@
DELETE = :delete
INDIRECT_PUT = :'indirect/put'
INDIRECT_PATCH = :'indirect/patch'
READ_TIMEOUT_SECONDS = 300 # 5 minutes; the stream should send a ping every 3 minutes
+ KEY_PATHS = {
+ FEATURES => "/flags/",
+ SEGMENTS => "/segments/"
+ }
+
class StreamProcessor
def initialize(sdk_key, config, requestor)
@sdk_key = sdk_key
@config = config
- @store = config.feature_store
+ @feature_store = config.feature_store
@requestor = requestor
@initialized = Concurrent::AtomicBoolean.new(false)
@started = Concurrent::AtomicBoolean.new(false)
@stopped = Concurrent::AtomicBoolean.new(false)
end
@@ -34,11 +39,11 @@
{
'Authorization' => @sdk_key,
'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION
}
opts = {:headers => headers, :with_credentials => true, :proxy => @config.proxy, :read_timeout => READ_TIMEOUT_SECONDS}
- @es = Celluloid::EventSource.new(@config.stream_uri + "/flags", opts) do |conn|
+ @es = Celluloid::EventSource.new(@config.stream_uri + "/all", opts) do |conn|
conn.on(PUT) { |message| process_message(message, PUT) }
conn.on(PATCH) { |message| process_message(message, PATCH) }
conn.on(DELETE) { |message| process_message(message, DELETE) }
conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) }
conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) }
@@ -64,32 +69,63 @@
@es.close
@config.logger.info("[LDClient] Stream connection stopped")
end
end
+ private
+
def process_message(message, method)
@config.logger.debug("[LDClient] Stream received #{method} message: #{message.data}")
if method == PUT
message = JSON.parse(message.data, symbolize_names: true)
- @store.init(message)
+ @feature_store.init({
+ FEATURES => message[:data][:flags],
+ SEGMENTS => message[:data][:segments]
+ })
@initialized.make_true
@config.logger.info("[LDClient] Stream initialized")
elsif method == PATCH
message = JSON.parse(message.data, symbolize_names: true)
- @store.upsert(message[:path][1..-1], message[:data])
+ for kind in [FEATURES, SEGMENTS]
+ key = key_for_path(kind, message[:path])
+ if key
+ @feature_store.upsert(kind, message[:data])
+ break
+ end
+ end
elsif method == DELETE
message = JSON.parse(message.data, symbolize_names: true)
- @store.delete(message[:path][1..-1], message[:version])
+ for kind in [FEATURES, SEGMENTS]
+ key = key_for_path(kind, message[:path])
+ if key
+ @feature_store.delete(kind, key, message[:version])
+ break
+ end
+ end
elsif method == INDIRECT_PUT
- @store.init(@requestor.request_all_flags)
+ all_data = @requestor.request_all_data
+ @feature_store.init({
+ FEATURES => all_data[:flags],
+ SEGMENTS => all_data[:segments]
+ })
@initialized.make_true
@config.logger.info("[LDClient] Stream initialized (via indirect message)")
elsif method == INDIRECT_PATCH
- @store.upsert(message.data, @requestor.request_flag(message.data))
+ key = feature_key_for_path(message.data)
+ if key
+ @feature_store.upsert(FEATURES, @requestor.request_flag(key))
+ else
+ key = segment_key_for_path(message.data)
+ if key
+ @feature_store.upsert(SEGMENTS, key, @requestor.request_segment(key))
+ end
+ end
else
@config.logger.warn("[LDClient] Unknown message received: #{method}")
end
end
- private :process_message
+ def key_for_path(kind, path)
+ path.start_with?(KEY_PATHS[kind]) ? path[KEY_PATHS[kind].length..-1] : nil
+ end
end
end