lib/fluent/buffer.rb in fluentd-0.10.35 vs lib/fluent/buffer.rb in fluentd-0.10.36
- old
+ new
@@ -14,290 +14,286 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
+ class BufferError < StandardError
+ end
+ class BufferChunkLimitError < BufferError
+ end
-class BufferError < StandardError
-end
+ class BufferQueueLimitError < BufferError
+ end
-class BufferChunkLimitError < BufferError
-end
-class BufferQueueLimitError < BufferError
-end
+ class Buffer
+ include Configurable
+ def initialize
+ super
+ end
-class Buffer
- include Configurable
+ def configure(conf)
+ super
+ end
- def initialize
- super
- end
+ def start
+ end
- def configure(conf)
- super
- end
+ def shutdown
+ end
- def start
- end
+ def before_shutdown(out)
+ end
- def shutdown
- end
+ #def emit(key, data, chain)
+ #end
- def before_shutdown(out)
- end
+ #def keys
+ #end
- #def emit(key, data, chain)
- #end
+ #def push(key)
+ #end
- #def keys
- #end
+ #def pop(out)
+ #end
- #def push(key)
- #end
+ #def clear!
+ #end
+ end
- #def pop(out)
- #end
- #def clear!
- #end
-end
+ class BufferChunk
+ include MonitorMixin
+ def initialize(key)
+ super()
+ @key = key
+ end
-class BufferChunk
- include MonitorMixin
+ attr_reader :key
- def initialize(key)
- super()
- @key = key
- end
+ #def <<(data)
+ #end
- attr_reader :key
+ #def size
+ #end
- #def <<(data)
- #end
+ def empty?
+ size == 0
+ end
- #def size
- #end
+ #def close
+ #end
- def empty?
- size == 0
- end
+ #def purge
+ #end
- #def close
- #end
+ #def read
+ #end
- #def purge
- #end
+ #def open
+ #end
- #def read
- #end
+ def write_to(io)
+ open {|i|
+ FileUtils.copy_stream(i, io)
+ }
+ end
- #def open
- #end
-
- def write_to(io)
- open {|i|
- FileUtils.copy_stream(i, io)
- }
+ def msgpack_each(&block)
+ open {|io|
+ u = MessagePack::Unpacker.new(io)
+ begin
+ u.each(&block)
+ rescue EOFError
+ end
+ }
+ end
end
- def msgpack_each(&block)
- open {|io|
- u = MessagePack::Unpacker.new(io)
- begin
- u.each(&block)
- rescue EOFError
- end
- }
- end
-end
+ class BasicBuffer < Buffer
+ include MonitorMixin
-class BasicBuffer < Buffer
- include MonitorMixin
+ def initialize
+ super
+ @parallel_pop = true
+ end
- def initialize
- super
- @parallel_pop = true
- end
+ def enable_parallel(b=true)
+ @parallel_pop = b
+ end
- def enable_parallel(b=true)
- @parallel_pop = b
- end
+ # This configuration assumes plugins to send records to a remote server.
+ # Local file based plugins which should provide more reliability and efficiency
+ # should override buffer_chunk_limit with a larger size.
+ config_param :buffer_chunk_limit, :size, :default => 8*1024*1024
+ config_param :buffer_queue_limit, :integer, :default => 256
- # This configuration assumes plugins to send records to a remote server.
- # Local file based plugins which should provide more reliability and efficiency
- # should override buffer_chunk_limit with a larger size.
- config_param :buffer_chunk_limit, :size, :default => 8*1024*1024
- config_param :buffer_queue_limit, :integer, :default => 256
+ alias chunk_limit buffer_chunk_limit
+ alias chunk_limit= buffer_chunk_limit=
+ alias queue_limit buffer_queue_limit
+ alias queue_limit= buffer_queue_limit=
- alias chunk_limit buffer_chunk_limit
- alias chunk_limit= buffer_chunk_limit=
- alias queue_limit buffer_queue_limit
- alias queue_limit= buffer_queue_limit=
+ def configure(conf)
+ super
+ end
- def configure(conf)
- super
- end
+ def start
+ @queue, @map = resume
+ @queue.extend(MonitorMixin)
+ end
- def start
- @queue, @map = resume
- @queue.extend(MonitorMixin)
- end
-
- def shutdown
- synchronize do
- @queue.synchronize do
- until @queue.empty?
- @queue.shift.close
+ def shutdown
+ synchronize do
+ @queue.synchronize do
+ until @queue.empty?
+ @queue.shift.close
+ end
end
+ @map.each_pair {|key,chunk|
+ chunk.close
+ }
end
- @map.each_pair {|key,chunk|
- chunk.close
- }
end
- end
- def emit(key, data, chain)
- key = key.to_s
+ def emit(key, data, chain)
+ key = key.to_s
- synchronize do
- top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id
+ synchronize do
+ top = (@map[key] ||= new_chunk(key)) # TODO generate unique chunk id
- if top.size + data.bytesize <= @buffer_chunk_limit
- chain.next
- top << data
- return false
+ if top.size + data.bytesize <= @buffer_chunk_limit
+ chain.next
+ top << data
+ return false
- ## FIXME
- #elsif data.bytesize > @buffer_chunk_limit
- # # TODO
- # raise BufferChunkLimitError, "received data too large"
+ ## FIXME
+ #elsif data.bytesize > @buffer_chunk_limit
+ # # TODO
+ # raise BufferChunkLimitError, "received data too large"
- elsif @queue.size >= @buffer_queue_limit
- raise BufferQueueLimitError, "queue size exceeds limit"
- end
+ elsif @queue.size >= @buffer_queue_limit
+ raise BufferQueueLimitError, "queue size exceeds limit"
+ end
- if data.bytesize > @buffer_chunk_limit
- $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
- $log.warn "This may occur problems in the output plugins ``at this server.``"
- $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
- $log.warn "in the forward output ``at the log forwarding server.``"
- end
+ if data.bytesize > @buffer_chunk_limit
+ $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
+ $log.warn "This may occur problems in the output plugins ``at this server.``"
+ $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
+ $log.warn "in the forward output ``at the log forwarding server.``"
+ end
- nc = new_chunk(key) # TODO generate unique chunk id
- ok = false
+ nc = new_chunk(key) # TODO generate unique chunk id
+ ok = false
- begin
- nc << data
- chain.next
+ begin
+ nc << data
+ chain.next
- flush_trigger = false
- @queue.synchronize {
- enqueue(top)
- flush_trigger = @queue.empty?
- @queue << top
- @map[key] = nc
- }
+ flush_trigger = false
+ @queue.synchronize {
+ enqueue(top)
+ flush_trigger = @queue.empty?
+ @queue << top
+ @map[key] = nc
+ }
- ok = true
- return flush_trigger
- ensure
- nc.purge unless ok
- end
+ ok = true
+ return flush_trigger
+ ensure
+ nc.purge unless ok
+ end
- end # synchronize
- end
+ end # synchronize
+ end
- def keys
- @map.keys
- end
+ def keys
+ @map.keys
+ end
- def queue_size
- @queue.size
- end
+ def queue_size
+ @queue.size
+ end
- def total_queued_chunk_size
- total = 0
- @map.each_value {|c|
- total += c.size
- }
- @queue.each {|c|
- total += c.size
- }
- total
- end
+ def total_queued_chunk_size
+ total = 0
+ @map.each_value {|c|
+ total += c.size
+ }
+ @queue.each {|c|
+ total += c.size
+ }
+ total
+ end
- #def new_chunk(key)
- #end
+ #def new_chunk(key)
+ #end
- #def resume
- #end
+ #def resume
+ #end
- #def enqueue(chunk)
- #end
+ #def enqueue(chunk)
+ #end
- def push(key)
- synchronize do
- top = @map[key]
- if !top || top.empty?
- return false
- end
+ def push(key)
+ synchronize do
+ top = @map[key]
+ if !top || top.empty?
+ return false
+ end
+ @queue.synchronize do
+ enqueue(top)
+ @queue << top
+ @map.delete(key)
+ end
+
+ return true
+ end # synchronize
+ end
+
+ def pop(out)
+ chunk = nil
@queue.synchronize do
- enqueue(top)
- @queue << top
- @map.delete(key)
+ if @parallel_pop
+ chunk = @queue.find {|c| c.try_mon_enter }
+ return false unless chunk
+ else
+ chunk = @queue.first
+ return false unless chunk
+ return false unless chunk.try_mon_enter
+ end
end
- return true
- end # synchronize
- end
+ begin
+ if !chunk.empty?
+ write_chunk(chunk, out)
+ end
- def pop(out)
- chunk = nil
- @queue.synchronize do
- if @parallel_pop
- chunk = @queue.find {|c| c.try_mon_enter }
- return false unless chunk
- else
- chunk = @queue.first
- return false unless chunk
- return false unless chunk.try_mon_enter
+ @queue.delete_if {|c|
+ c.object_id == chunk.object_id
+ }
+
+ chunk.purge
+
+ return !@queue.empty?
+ ensure
+ chunk.mon_exit
end
end
- begin
- if !chunk.empty?
- write_chunk(chunk, out)
- end
+ def write_chunk(chunk, out)
+ out.write(chunk)
+ end
- @queue.delete_if {|c|
- c.object_id == chunk.object_id
+ def clear!
+ @queue.delete_if {|chunk|
+ chunk.purge
+ true
}
-
- chunk.purge
-
- return !@queue.empty?
- ensure
- chunk.mon_exit
end
end
-
- def write_chunk(chunk, out)
- out.write(chunk)
- end
-
- def clear!
- @queue.delete_if {|chunk|
- chunk.purge
- true
- }
- end
-end
-
-
end