lib/metricize/forwarder.rb in metricize-0.4.7 vs lib/metricize/forwarder.rb in metricize-0.5.0

- old
+ new

@@ -5,39 +5,45 @@ def initialize(options) @password = options.fetch(:password) @username = options.fetch(:username) @remote_url = options[:remote_url] || 'metrics-api.librato.com/v1/metrics' @remote_timeout = options[:remote_timeout] || 10 + @max_batch_size = options[:max_batch_size] || 5000 establish_logger(options) initialize_redis(options) + establish_redis_connection end def go! - establish_redis_connection process_metric_queue end private def process_metric_queue with_error_handling do - queue = retrieve_queue_contents + queue = lshift_queue return if queue.empty? store_metrics(add_aggregate_info(queue)) - clear_queue end end - def retrieve_queue_contents - log_message "checking... queue_length=#{queue_length = @redis.llen(@queue_name)}", :info + def lshift_queue return [] unless queue_length > 0 - queue = @redis.lrange(@queue_name, 0, -1) - queue.map {|metric| JSON.parse(metric, :symbolize_names => true) } + current_batch = @redis.lrange(@queue_name, 0, @max_batch_size - 1) + # ltrim indexes are 0 based and somewhat confusing -- see http://redis.io/commands/ltrim + @redis.ltrim(@queue_name, 0, -1-@max_batch_size) + current_batch.map {|metric| JSON.parse(metric, :symbolize_names => true) } end + def queue_length + log_message "queue_length=#{length = @redis.llen(@queue_name)}", :info + length + end + def clear_queue log_message "clearing queue" - @redis.del @queue_name if @redis + @redis.del @queue_name end def store_metrics(data) log_message "remote_data_sent='#{data}'" start_time = Time.now