Sha256: dc06bb241d4a52ee0d5262e22d90c239504c7732b36e542a1d5a4e7a19e212ef
Contents?: true
Size: 1.74 KB
Versions: 2
Compression:
Stored size: 1.74 KB
Contents
module Stackify class MsgsQueue < SizedQueue include MonitorMixin #TODO: restrict possibility to work with class if app is off CHUNK_MIN_WEIGHT = 50 ERROR_SIZE = 10 LOG_SIZE = 1 DELAY_WAITING = 2 def initialize super(Stackify.configuration.queue_max_size) reset_current_chunk end alias :old_push :push def push_remained_msgs wait_until_all_workers_will_add_msgs self.synchronize do push_current_chunk Stackify.shutdown_all if self.length > 0 Stackify.logs_sender.send_remained_msgs Stackify.internal_log :info, 'All remained logs are sent' Stackify.status = Stackify::STATUSES[:terminated] end end end def add_msg msg self.synchronize do if msg.is_a?(Hash) @current_chunk_weight += msg['Ex'].nil? ? LOG_SIZE : ERROR_SIZE @current_chunk << msg if @current_chunk_weight >= CHUNK_MIN_WEIGHT push_current_chunk end else Stackify.log_internal_error "MsgsQueue: add_msg should get hash, but not a #{msg.class}" end end end alias :<< :add_msg alias :push :add_msg def pop_all self.synchronize do msgs = [] until self.empty? do msgs << self.pop end msgs end end private def reset_current_chunk @current_chunk = [] @current_chunk_weight = 0 end def wait_until_all_workers_will_add_msgs while Stackify.alive_adding_msg_workers.size > 0 sleep DELAY_WAITING end end def push_current_chunk unless @current_chunk.empty? self.old_push(@current_chunk) reset_current_chunk end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
stackify-api-ruby-1.0.2 | lib/stackify/msgs_queue.rb |
stackify-api-ruby-1.0.1 | lib/stackify/msgs_queue.rb |