Sha256: 418c93b6d34a5697b9d9631b1150f50d10e032e01948fd77c0b7bf15f825dc38
Contents?: true
Size: 1.82 KB
Versions: 1
Compression:
Stored size: 1.82 KB
Contents
# File-backed write buffer.
#
# Incoming data is appended to a "<name>.open" file below +root+. On rotation
# the current file is renamed to "<name>.closed" and a fresh ".open" file is
# started. A flush rotates the current file and then calls +core.flush!+.
class Fwd::Buffer
  extend Forwardable
  def_delegators :core, :root, :prefix, :logger

  # Upper bound for the size-based rotation threshold, in bytes.
  MAX_LIMIT = 64 * 1024 * 1024 # 64M

  attr_reader :core, :interval, :rate, :count, :limit, :timer, :fd

  # Constructor
  #
  # Reads three settings from +core.opts+:
  # * +:flush_interval+ - period of the flush timer (default 60);
  #   a value <= 0 disables the timer
  # * +:flush_rate+ - number of writes after which a flush is forced
  #   (default 10_000); a value <= 0 disables count-based flushing
  # * +:buffer_limit+ - file size at which the buffer is rotated; unset or
  #   zero falls back to MAX_LIMIT, larger values are capped at MAX_LIMIT
  #
  # @param [Fwd] core
  def initialize(core)
    @core     = core
    @interval = (core.opts[:flush_interval] || 60).to_i
    @rate     = (core.opts[:flush_rate] || 10_000).to_i
    @limit    = [core.opts[:buffer_limit].to_i, MAX_LIMIT].reject(&:zero?).min
    @count    = 0
    reschedule!
  end

  # Appends data to the current buffer file, rotating beforehand if rotation
  # is due and flushing afterwards if the write count reached the flush rate.
  #
  # @param [String] data binary data
  def concat(data)
    rotate! if rotate?
    @fd.write(data)
    @count += 1
    flush! if flush?
  end

  # (Force) flush buffer
  #
  # Resets the write counter, rotates the current file and calls
  # +core.flush!+. The timer is rescheduled even when one of those steps
  # raises.
  def flush!
    @count = 0
    rotate!
    core.flush!
  ensure
    reschedule!
  end

  # @return [Boolean] true if flush is due
  def flush?
    @rate > 0 && @count >= @rate
  end

  # (Force) rotate buffer file
  #
  # Does nothing while the current file is still empty. Otherwise the current
  # file is renamed from ".open" to ".closed" and a new ".open" file takes
  # its place. An Errno::ENOENT raised along the way is logged as a warning
  # and the buffer keeps its current file.
  def rotate!
    return if @fd && @fd.size.zero?

    previous = @fd
    if previous
      logger.debug { "Rotating #{File.basename(previous.path)}, #{previous.size / 1024} kB" }
      FileUtils.mv(previous.path, previous.path.sub(/\.open\z/, ".closed"))
    end

    @fd = new_file
    # Release the old descriptor only after the replacement is in place, so
    # a failed rotation never leaves @fd pointing at a closed handle.
    previous.close if previous
  rescue Errno::ENOENT => e
    logger.warn "Rotation delayed: #{e.message}"
  end

  # @return [Boolean] true if rotation is due, i.e. no file is open yet or
  #   the current file reached the size limit; false if the size check
  #   raises Errno::ENOENT
  def rotate?
    @fd.nil? || @fd.size >= @limit
  rescue Errno::ENOENT
    false
  end

  private

  # Opens a new, not yet existing "<name>.open" file below +root+ for binary
  # writing, creating the root directory if necessary.
  #
  # @return [File] file handle with sync mode enabled
  def new_file
    path = nil
    until path && !path.exist?
      path = root.join("#{generate_name}.open")
    end
    FileUtils.mkdir_p root.to_s
    file = path.open("wb")
    file.sync = true
    file
  end

  # Cancels any pending flush timer and starts a new periodic one.
  # No-op when the interval is not positive.
  def reschedule!
    return unless @interval > 0

    @timer.cancel if @timer
    @timer = EM.add_periodic_timer(@interval) { flush! }
  end

  # @return [String] "<prefix>.<UTC timestamp>.<8 random hex chars>"
  def generate_name
    # %M/%S (minute/second); the previous "%m%s" repeated the month and
    # appended the epoch seconds instead.
    [prefix, Time.now.utc.strftime("%Y%m%d%H%M%S"), SecureRandom.hex(4)].join(".")
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fwd-0.3.3 | lib/fwd/buffer.rb |