lib/nfagent/chunk_handler.rb in nfagent-0.9.19 vs lib/nfagent/chunk_handler.rb in nfagent-0.9.20

- old
+ new

@@ -1,40 +1,65 @@ module NFAgent class ChunkHandler - # TODO: Rename this to Controller later - def initialize(chunk_size = 500) - @chunk = Chunk.new(chunk_size) + attr_accessor :chunk_group + + def initialize(options = {}) + @chunk_size = options[:chunk_size] || 500 + @parser = options[:parser] || Squiggle::SquidStandardParser.new(Config.time_zone) + @chunk_group = {} + class << @chunk_group + def fetch!(key, new_chunk) + if self.has_key?(key) + self.fetch(key) + else + self[key] = new_chunk + new_chunk + end + end + end end def append(line) - # if current day is > day of last entry on current_chunk - # then submit and reset the chunk before adding the line - current_day = Time.now.day - if current_day != @chunk.created_at.day - Log.info("Expiring chunk due to date rollover") - reset_chunk + if Config.parse == 'locally' + parsed = @parser.parse(line) + return if parsed.invalid? + if Config.mode == 'multi' + key = MapperProxy.find_account_id(parsed.username, parsed.client_ip) + # TODO: Still appending line as string until Server API has been updated + return append2(line, key) + end end - @chunk << line + # TODO: rename append2 + append2(line) end + def append2(line, key = nil) + key ||= 'all' + begin + chunk = @chunk_group.fetch!(key, Chunk.new(@chunk_size)) + chunk << line + rescue ChunkExpired, ChunkFull + reset_chunk(key) + end + end + def check_full_or_expired - if @chunk.full? - Log.info("Chunk Full: Resetting...") - reset_chunk - elsif @chunk.expired? - Log.info("Chunk Expired: Resetting...") - reset_chunk + @chunk_group.each_pair do |key, chunk| + if chunk.full? + Log.info("Chunk Full: Resetting...") + reset_chunk(key) + elsif chunk.expired? + Log.info("Chunk Expired: Resetting...") + reset_chunk(key) + end end end private - def reset_chunk - submitter = Submitter.new(@chunk.dump) - submitter.errback { |payload| - payload.write_to_disk(Config.dump_dir) - } - @chunk.clear - submitter.perform + def reset_chunk(key = nil) + key ||= 'all' + chunk = @chunk_group.delete(key) + chunk.submit end end end