lib/ldclient-rb/stream.rb in ldclient-rb-2.5.0 vs lib/ldclient-rb/stream.rb in ldclient-rb-3.0.0

- old
+ new

@@ -8,15 +8,20 @@ DELETE = :delete INDIRECT_PUT = :'indirect/put' INDIRECT_PATCH = :'indirect/patch' READ_TIMEOUT_SECONDS = 300 # 5 minutes; the stream should send a ping every 3 minutes + KEY_PATHS = { + FEATURES => "/flags/", + SEGMENTS => "/segments/" + } + class StreamProcessor def initialize(sdk_key, config, requestor) @sdk_key = sdk_key @config = config - @store = config.feature_store + @feature_store = config.feature_store @requestor = requestor @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) end @@ -34,11 +39,11 @@ { 'Authorization' => @sdk_key, 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION } opts = {:headers => headers, :with_credentials => true, :proxy => @config.proxy, :read_timeout => READ_TIMEOUT_SECONDS} - @es = Celluloid::EventSource.new(@config.stream_uri + "/flags", opts) do |conn| + @es = Celluloid::EventSource.new(@config.stream_uri + "/all", opts) do |conn| conn.on(PUT) { |message| process_message(message, PUT) } conn.on(PATCH) { |message| process_message(message, PATCH) } conn.on(DELETE) { |message| process_message(message, DELETE) } conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) } conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) } @@ -64,32 +69,63 @@ @es.close @config.logger.info("[LDClient] Stream connection stopped") end end + private + def process_message(message, method) @config.logger.debug("[LDClient] Stream received #{method} message: #{message.data}") if method == PUT message = JSON.parse(message.data, symbolize_names: true) - @store.init(message) + @feature_store.init({ + FEATURES => message[:data][:flags], + SEGMENTS => message[:data][:segments] + }) @initialized.make_true @config.logger.info("[LDClient] Stream initialized") elsif method == PATCH message = JSON.parse(message.data, symbolize_names: true) - @store.upsert(message[:path][1..-1], message[:data]) + for kind in [FEATURES, SEGMENTS] + key = key_for_path(kind, message[:path]) + if key + @feature_store.upsert(kind, message[:data]) + break + end + end elsif method == DELETE message = JSON.parse(message.data, symbolize_names: true) - @store.delete(message[:path][1..-1], message[:version]) + for kind in [FEATURES, SEGMENTS] + key = key_for_path(kind, message[:path]) + if key + @feature_store.delete(kind, key, message[:version]) + break + end + end elsif method == INDIRECT_PUT - @store.init(@requestor.request_all_flags) + all_data = @requestor.request_all_data + @feature_store.init({ + FEATURES => all_data[:flags], + SEGMENTS => all_data[:segments] + }) @initialized.make_true @config.logger.info("[LDClient] Stream initialized (via indirect message)") elsif method == INDIRECT_PATCH - @store.upsert(message.data, @requestor.request_flag(message.data)) + key = feature_key_for_path(message.data) + if key + @feature_store.upsert(FEATURES, @requestor.request_flag(key)) + else + key = segment_key_for_path(message.data) + if key + @feature_store.upsert(SEGMENTS, key, @requestor.request_segment(key)) + end + end else @config.logger.warn("[LDClient] Unknown message received: #{method}") end end - private :process_message + def key_for_path(kind, path) + path.start_with?(KEY_PATHS[kind]) ? path[KEY_PATHS[kind].length..-1] : nil + end end end