lib/loggability/log_device/http.rb in loggability-0.17.0 vs lib/loggability/log_device/http.rb in loggability-0.18.0
- old
+ new
@@ -31,10 +31,13 @@
DEFAULT_WRITE_TIMEOUT = 15
# The default Executor class to use for asynchronous tasks
DEFAULT_EXECUTOR_CLASS = Concurrent::SingleThreadExecutor
+ # The default for the maximum bytesize of the queue (1 GB)
+ DEFAULT_MAX_QUEUE_BYTESIZE = ( 2 ** 10 ) * ( 2 ** 10 ) * ( 2 ** 10 )
+
# The default options for new instances
DEFAULT_OPTIONS = {
execution_interval: DEFAULT_BATCH_INTERVAL,
write_timeout: DEFAULT_WRITE_TIMEOUT,
max_batch_size: DEFAULT_MAX_BATCH_SIZE,
@@ -67,10 +70,12 @@
opts = DEFAULT_OPTIONS.merge( opts )
@endpoint = URI( endpoint ).freeze
@logs_queue = Queue.new
+ @logs_queue_bytesize = 0
+ @max_queue_bytesize = opts[:max_queue_bytesize] || DEFAULT_MAX_QUEUE_BYTESIZE
@batch_interval = opts[:batch_interval] || DEFAULT_BATCH_INTERVAL
@write_timeout = opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT
@max_batch_size = opts[:max_batch_size] || DEFAULT_MAX_BATCH_SIZE
@max_message_bytesize = opts[:max_message_bytesize] || DEFAULT_MAX_MESSAGE_BYTESIZE
@executor_class = opts[:executor_class] || DEFAULT_EXECUTOR_CLASS
@@ -96,10 +101,18 @@
# The Queue that contains any log messages which have not yet been sent to the
# logging service.
attr_reader :logs_queue
##
+ # The max bytesize of the queue. Will not queue more messages if this threshold is hit
+ attr_reader :max_queue_bytesize
+
+ ##
+ # The size of +logs_queue+ in bytes
+ attr_accessor :logs_queue_bytesize
+
+ ##
# The monotonic clock time when the last batch of logs were sent
attr_accessor :last_send_time
##
# Number of seconds after the task completes before the task is performed again.
@@ -131,10 +144,17 @@
### LogDevice API -- write a message to the HTTP device.
def write( message )
self.start unless self.running?
+ if message.is_a?( Hash )
+ message_size = message.to_json.bytesize
+ else
+ message_size = message.bytesize
+ end
+ return if ( self.logs_queue_bytesize + message_size ) >= self.max_queue_bytesize
+ self.logs_queue_bytesize += message_size
self.logs_queue.enq( message )
self.send_logs
end
@@ -244,10 +264,12 @@
# Be conservative so as not to overflow
max_size = self.max_batch_bytesize - self.max_message_bytesize - 2 # for the outer Array
while count < self.max_batch_size && bytes < max_size && !self.logs_queue.empty?
- formatted_message = self.format_log_message( self.logs_queue.deq )
+ message = self.logs_queue.deq
+ formatted_message = self.format_log_message( message )
+ self.logs_queue_bytesize -= message.bytesize
count += 1
bytes += formatted_message.bytesize + 3 # comma and delimiters
buf << formatted_message