lib/logstash/outputs/s3/file_repository.rb in logstash-output-s3-4.3.6 vs lib/logstash/outputs/s3/file_repository.rb in logstash-output-s3-4.3.7
- old
+ new
@@ -1,8 +1,8 @@
# encoding: utf-8
require "java"
-require "concurrent"
+require "concurrent/map"
require "concurrent/timer_task"
require "logstash/util"
module LogStash
module Outputs
@@ -37,50 +37,52 @@
with_lock{ |factory| factory.current.delete! }
end
end
class FactoryInitializer
- include java.util.function.Function
+
def initialize(tags, encoding, temporary_directory, stale_time)
@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@stale_time = stale_time
end
- def apply(prefix_key)
+ def create_value(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
end
+
end
def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
# The path need to contains the prefix so when we start
# logtash after a crash we keep the remote structure
- @prefixed_factories = java.util.concurrent.ConcurrentHashMap.new
+ @prefixed_factories = Concurrent::Map.new
@sweeper_interval = sweeper_interval
@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)
start_stale_sweeper
end
def keys
- @prefixed_factories.keySet
+ @prefixed_factories.keys
end
def each_files
- @prefixed_factories.elements.each do |prefixed_file|
+ @prefixed_factories.values.each do |prefixed_file|
prefixed_file.with_lock { |factory| yield factory.current }
end
end
# Return the file factory
def get_factory(prefix_key)
- @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
+ prefix_val = @prefixed_factories.fetch_or_store(prefix_key) { @factory_initializer.create_value(prefix_key) }
+ prefix_val.with_lock { |factory| yield factory }
end
def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
end
@@ -93,19 +95,19 @@
@prefixed_factories.size
end
def remove_stale(k, v)
if v.stale?
- @prefixed_factories.remove(k, v)
+ @prefixed_factories.delete_pair(k, v)
v.delete!
end
end
def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")
- @prefixed_factories.forEach{|k,v| remove_stale(k,v)}
+ @prefixed_factories.each { |k, v| remove_stale(k,v) }
end
@stale_sweeper.execute
end