#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/buffer'
require 'fluent/plugin/compressable'
require 'fluent/unique_id'
require 'fluent/event'
require 'fluent/ext_monitor_require'

require 'stringio'
require 'tempfile'
require 'zlib'

module Fluent
  module Plugin
    class Buffer # fluent/plugin/buffer is already loaded
      class Chunk
        include MonitorMixin
        include UniqueId::Mixin

        # A chunk has 2 parts:
        # * metadata: contains metadata which should be restored after resume (if possible)
        #   v: {key=>value,key=>value,...} (optional)
        #   t: tag as string (optional)
        #   k: time slice key (optional)
        #
        #   id: unique_id of chunk (*)
        #   s: size (number of events in chunk) (*)
        #   c: created_at as unix time (*)
        #   m: modified_at as unix time (*)
        #   (*): fields automatically injected by chunk itself
        # * data: binary data, combined records represented as String, maybe compressed

        # NOTE: keys of metadata are named with a single letter
        #       to decrease the bytesize of metadata I/O

        # TODO: CompressedPackedMessage of forward protocol?

        # Base class for buffer chunks. Storage-specific behaviour (concat, commit,
        # rollback, bytesize, size, read, open) must be implemented by subclasses.
        #
        # @param metadata [Object] metadata this chunk belongs to; stored as-is
        # @param compress [Symbol] :text (default) or :gzip. With :gzip this instance
        #   is extended with Decompressable, which adds gzip handling to
        #   append/read/open/write_to.
        def initialize(metadata, compress: :text)
          # Empty parens: run ancestor initializers (e.g. MonitorMixin) without
          # forwarding this method's arguments to them.
          super()
          @unique_id = generate_unique_id
          @metadata = metadata

          # state: unstaged/staged/queued/closed
          @state = :unstaged

          @size = 0
          @created_at = Fluent::Clock.real_now
          @modified_at = Fluent::Clock.real_now

          extend Decompressable if compress == :gzip
        end

        attr_reader :unique_id, :metadata, :state

        # Raw creation timestamp, as returned by Fluent::Clock.real_now.
        # NOTE: the name lacks the "d" of "created"; kept as-is for compatibility.
        def raw_create_at
          @created_at
        end

        # Raw modification timestamp, as returned by Fluent::Clock.real_now.
        def raw_modified_at
          @modified_at
        end

        # for compatibility: creation time as a Time object.
        # The Time is cached, but rebuilt whenever @created_at has been reassigned
        # since it was built, so it never reports a stale value.
        def created_at
          if @created_at_object.nil? || @created_at_object_source != @created_at
            @created_at_object_source = @created_at
            @created_at_object = Time.at(@created_at)
          end
          @created_at_object
        end

        # for compatibility: modification time as a Time object.
        # The Time is cached, but rebuilt whenever @modified_at has been reassigned
        # (e.g. by a subclass after writing data) since it was built.
        def modified_at
          if @modified_at_object.nil? || @modified_at_object_source != @modified_at
            @modified_at_object_source = @modified_at
            @modified_at_object = Time.at(@modified_at)
          end
          @modified_at_object
        end

        # Appends formatted records as a single bulk via #concat.
        #
        # @param data [Array<String>] formatted record strings
        # @param kwargs [Hash] only `compress:` is inspected
        # @raise [ArgumentError] if `compress: :gzip` is given to a chunk that was
        #   not created with `compress: :gzip` (Decompressable handles that case)
        def append(data, **kwargs)
          raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
          adding = ''.b
          data.each do |d|
            # binary copies let records of different encodings be joined safely
            adding << d.b
          end
          concat(adding, data.size)
        end

        # for event streams which is packed or zipped (and we want not to unpack/uncompress)
        def concat(bulk, records)
          raise NotImplementedError, "Implement this method in child class"
        end

        def commit
          raise NotImplementedError, "Implement this method in child class"
        end

        def rollback
          raise NotImplementedError, "Implement this method in child class"
        end

        def bytesize
          raise NotImplementedError, "Implement this method in child class"
        end

        def size
          raise NotImplementedError, "Implement this method in child class"
        end
        alias :length :size

        def empty?
          size == 0
        end

        # A chunk accepts writes until it is queued or closed.
        def writable?
          @state == :staged || @state == :unstaged
        end

        def unstaged?
          @state == :unstaged
        end

        def staged?
          @state == :staged
        end

        def queued?
          @state == :queued
        end

        def closed?
          @state == :closed
        end

        # State transitions; each returns self so calls can be chained.
        def staged!
          @state = :staged
          self
        end

        def unstaged!
          @state = :unstaged
          self
        end

        def enqueued!
          @state = :queued
          self
        end

        def close
          @state = :closed
          self
        end

        def purge
          @state = :closed
          self
        end

        # @raise [ArgumentError] if `compressed: :gzip` is given to a non-gzip chunk
        def read(**kwargs)
          raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
          raise NotImplementedError, "Implement this method in child class"
        end

        # @raise [ArgumentError] if `compressed: :gzip` is given to a non-gzip chunk
        def open(**kwargs, &block)
          raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
          raise NotImplementedError, "Implement this method in child class"
        end

        # Copies the chunk's data into +io+ using the IO yielded by #open.
        # @raise [ArgumentError] if `compressed: :gzip` is given to a non-gzip chunk
        def write_to(io, **kwargs)
          raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
          open do |i|
            IO.copy_stream(i, io)
          end
        end

        # Mixed into chunks created with `compress: :gzip`.
        # Data appended with `compress: :gzip` is stored gzip-compressed.
        # read/open/write_to decompress it unless called with `compressed: :gzip`,
        # in which case the stored (compressed) bytes are used as-is.
        # NOTE(review): callers are presumably expected to always append with
        # `compress: :gzip` on such chunks; plain appends fall through to super.
        module Decompressable
          include Fluent::Plugin::Compressable

          def append(data, **kwargs)
            if kwargs[:compress] == :gzip
              # gzip output is binary, so back the buffer with a binary string
              io = StringIO.new(''.b)
              Zlib::GzipWriter.wrap(io) do |gz|
                data.each do |d|
                  gz.write d
                end
              end
              concat(io.string, data.size)
            else
              super
            end
          end

          # Yields an IO positioned at the start of the decompressed data (or the
          # raw compressed data with `compressed: :gzip`). Decompressed data goes
          # into a StringIO for in-memory chunks, otherwise into a Tempfile that
          # is closed and unlinked once the block finishes, even on error.
          def open(**kwargs, &block)
            if kwargs[:compressed] == :gzip
              super
            else
              super(**kwargs) do |chunk_io|
                output_io = if chunk_io.is_a?(StringIO)
                              StringIO.new
                            else
                              Tempfile.new('decompressed-data')
                            end
                begin
                  output_io.binmode if output_io.is_a?(Tempfile)
                  decompress(input_io: chunk_io, output_io: output_io)
                  output_io.seek(0, IO::SEEK_SET)
                  yield output_io
                ensure
                  # close! also deletes the temporary file instead of leaving it
                  # on disk until garbage collection
                  output_io.close! if output_io.is_a?(Tempfile)
                end
              end
            end
          end

          def read(**kwargs)
            if kwargs[:compressed] == :gzip
              super
            else
              decompress(super)
            end
          end

          def write_to(io, **kwargs)
            open(compressed: :gzip) do |chunk_io|
              if kwargs[:compressed] == :gzip
                IO.copy_stream(chunk_io, io)
              else
                decompress(input_io: chunk_io, output_io: io)
              end
            end
          end
        end
      end
    end
  end
end