lib/loggability/log_device/http.rb in loggability-0.17.0 vs lib/loggability/log_device/http.rb in loggability-0.18.0

- old
+ new

@@ -31,10 +31,13 @@ DEFAULT_WRITE_TIMEOUT = 15 # The default Executor class to use for asynchronous tasks DEFAULT_EXECUTOR_CLASS = Concurrent::SingleThreadExecutor + # The default for the maximum bytesize of the queue (1 GB) + DEFAULT_MAX_QUEUE_BYTESIZE = ( 2 ** 10 ) * ( 2 ** 10 ) * ( 2 ** 10 ) + # The default options for new instances DEFAULT_OPTIONS = { execution_interval: DEFAULT_BATCH_INTERVAL, write_timeout: DEFAULT_WRITE_TIMEOUT, max_batch_size: DEFAULT_MAX_BATCH_SIZE, @@ -67,10 +70,12 @@ opts = DEFAULT_OPTIONS.merge( opts ) @endpoint = URI( endpoint ).freeze @logs_queue = Queue.new + @logs_queue_bytesize = 0 + @max_queue_bytesize = opts[:max_queue_bytesize] || DEFAULT_MAX_QUEUE_BYTESIZE @batch_interval = opts[:batch_interval] || DEFAULT_BATCH_INTERVAL @write_timeout = opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT @max_batch_size = opts[:max_batch_size] || DEFAULT_MAX_BATCH_SIZE @max_message_bytesize = opts[:max_message_bytesize] || DEFAULT_MAX_MESSAGE_BYTESIZE @executor_class = opts[:executor_class] || DEFAULT_EXECUTOR_CLASS @@ -96,10 +101,18 @@ # The Queue that contains any log messages which have not yet been sent to the # logging service. attr_reader :logs_queue ## + # The max bytesize of the queue. Will not queue more messages if this threshold is hit + attr_reader :max_queue_bytesize + + ## + # The size of +logs_queue+ in bytes + attr_accessor :logs_queue_bytesize + + ## # The monotonic clock time when the last batch of logs were sent attr_accessor :last_send_time ## # Number of seconds after the task completes before the task is performed again. @@ -131,10 +144,17 @@ ### LogDevice API -- write a message to the HTTP device. def write( message ) self.start unless self.running? + if message.is_a?( Hash ) + message_size = message.to_json.bytesize + else + message_size = message.bytesize + end + return if ( self.logs_queue_bytesize + message_size ) >= self.max_queue_bytesize + self.logs_queue_bytesize += message_size self.logs_queue.enq( message ) self.send_logs end @@ -244,10 +264,12 @@ # Be conservative so as not to overflow max_size = self.max_batch_bytesize - self.max_message_bytesize - 2 # for the outer Array while count < self.max_batch_size && bytes < max_size && !self.logs_queue.empty? - formatted_message = self.format_log_message( self.logs_queue.deq ) + message = self.logs_queue.deq + formatted_message = self.format_log_message( message ) + self.logs_queue_bytesize -= message.bytesize count += 1 bytes += formatted_message.bytesize + 3 # comma and delimiters buf << formatted_message