Sha256: b4a831bfe55b2c026889f85eb4d0aed37776e985acee40c147b372f40bac7e0f

Contents?: true

Size: 1.52 KB

Versions: 5

Compression:

Stored size: 1.52 KB

Contents

require 'zlib'
require 'digest'

module NFAgent
  class ChunkExpired < StandardError; end
  class ChunkFull < StandardError; end
  class DayBoundary < StandardError; end

  class Chunk < Array
    attr_reader :created_at
    attr_reader :max_size

    DEFAULT_MAX_SIZE = 500

    def initialize(max_size = DEFAULT_MAX_SIZE)
      @max_size = max_size
      @created_at = Time.now
    end

    def <<(line)
      raise ChunkExpired if expired?
      raise ChunkFull if full?
      raise DayBoundary if Time.now.day != self.created_at.day
      super(line)
    end

    def full?
      self.size >= @max_size
    end

    def expired?
      (Time.now - @created_at > Config.chunk_timeout) && !self.empty?
    end

    def dump(key = nil)
      Payload.new do |payload|
        Log.info("Dumping payload from chunk (#{self.size || 0} lines #{'due to expiry' if expired?}")
        payload.line_count = self.size
        payload.chunk_expired = expired?
        payload.key = key
        payload.data = Encoder.encode64url(Zlib::Deflate.deflate(self.join("\n"), Zlib::BEST_COMPRESSION))
        payload.checksum = Digest::SHA1.hexdigest(payload.data)
      end
    end

    def submit(key = nil)
      Log.info("Submitting...")
      # TODO God knows why EM Deferrable isn't working - defer here is OK
      EM.defer {
        submitter = Submitter.new(self.dump(key))
        submitter.errback { |payload|
          payload.write_to_disk(Config.dump_dir)
        }
        submitter.perform
      }
      # Callback and remove from chunk group
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
nfagent-1.0.0 lib/nfagent/chunk.rb
nfagent-0.9.50 lib/nfagent/chunk.rb
nfagent-0.9.30 lib/nfagent/chunk.rb
nfagent-0.9.29 lib/nfagent/chunk.rb
nfagent-0.9.28 lib/nfagent/chunk.rb