lib/fluent/buffer.rb in fluentd-0.12.0.pre.1 vs lib/fluent/buffer.rb in fluentd-0.12.0.pre.2

- old
+ new

@@ -42,24 +42,29 @@ end def before_shutdown(out) end - #def emit(key, data, chain) - #end + def emit(key, data, chain) + raise NotImplementedError, "Implement this method in child class" + end - #def keys - #end + def keys + raise NotImplementedError, "Implement this method in child class" + end - #def push(key) - #end + def push(key) + raise NotImplementedError, "Implement this method in child class" + end - #def pop(out) - #end + def pop(out) + raise NotImplementedError, "Implement this method in child class" + end - #def clear! - #end + def clear! + raise NotImplementedError, "Implement this method in child class" + end end class BufferChunk include MonitorMixin @@ -69,31 +74,37 @@ @key = key end attr_reader :key - #def <<(data) - #end + def <<(data) + raise NotImplementedError, "Implement this method in child class" + end - #def size - #end + def size + raise NotImplementedError, "Implement this method in child class" + end def empty? size == 0 end - #def close - #end + def close + raise NotImplementedError, "Implement this method in child class" + end - #def purge - #end + def purge + raise NotImplementedError, "Implement this method in child class" + end - #def read - #end + def read + raise NotImplementedError, "Implement this method in child class" + end - #def open - #end + def open + raise NotImplementedError, "Implement this method in child class" + end def write_to(io) open {|i| FileUtils.copy_stream(i, io) } @@ -114,10 +125,12 @@ class BasicBuffer < Buffer include MonitorMixin def initialize super + @map = nil # chunks to store data + @queue = nil # chunks to be flushed @parallel_pop = true end def enable_parallel(b=true) @parallel_pop = b @@ -162,49 +175,52 @@ def emit(key, data, chain) key = key.to_s synchronize do - top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id + # chunk unique id is generated in #new_chunk + chunk = (@map[key] ||= new_chunk(key)) - if storable?(top, data) + if storable?(chunk, data) chain.next - top << data + chunk << data return false - ## FIXME - #elsif data.bytesize > @buffer_chunk_limit - # # TODO - # raise BufferChunkLimitError, "received data too large" - elsif @queue.size >= @buffer_queue_limit raise BufferQueueLimitError, "queue size exceeds limit" end if data.bytesize > @buffer_chunk_limit $log.warn "Size of the emitted data exceeds buffer_chunk_limit." $log.warn "This may occur problems in the output plugins ``at this server.``" $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit" $log.warn "in the forward output ``at the log forwarding server.``" + ### TODO + # raise BufferChunkLimitError, "received data too large" end - nc = new_chunk(key) # TODO generate unique chunk id + # chunk unique id is generated in #new_chunk + nc = new_chunk(key) ok = false begin nc << data chain.next flush_trigger = false @queue.synchronize { - enqueue(top) + enqueue(chunk) # this is buffer enqueue *hook* flush_trigger = @queue.empty? - @queue << top + @queue << chunk # actual enqueue @map[key] = nc } ok = true + # false: queue have 1 or more chunks before this emit + # so this enqueue is not a trigger to flush + # true: queue have no chunks before this emit + # so this enqueue is a trigger to flush this buffer ASAP return flush_trigger ensure nc.purge unless ok end @@ -228,36 +244,44 @@ total += c.size } total end - #def new_chunk(key) - #end + def new_chunk(key) + raise NotImplementedError, "Implement this method in child class" + end - #def resume - #end + def resume + raise NotImplementedError, "Implement this method in child class" + end - #def enqueue(chunk) - #end + # enqueueing is done by #push + # this method is actually 'enqueue_hook' + def enqueue(chunk) + raise NotImplementedError, "Implement this method in child class" + end + # get the chunk specified by key, and push it into queue def push(key) synchronize do - top = @map[key] - if !top || top.empty? + chunk = @map[key] + if !chunk || chunk.empty? return false end @queue.synchronize do - enqueue(top) - @queue << top + enqueue(chunk) + @queue << chunk @map.delete(key) end return true end # synchronize end + # shift a chunk from queue, write and purge it + # returns boolean to indicate whether this buffer have more chunk to be flushed or not def pop(out) chunk = nil @queue.synchronize do if @parallel_pop chunk = @queue.find {|c| c.try_mon_enter } @@ -268,24 +292,27 @@ return false unless chunk.try_mon_enter end end begin + # #push(key) does not push empty chunks into queue. + # so this check is nonsense... if !chunk.empty? write_chunk(chunk, out) end - empty = false + queue_empty = false @queue.synchronize do @queue.delete_if {|c| c.object_id == chunk.object_id } - empty = @queue.empty? + queue_empty = @queue.empty? end chunk.purge - return !empty + # return to be flushed once more immediately, or not + return !queue_empty ensure chunk.mon_exit end end