lib/ldclient-rb/stream.rb in ldclient-rb-0.4.0 vs lib/ldclient-rb/stream.rb in ldclient-rb-0.5.0

- old
+ new

@@ -1,69 +1,68 @@ -require 'concurrent/atomics' -require 'json' -require 'ld-em-eventsource' +require "concurrent/atomics" +require "json" +require "ld-em-eventsource" module LaunchDarkly - PUT = "put" PATCH = "patch" DELETE = "delete" class InMemoryFeatureStore - def initialize() + def initialize @features = Hash.new @lock = Concurrent::ReadWriteLock.new @initialized = Concurrent::AtomicBoolean.new(false) end def get(key) - @lock.with_read_lock { + @lock.with_read_lock do f = @features[key.to_sym] (f.nil? || f[:deleted]) ? nil : f - } + end end - def all() - @lock.with_read_lock { - @features.select {|k,f| not f[:deleted]} - } + def all + @lock.with_read_lock do + @features.select { |_k, f| not f[:deleted] } + end end def delete(key, version) - @lock.with_write_lock { + @lock.with_write_lock do old = @features[key.to_sym] - if old != nil and old[:version] < version + if !old.nil? && old[:version] < version old[:deleted] = true old[:version] = version @features[key.to_sym] = old elsif old.nil? - @features[key.to_sym] = {:deleted => true, :version => version} + @features[key.to_sym] = { deleted: true, version: version } end - } + end end def init(fs) - @lock.with_write_lock { + @lock.with_write_lock do @features.replace(fs) @initialized.make_true - } + end end def upsert(key, feature) - @lock.with_write_lock { + @lock.with_write_lock do old = @features[key.to_sym] - if old.nil? or old[:version] < feature[:version] + if old.nil? || old[:version] < feature[:version] @features[key.to_sym] = feature end - } + end end - def initialized?() + def initialized? @initialized.value - end + end end class StreamProcessor def initialize(api_key, config) @api_key = api_key @@ -71,96 +70,97 @@ @store = config.feature_store ? config.feature_store : InMemoryFeatureStore.new @disconnected = Concurrent::AtomicReference.new(nil) @started = Concurrent::AtomicBoolean.new(false) end - def initialized?() + def initialized? @store.initialized? end - def started?() + def started? @started.value end def get_feature(key) if not initialized? throw :uninitialized end @store.get(key) end - def start_reactor() + def start_reactor if defined?(Thin) - @config.logger.debug("Running in a Thin environment-- not starting EventMachine") + @config.logger.debug("Running in a Thin environment-- not starting EventMachine") elsif EM.reactor_running? @config.logger.debug("EventMachine already running") else @config.logger.debug("Starting EventMachine") Thread.new { EM.run {} } Thread.pass until EM.reactor_running? end EM.reactor_running? end - def start() + def start # Try to start the reactor. If it's not started, we shouldn't start # the stream processor - if not start_reactor - return - end + return if not start_reactor # If someone else booted the stream processor connection, just return - if not @started.make_true - return - end + return unless @started.make_true # If we're the first and only thread to set started, boot # the stream processor connection EM.defer do - source = EM::EventSource.new(@config.stream_uri + "/features", - {}, - {'Accept' => 'text/event-stream', - 'Authorization' => 'api_key ' + @api_key, - 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION}) - source.on PUT do |message| - features = JSON.parse(message, :symbolize_names => true) - @store.init(features) - set_connected - end - source.on PATCH do |message| - json = JSON.parse(message, :symbolize_names => true) - @store.upsert(json[:path][1..-1], json[:data]) - set_connected - end - source.on DELETE do |message| - json = JSON.parse(message, :symbolize_names => true) - @store.delete(json[:path][1..-1], json[:version]) - set_connected - end - source.error do |error| - @config.logger.error("[LDClient] Error subscribing to stream API: #{error}") - set_disconnected - end - source.inactivity_timeout = 0 - source.start + boot_event_manager end end - def set_disconnected() + def boot_event_manager + source = EM::EventSource.new(@config.stream_uri + "/features", + {}, + "Accept" => "text/event-stream", + "Authorization" => "api_key " + @api_key, + "User-Agent" => "RubyClient/" + LaunchDarkly::VERSION) + source.on(PUT) { |message| process_message(message, PUT) } + source.on(PATCH) { |message| process_message(message, PATCH) } + source.on(DELETE) { |message| process_message(message, DELETE) } + source.error do |error| + @config.logger.info("[LDClient] Error subscribing to stream API: #{error}") + set_disconnected + end + source.inactivity_timeout = 0 + source.start + source + end + + def process_message(message, method) + message = JSON.parse(message, symbolize_names: true) + if method == PUT + @store.init(message) + elsif method == PATCH + @store.upsert(message[:path][1..-1], message[:data]) + elsif method == DELETE + @store.delete(message[:path][1..-1], message[:version]) + else + @config.logger.error("[LDClient] Unknown message received: #{method}") + end + set_connected + end + + def set_disconnected @disconnected.set(Time.now) end - def set_connected() + def set_connected @disconnected.set(nil) end - def should_fallback_update() + def should_fallback_update disc = @disconnected.get - disc != nil and disc < (Time.now - 120) + !disc.nil? && disc < (Time.now - 120) end # TODO mark private methods - private :set_connected, :set_disconnected, :start_reactor - + private :boot_event_manager, :process_message, :set_connected, :set_disconnected, :start_reactor end - -end \ No newline at end of file +end