lib/twirl/cluster.rb in twirl-0.1.0 vs lib/twirl/cluster.rb in twirl-0.2.0

- old
+ new

@@ -1,8 +1,9 @@ require "forwardable" require "kjess" require "twirl/item" +require "twirl/mirror" require "twirl/instrumenters/noop" module Twirl class Cluster include Enumerable @@ -28,10 +29,13 @@ attr_reader :commands_per_client # Private: What should be used to instrument all the things. attr_reader :instrumenter + # Private: What handles dumping and loading values. + attr_reader :encoder + # Private: What errors should be considered retryable. attr_reader :retryable_errors # Public: How many clients are in the cluster. def_delegators :@clients, :size, :length @@ -45,17 +49,19 @@ # options - A Hash of options. # :commands_per_client - The Number of commands to run per client # before rotating to the next client (default: 100) # :retries - The Number of times a command should be retried (default: 5). # :instrumenter - Where to send instrumention (defaults: noop). + # :encoder - What to use to dump/load vlues (defaults: mirror). def initialize(clients, options = {}) @client_index = 0 @command_count = 0 @clients = clients.shuffle @retries = options.fetch(:retries, 5) @commands_per_client = options.fetch(:commands_per_client, 100) @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) + @encoder = options.fetch(:encoder, Mirror) @retryable_errors = options.fetch(:retryable_errors, RetryableErrors) end # Public: Iterate through the clients. def each(&block) @@ -71,15 +77,20 @@ # Returns true if successful, false otherwise. def set(queue_name, item, expiration = 0) with_retries { |tries| @instrumenter.instrument("op.twirl") { |payload| payload[:op] = :set - payload[:bytes] = item.size + payload[:bytes] = item.to_s.size payload[:queue_name] = queue_name payload[:retry] = tries != @retries - client.set(queue_name, item, expiration) + value = if item + @encoder.dump(item) + else + nil + end + client.set(queue_name, value, expiration) } } end # Public: Retrieve an item from the given queue. @@ -242,10 +253,11 @@ payload[:queue_name] = queue_name payload[:retry] = tries != @retries if value = client.send(op, queue_name, *args) payload[:bytes] = value.size + value = @encoder.load(value) if value Item.new queue_name, value, client, @instrumenter else rotate_for_next_op nil end @@ -302,9 +314,10 @@ } result } end + # Private: Retries an operation a number of times if it raises exception. def with_retries tries = @retries begin yield tries rescue *@retryable_errors