lib/twirl/cluster.rb in twirl-0.1.0 vs lib/twirl/cluster.rb in twirl-0.2.0
- old
+ new
@@ -1,8 +1,9 @@
require "forwardable"
require "kjess"
require "twirl/item"
+require "twirl/mirror"
require "twirl/instrumenters/noop"
module Twirl
class Cluster
include Enumerable
@@ -28,10 +29,13 @@
attr_reader :commands_per_client
# Private: What should be used to instrument all the things.
attr_reader :instrumenter
+ # Private: What handles dumping and loading values.
+ attr_reader :encoder
+
# Private: What errors should be considered retryable.
attr_reader :retryable_errors
# Public: How many clients are in the cluster.
def_delegators :@clients, :size, :length
@@ -45,17 +49,19 @@
# options - A Hash of options.
# :commands_per_client - The Number of commands to run per client
# before rotating to the next client (default: 100)
# :retries - The Number of times a command should be retried (default: 5).
# :instrumenter - Where to send instrumention (defaults: noop).
+ # :encoder - What to use to dump/load vlues (defaults: mirror).
def initialize(clients, options = {})
@client_index = 0
@command_count = 0
@clients = clients.shuffle
@retries = options.fetch(:retries, 5)
@commands_per_client = options.fetch(:commands_per_client, 100)
@instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
+ @encoder = options.fetch(:encoder, Mirror)
@retryable_errors = options.fetch(:retryable_errors, RetryableErrors)
end
# Public: Iterate through the clients.
def each(&block)
@@ -71,15 +77,20 @@
# Returns true if successful, false otherwise.
def set(queue_name, item, expiration = 0)
with_retries { |tries|
@instrumenter.instrument("op.twirl") { |payload|
payload[:op] = :set
- payload[:bytes] = item.size
+ payload[:bytes] = item.to_s.size
payload[:queue_name] = queue_name
payload[:retry] = tries != @retries
- client.set(queue_name, item, expiration)
+ value = if item
+ @encoder.dump(item)
+ else
+ nil
+ end
+ client.set(queue_name, value, expiration)
}
}
end
# Public: Retrieve an item from the given queue.
@@ -242,10 +253,11 @@
payload[:queue_name] = queue_name
payload[:retry] = tries != @retries
if value = client.send(op, queue_name, *args)
payload[:bytes] = value.size
+ value = @encoder.load(value) if value
Item.new queue_name, value, client, @instrumenter
else
rotate_for_next_op
nil
end
@@ -302,9 +314,10 @@
}
result
}
end
+ # Private: Retries an operation a number of times if it raises exception.
def with_retries
tries = @retries
begin
yield tries
rescue *@retryable_errors