lib/logstash/outputs/s3/file_repository.rb in logstash-output-s3-4.4.0 vs lib/logstash/outputs/s3/file_repository.rb in logstash-output-s3-4.4.1

- old
+ new

@@ -13,12 +13,13 @@ # Ensure that all access or work done # on a factory is threadsafe class PrefixedValue def initialize(file_factory, stale_time) @file_factory = file_factory - @lock = Mutex.new + @lock = Monitor.new @stale_time = stale_time + @is_deleted = false end def with_lock @lock.synchronize { yield @file_factory @@ -32,12 +33,19 @@ def apply(prefix) return self end def delete! - with_lock{ |factory| factory.current.delete! } + with_lock do |factory| + factory.current.delete! + @is_deleted = true + end end + + def deleted? + with_lock { |_| @is_deleted } + end end class FactoryInitializer def initialize(tags, encoding, temporary_directory, stale_time) @@ -70,21 +78,56 @@ def keys @prefixed_factories.keys end def each_files - @prefixed_factories.values.each do |prefixed_file| - prefixed_file.with_lock { |factory| yield factory.current } + each_factory(keys) do |factory| + yield factory.current end end - # Return the file factory + ## + # Yields the file factory while the current thread has exclusive access to it, creating a new + # one if one does not exist or if the current one is being reaped by the stale watcher. + # @param prefix_key [String]: the prefix key + # @yieldparam factory [TemporaryFileFactory]: a temporary file factory that this thread has exclusive access to + # @yieldreturn [Object]: a value to return; should NOT be the factory, which should be contained by the exclusive access scope. + # @return [Object]: the value returned by the provided block def get_factory(prefix_key) - prefix_val = @prefixed_factories.fetch_or_store(prefix_key) { @factory_initializer.create_value(prefix_key) } + + # fast-path: if factory exists and is not deleted, yield it with exclusive access and return + prefix_val = @prefixed_factories.get(prefix_key) + prefix_val&.with_lock do |factory| + # intentional local-jump to ensure deletion detection + # is done inside the exclusive access. + return yield(factory) unless prefix_val.deleted? + end + + # slow-path: + # the Concurrent::Map#get operation is lock-free, but may have returned an entry that was being deleted by + # another thread (such as via stale detection). If we failed to retrieve a value, or retrieved one that had + # been marked deleted, use the atomic Concurrent::Map#compute to retrieve a non-deleted entry. + prefix_val = @prefixed_factories.compute(prefix_key) do |existing| + existing && !existing.deleted? ? existing : @factory_initializer.create_value(prefix_key) + end prefix_val.with_lock { |factory| yield factory } end + ## + # Yields each non-deleted file factory while the current thread has exclusive access to it. + # @param prefixes [Array<String>]: the prefix keys + # @yieldparam factory [TemporaryFileFactory] + # @return [void] + def each_factory(prefixes) + prefixes.each do |prefix_key| + prefix_val = @prefixed_factories.get(prefix_key) + prefix_val&.with_lock do |factory| + yield factory unless prefix_val.deleted? + end + end + end + def get_file(prefix_key) get_factory(prefix_key) { |factory| yield factory.current } end def shutdown @@ -93,21 +136,34 @@ def size @prefixed_factories.size end - def remove_stale(k, v) - if v.stale? - @prefixed_factories.delete_pair(k, v) - v.delete! + def remove_if_stale(prefix_key) + # we use the ATOMIC `Concurrent::Map#compute_if_present` to atomically + # detect the staleness, mark a stale prefixed factory as deleted, and delete from the map. + @prefixed_factories.compute_if_present(prefix_key) do |prefixed_factory| + # once we have retrieved an instance, we acquire exclusive access to it + # for stale detection, marking it as deleted before releasing the lock + # and causing it to become deleted from the map. + prefixed_factory.with_lock do |_| + if prefixed_factory.stale? + prefixed_factory.delete! # mark deleted to prevent reuse + nil # cause deletion + else + prefixed_factory # keep existing + end + end end end def start_stale_sweeper @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do LogStash::Util.set_thread_name("S3, Stale factory sweeper") - @prefixed_factories.each { |k, v| remove_stale(k,v) } + @prefixed_factories.keys.each do |prefix| + remove_if_stale(prefix) + end end @stale_sweeper.execute end