lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc6 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc7
- old
+ new
@@ -158,10 +158,19 @@
log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
@storage_class = "REDUCED_REDUNDANCY"
end
@s3_object_key_format = process_s3_object_key_format
+ if !@check_object
+ if conf.has_key?('s3_object_key_format')
+ log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
+ else
+ log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{date_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
+ @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
+ end
+ end
+
# For backward compatibility
# TODO: Remove time_slice_format when end of support compat_parameters
@configured_time_slice_format = conf['time_slice_format']
@values_for_s3_object_chunk = {}
@time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
@@ -190,14 +199,10 @@
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
- if !@check_object
- @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
- end
-
super
end
def format(tag, time, record)
r = inject_values_to_record(tag, time, record)
@@ -251,11 +256,11 @@
@values_for_s3_object_chunk[chunk.unique_id] ||= {
"%{hex_random}" => hex_random(chunk),
}
values_for_s3_object_key = {
"%{path}" => @path,
- "%{time_slice}" => time_slice,
+ "%{date_slice}" => time_slice,
"%{file_extension}" => @compressor.ext,
"%{hms_slice}" => hms_slicer,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
@@ -266,11 +271,11 @@
tmp = Tempfile.new("s3-")
tmp.binmode
begin
@compressor.compress(chunk, tmp)
tmp.rewind
- log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }
+ log.debug "out_s3: write chunk with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"
put_options = {
body: tmp,
content_type: @compressor.content_type,
storage_class: @storage_class,
@@ -292,10 +297,10 @@
@values_for_s3_object_chunk.delete(chunk.unique_id)
if @warn_for_delay
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
- log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
+ log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
end
end
ensure
tmp.close(true) rescue nil
end