lib/fluent/buffer.rb in fluentd-0.12.0.pre.1 vs lib/fluent/buffer.rb in fluentd-0.12.0.pre.2
- old
+ new
@@ -42,24 +42,29 @@
end
def before_shutdown(out)
end
- #def emit(key, data, chain)
- #end
+ def emit(key, data, chain)
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def keys
- #end
+ def keys
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def push(key)
- #end
+ def push(key)
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def pop(out)
- #end
+ def pop(out)
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def clear!
- #end
+ def clear!
+ raise NotImplementedError, "Implement this method in child class"
+ end
end
class BufferChunk
include MonitorMixin
@@ -69,31 +74,37 @@
@key = key
end
attr_reader :key
- #def <<(data)
- #end
+ def <<(data)
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def size
- #end
+ def size
+ raise NotImplementedError, "Implement this method in child class"
+ end
def empty?
size == 0
end
- #def close
- #end
+ def close
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def purge
- #end
+ def purge
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def read
- #end
+ def read
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def open
- #end
+ def open
+ raise NotImplementedError, "Implement this method in child class"
+ end
def write_to(io)
open {|i|
FileUtils.copy_stream(i, io)
}
@@ -114,10 +125,12 @@
class BasicBuffer < Buffer
include MonitorMixin
def initialize
super
+ @map = nil # chunks to store data
+ @queue = nil # chunks to be flushed
@parallel_pop = true
end
def enable_parallel(b=true)
@parallel_pop = b
@@ -162,49 +175,52 @@
def emit(key, data, chain)
key = key.to_s
synchronize do
- top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id
+ # chunk unique id is generated in #new_chunk
+ chunk = (@map[key] ||= new_chunk(key))
- if storable?(top, data)
+ if storable?(chunk, data)
chain.next
- top << data
+ chunk << data
return false
- ## FIXME
- #elsif data.bytesize > @buffer_chunk_limit
- # # TODO
- # raise BufferChunkLimitError, "received data too large"
-
elsif @queue.size >= @buffer_queue_limit
raise BufferQueueLimitError, "queue size exceeds limit"
end
if data.bytesize > @buffer_chunk_limit
$log.warn "Size of the emitted data exceeds buffer_chunk_limit."
$log.warn "This may occur problems in the output plugins ``at this server.``"
$log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
$log.warn "in the forward output ``at the log forwarding server.``"
+ ### TODO
+ # raise BufferChunkLimitError, "received data too large"
end
- nc = new_chunk(key) # TODO generate unique chunk id
+ # chunk unique id is generated in #new_chunk
+ nc = new_chunk(key)
ok = false
begin
nc << data
chain.next
flush_trigger = false
@queue.synchronize {
- enqueue(top)
+ enqueue(chunk) # this is buffer enqueue *hook*
flush_trigger = @queue.empty?
- @queue << top
+ @queue << chunk # actual enqueue
@map[key] = nc
}
ok = true
+ # false: queue have 1 or more chunks before this emit
+ # so this enqueue is not a trigger to flush
+ # true: queue have no chunks before this emit
+ # so this enqueue is a trigger to flush this buffer ASAP
return flush_trigger
ensure
nc.purge unless ok
end
@@ -228,36 +244,44 @@
total += c.size
}
total
end
- #def new_chunk(key)
- #end
+ def new_chunk(key)
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def resume
- #end
+ def resume
+ raise NotImplementedError, "Implement this method in child class"
+ end
- #def enqueue(chunk)
- #end
+ # enqueueing is done by #push
+ # this method is actually 'enqueue_hook'
+ def enqueue(chunk)
+ raise NotImplementedError, "Implement this method in child class"
+ end
+ # get the chunk specified by key, and push it into queue
def push(key)
synchronize do
- top = @map[key]
- if !top || top.empty?
+ chunk = @map[key]
+ if !chunk || chunk.empty?
return false
end
@queue.synchronize do
- enqueue(top)
- @queue << top
+ enqueue(chunk)
+ @queue << chunk
@map.delete(key)
end
return true
end # synchronize
end
+ # shift a chunk from queue, write and purge it
+ # returns boolean to indicate whether this buffer have more chunk to be flushed or not
def pop(out)
chunk = nil
@queue.synchronize do
if @parallel_pop
chunk = @queue.find {|c| c.try_mon_enter }
@@ -268,24 +292,27 @@
return false unless chunk.try_mon_enter
end
end
begin
+ # #push(key) does not push empty chunks into queue.
+ # so this check is nonsense...
if !chunk.empty?
write_chunk(chunk, out)
end
- empty = false
+ queue_empty = false
@queue.synchronize do
@queue.delete_if {|c|
c.object_id == chunk.object_id
}
- empty = @queue.empty?
+ queue_empty = @queue.empty?
end
chunk.purge
- return !empty
+ # return to be flushed once more immediately, or not
+ return !queue_empty
ensure
chunk.mon_exit
end
end