lib/fluent/buffer.rb in fluentd-0.12.18 vs lib/fluent/buffer.rb in fluentd-0.12.19

- old
+ new

@@ -141,18 +141,24 @@ # should override buffer_chunk_limit with a larger size. desc 'The size of each buffer chunk.' config_param :buffer_chunk_limit, :size, :default => 8*1024*1024 desc 'The length limit of the chunk queue.' config_param :buffer_queue_limit, :integer, :default => 256 + desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.' + config_param :buffer_queue_full_action, :enum, :list => [:exception, :block], :default => :exception alias chunk_limit buffer_chunk_limit alias chunk_limit= buffer_chunk_limit= alias queue_limit buffer_queue_limit alias queue_limit= buffer_queue_limit= def configure(conf) super + + if @buffer_queue_full_action == :block + $log.warn "'block' action stops input process until the buffer full is resolved. Check your pipeline this action is fit or not" + end end def start @queue, @map = resume @queue.extend(MonitorMixin) @@ -177,19 +183,31 @@ def emit(key, data, chain) key = key.to_s synchronize do - # chunk unique id is generated in #new_chunk - chunk = (@map[key] ||= new_chunk(key)) + begin + # chunk unique id is generated in #new_chunk + chunk = (@map[key] ||= new_chunk(key)) - if storable?(chunk, data) - chain.next - chunk << data - return false + if storable?(chunk, data) + chain.next + chunk << data + return false - elsif @queue.size >= @buffer_queue_limit - raise BufferQueueLimitError, "queue size exceeds limit" + elsif @queue.size >= @buffer_queue_limit + raise BufferQueueLimitError, "queue size exceeds limit" + end + rescue BufferQueueLimitError => e + case @buffer_queue_full_action + when :exception + raise e + when :block + # This is rough implementation. New Buffer API should improve this routine by using wait/signal + $log.debug "buffer queue is full. Wait 1 second to re-emit events" + sleep 1 + retry + end end if data.bytesize > @buffer_chunk_limit $log.warn "Size of the emitted data exceeds buffer_chunk_limit." $log.warn "This may occur problems in the output plugins ``at this server.``"