lib/metricize/forwarder.rb in metricize-0.4.7 vs lib/metricize/forwarder.rb in metricize-0.5.0
- old
+ new
@@ -5,39 +5,45 @@
def initialize(options)
@password = options.fetch(:password)
@username = options.fetch(:username)
@remote_url = options[:remote_url] || 'metrics-api.librato.com/v1/metrics'
@remote_timeout = options[:remote_timeout] || 10
+ @max_batch_size = options[:max_batch_size] || 5000
establish_logger(options)
initialize_redis(options)
+ establish_redis_connection
end
def go!
- establish_redis_connection
process_metric_queue
end
private
def process_metric_queue
with_error_handling do
- queue = retrieve_queue_contents
+ queue = lshift_queue
return if queue.empty?
store_metrics(add_aggregate_info(queue))
- clear_queue
end
end
- def retrieve_queue_contents
- log_message "checking... queue_length=#{queue_length = @redis.llen(@queue_name)}", :info
+ def lshift_queue
return [] unless queue_length > 0
- queue = @redis.lrange(@queue_name, 0, -1)
- queue.map {|metric| JSON.parse(metric, :symbolize_names => true) }
+ current_batch = @redis.lrange(@queue_name, 0, @max_batch_size - 1)
+ # ltrim indexes are 0 based and somewhat confusing -- see http://redis.io/commands/ltrim
+ @redis.ltrim(@queue_name, 0, -1-@max_batch_size)
+ current_batch.map {|metric| JSON.parse(metric, :symbolize_names => true) }
end
+ def queue_length
+ log_message "queue_length=#{length = @redis.llen(@queue_name)}", :info
+ length
+ end
+
def clear_queue
log_message "clearing queue"
- @redis.del @queue_name if @redis
+ @redis.del @queue_name
end
def store_metrics(data)
log_message "remote_data_sent='#{data}'"
start_time = Time.now