lib/fluent/buffer.rb in fluentd-0.10.35 vs lib/fluent/buffer.rb in fluentd-0.10.36

- old
+ new

@@ -14,290 +14,286 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent + class BufferError < StandardError + end + class BufferChunkLimitError < BufferError + end -class BufferError < StandardError -end + class BufferQueueLimitError < BufferError + end -class BufferChunkLimitError < BufferError -end -class BufferQueueLimitError < BufferError -end + class Buffer + include Configurable + def initialize + super + end -class Buffer - include Configurable + def configure(conf) + super + end - def initialize - super - end + def start + end - def configure(conf) - super - end + def shutdown + end - def start - end + def before_shutdown(out) + end - def shutdown - end + #def emit(key, data, chain) + #end - def before_shutdown(out) - end + #def keys + #end - #def emit(key, data, chain) - #end + #def push(key) + #end - #def keys - #end + #def pop(out) + #end - #def push(key) - #end + #def clear! + #end + end - #def pop(out) - #end - #def clear! - #end -end + class BufferChunk + include MonitorMixin + def initialize(key) + super() + @key = key + end -class BufferChunk - include MonitorMixin + attr_reader :key - def initialize(key) - super() - @key = key - end + #def <<(data) + #end - attr_reader :key + #def size + #end - #def <<(data) - #end + def empty? + size == 0 + end - #def size - #end + #def close + #end - def empty? - size == 0 - end + #def purge + #end - #def close - #end + #def read + #end - #def purge - #end + #def open + #end - #def read - #end + def write_to(io) + open {|i| + FileUtils.copy_stream(i, io) + } + end - #def open - #end - - def write_to(io) - open {|i| - FileUtils.copy_stream(i, io) - } + def msgpack_each(&block) + open {|io| + u = MessagePack::Unpacker.new(io) + begin + u.each(&block) + rescue EOFError + end + } + end end - def msgpack_each(&block) - open {|io| - u = MessagePack::Unpacker.new(io) - begin - u.each(&block) - rescue EOFError - end - } - end -end + class BasicBuffer < Buffer + include MonitorMixin -class BasicBuffer < Buffer - include MonitorMixin + def initialize + super + @parallel_pop = true + end - def initialize - super - @parallel_pop = true - end + def enable_parallel(b=true) + @parallel_pop = b + end - def enable_parallel(b=true) - @parallel_pop = b - end + # This configuration assumes plugins to send records to a remote server. + # Local file based plugins which should provide more reliability and efficiency + # should override buffer_chunk_limit with a larger size. + config_param :buffer_chunk_limit, :size, :default => 8*1024*1024 + config_param :buffer_queue_limit, :integer, :default => 256 - # This configuration assumes plugins to send records to a remote server. - # Local file based plugins which should provide more reliability and efficiency - # should override buffer_chunk_limit with a larger size. - config_param :buffer_chunk_limit, :size, :default => 8*1024*1024 - config_param :buffer_queue_limit, :integer, :default => 256 + alias chunk_limit buffer_chunk_limit + alias chunk_limit= buffer_chunk_limit= + alias queue_limit buffer_queue_limit + alias queue_limit= buffer_queue_limit= - alias chunk_limit buffer_chunk_limit - alias chunk_limit= buffer_chunk_limit= - alias queue_limit buffer_queue_limit - alias queue_limit= buffer_queue_limit= + def configure(conf) + super + end - def configure(conf) - super - end + def start + @queue, @map = resume + @queue.extend(MonitorMixin) + end - def start - @queue, @map = resume - @queue.extend(MonitorMixin) - end - - def shutdown - synchronize do - @queue.synchronize do - until @queue.empty? - @queue.shift.close + def shutdown + synchronize do + @queue.synchronize do + until @queue.empty? + @queue.shift.close + end end + @map.each_pair {|key,chunk| + chunk.close + } end - @map.each_pair {|key,chunk| - chunk.close - } end - end - def emit(key, data, chain) - key = key.to_s + def emit(key, data, chain) + key = key.to_s - synchronize do - top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id + synchronize do + top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id - if top.size + data.bytesize <= @buffer_chunk_limit - chain.next - top << data - return false + if top.size + data.bytesize <= @buffer_chunk_limit + chain.next + top << data + return false - ## FIXME - #elsif data.bytesize > @buffer_chunk_limit - # # TODO - # raise BufferChunkLimitError, "received data too large" + ## FIXME + #elsif data.bytesize > @buffer_chunk_limit + # # TODO + # raise BufferChunkLimitError, "received data too large" - elsif @queue.size >= @buffer_queue_limit - raise BufferQueueLimitError, "queue size exceeds limit" - end + elsif @queue.size >= @buffer_queue_limit + raise BufferQueueLimitError, "queue size exceeds limit" + end - if data.bytesize > @buffer_chunk_limit - $log.warn "Size of the emitted data exceeds buffer_chunk_limit." - $log.warn "This may occur problems in the output plugins ``at this server.``" - $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit" - $log.warn "in the forward output ``at the log forwarding server.``" - end + if data.bytesize > @buffer_chunk_limit + $log.warn "Size of the emitted data exceeds buffer_chunk_limit." + $log.warn "This may occur problems in the output plugins ``at this server.``" + $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit" + $log.warn "in the forward output ``at the log forwarding server.``" + end - nc = new_chunk(key) # TODO generate unique chunk id - ok = false + nc = new_chunk(key) # TODO generate unique chunk id + ok = false - begin - nc << data - chain.next + begin + nc << data + chain.next - flush_trigger = false - @queue.synchronize { - enqueue(top) - flush_trigger = @queue.empty? - @queue << top - @map[key] = nc - } + flush_trigger = false + @queue.synchronize { + enqueue(top) + flush_trigger = @queue.empty? + @queue << top + @map[key] = nc + } - ok = true - return flush_trigger - ensure - nc.purge unless ok - end + ok = true + return flush_trigger + ensure + nc.purge unless ok + end - end # synchronize - end + end # synchronize + end - def keys - @map.keys - end + def keys + @map.keys + end - def queue_size - @queue.size - end + def queue_size + @queue.size + end - def total_queued_chunk_size - total = 0 - @map.each_value {|c| - total += c.size - } - @queue.each {|c| - total += c.size - } - total - end + def total_queued_chunk_size + total = 0 + @map.each_value {|c| + total += c.size + } + @queue.each {|c| + total += c.size + } + total + end - #def new_chunk(key) - #end + #def new_chunk(key) + #end - #def resume - #end + #def resume + #end - #def enqueue(chunk) - #end + #def enqueue(chunk) + #end - def push(key) - synchronize do - top = @map[key] - if !top || top.empty? - return false - end + def push(key) + synchronize do + top = @map[key] + if !top || top.empty? + return false + end + @queue.synchronize do + enqueue(top) + @queue << top + @map.delete(key) + end + + return true + end # synchronize + end + + def pop(out) + chunk = nil @queue.synchronize do - enqueue(top) - @queue << top - @map.delete(key) + if @parallel_pop + chunk = @queue.find {|c| c.try_mon_enter } + return false unless chunk + else + chunk = @queue.first + return false unless chunk + return false unless chunk.try_mon_enter + end end - return true - end # synchronize - end + begin + if !chunk.empty? + write_chunk(chunk, out) + end - def pop(out) - chunk = nil - @queue.synchronize do - if @parallel_pop - chunk = @queue.find {|c| c.try_mon_enter } - return false unless chunk - else - chunk = @queue.first - return false unless chunk - return false unless chunk.try_mon_enter + @queue.delete_if {|c| + c.object_id == chunk.object_id + } + + chunk.purge + + return !@queue.empty? + ensure + chunk.mon_exit end end - begin - if !chunk.empty? - write_chunk(chunk, out) - end + def write_chunk(chunk, out) + out.write(chunk) + end - @queue.delete_if {|c| - c.object_id == chunk.object_id + def clear! + @queue.delete_if {|chunk| + chunk.purge + true } - - chunk.purge - - return !@queue.empty? - ensure - chunk.mon_exit end end - - def write_chunk(chunk, out) - out.write(chunk) - end - - def clear! - @queue.delete_if {|chunk| - chunk.purge - true - } - end -end - - end