lib/logstash/outputs/s3/file_repository.rb in logstash-output-s3-4.3.6 vs lib/logstash/outputs/s3/file_repository.rb in logstash-output-s3-4.3.7

- old
+ new

@@ -1,8 +1,8 @@ # encoding: utf-8 require "java" -require "concurrent" +require "concurrent/map" require "concurrent/timer_task" require "logstash/util" module LogStash module Outputs @@ -37,50 +37,52 @@ with_lock{ |factory| factory.current.delete! } end end class FactoryInitializer - include java.util.function.Function + def initialize(tags, encoding, temporary_directory, stale_time) @tags = tags @encoding = encoding @temporary_directory = temporary_directory @stale_time = stale_time end - def apply(prefix_key) + def create_value(prefix_key) PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) end + end def initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) # The path need to contains the prefix so when we start # logtash after a crash we keep the remote structure - @prefixed_factories = java.util.concurrent.ConcurrentHashMap.new + @prefixed_factories = Concurrent::Map.new @sweeper_interval = sweeper_interval @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time) start_stale_sweeper end def keys - @prefixed_factories.keySet + @prefixed_factories.keys end def each_files - @prefixed_factories.elements.each do |prefixed_file| + @prefixed_factories.values.each do |prefixed_file| prefixed_file.with_lock { |factory| yield factory.current } end end # Return the file factory def get_factory(prefix_key) - @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory } + prefix_val = @prefixed_factories.fetch_or_store(prefix_key) { @factory_initializer.create_value(prefix_key) } + prefix_val.with_lock { |factory| yield factory } end def get_file(prefix_key) get_factory(prefix_key) { |factory| yield factory.current } end @@ -93,19 +95,19 @@ @prefixed_factories.size end def remove_stale(k, v) if v.stale? - @prefixed_factories.remove(k, v) + @prefixed_factories.delete_pair(k, v) v.delete! end end def start_stale_sweeper @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do LogStash::Util.set_thread_name("S3, Stale factory sweeper") - @prefixed_factories.forEach{|k,v| remove_stale(k,v)} + @prefixed_factories.each { |k, v| remove_stale(k,v) } end @stale_sweeper.execute end