Sha256: 0e05febb1a8502fbebcc721f82a811ef0a4e5393d5070473fb4a59184a80cc42

Contents?: true

Size: 1.94 KB

Versions: 9

Compression:

Stored size: 1.94 KB

Contents

#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
module Fluent


class MemoryBufferChunk < BufferChunk
  def initialize(key, data='')
    @data = data
    @data.force_encoding('ASCII-8BIT')
    now = Time.now.utc
    u1 = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff))
    @unique_id = [u1 >> 32, u1 & u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN')
    super(key)
  end

  attr_reader :unique_id

  def <<(data)
    data.force_encoding('ASCII-8BIT')
    @data << data
  end

  def size
    @data.bytesize
  end

  def close
  end

  def purge
  end

  def read
    @data
  end

  def open(&block)
    StringIO.open(@data, &block)
  end

  # optimize
  def write_to(io)
    io.write @data
  end

  # optimize
  def msgpack_each(&block)
    u = MessagePack::Unpacker.new
    u.feed_each(@data, &block)
  end
end


class MemoryBuffer < BasicBuffer
  Plugin.register_buffer('memory', self)

  def initialize
    super
  end

  # Overwrite default BasicBuffer#buffer_queue_limit
  # to limit total memory usage upto 512MB.
  config_set_default :buffer_queue_limit, 64

  def configure(conf)
    super
  end

  def before_shutdown(out)
    synchronize do
      @map.each_key {|key|
        push(key)
      }
      while pop(out)
      end
    end
  end

  def new_chunk(key)
    MemoryBufferChunk.new(key)
  end

  def resume
    return [], {}
  end

  def enqueue(chunk)
  end
end


end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
fluentd-0.10.35 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.34 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.33 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.32 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.31 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.30 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.29 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.28 lib/fluent/plugin/buf_memory.rb
fluentd-0.10.27 lib/fluent/plugin/buf_memory.rb