lib/fluent/buffer.rb in fluentd-0.12.18 vs lib/fluent/buffer.rb in fluentd-0.12.19
- old
+ new
@@ -141,18 +141,24 @@
# should override buffer_chunk_limit with a larger size.
desc 'The size of each buffer chunk.'
config_param :buffer_chunk_limit, :size, :default => 8*1024*1024
desc 'The length limit of the chunk queue.'
config_param :buffer_queue_limit, :integer, :default => 256
+ desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.'
+ config_param :buffer_queue_full_action, :enum, :list => [:exception, :block], :default => :exception
alias chunk_limit buffer_chunk_limit
alias chunk_limit= buffer_chunk_limit=
alias queue_limit buffer_queue_limit
alias queue_limit= buffer_queue_limit=
def configure(conf)
super
+
+ if @buffer_queue_full_action == :block
+ $log.warn "'block' action stops input process until the buffer full is resolved. Check your pipeline this action is fit or not"
+ end
end
def start
@queue, @map = resume
@queue.extend(MonitorMixin)
@@ -177,19 +183,31 @@
def emit(key, data, chain)
key = key.to_s
synchronize do
- # chunk unique id is generated in #new_chunk
- chunk = (@map[key] ||= new_chunk(key))
+ begin
+ # chunk unique id is generated in #new_chunk
+ chunk = (@map[key] ||= new_chunk(key))
- if storable?(chunk, data)
- chain.next
- chunk << data
- return false
+ if storable?(chunk, data)
+ chain.next
+ chunk << data
+ return false
- elsif @queue.size >= @buffer_queue_limit
- raise BufferQueueLimitError, "queue size exceeds limit"
+ elsif @queue.size >= @buffer_queue_limit
+ raise BufferQueueLimitError, "queue size exceeds limit"
+ end
+ rescue BufferQueueLimitError => e
+ case @buffer_queue_full_action
+ when :exception
+ raise e
+ when :block
+ # This is rough implementation. New Buffer API should improve this routine by using wait/signal
+ $log.debug "buffer queue is full. Wait 1 second to re-emit events"
+ sleep 1
+ retry
+ end
end
if data.bytesize > @buffer_chunk_limit
$log.warn "Size of the emitted data exceeds buffer_chunk_limit."
$log.warn "This may occur problems in the output plugins ``at this server.``"