require "forwardable" require "kjess" require "twirl/item" require "twirl/instrumenters/noop" module Twirl class Cluster include Enumerable extend Forwardable # Private: The default Array of errors to retry. RetryableErrors = [ KJess::NetworkError, # this inherits from protocol error, but seems like it should be retried KJess::ServerError, ] # Private: What is the array index of the client being used currently. attr_reader :client_index # Private: The number of commands issued to the current client. attr_reader :command_count # Private: The number of times to retry retryable errors. attr_reader :retries # Private: The number of commands to issue to a client before rotating. attr_reader :commands_per_client # Private: What should be used to instrument all the things. attr_reader :instrumenter # Private: What errors should be considered retryable. attr_reader :retryable_errors # Public: How many clients are in the cluster. def_delegators :@clients, :size, :length # Public: Access a client by its index. def_delegator :@clients, :[] # Public: Initialize a new cluster. # # clients - An array of KJess::Client instances with port (localhost:1234) # options - A Hash of options. # :commands_per_client - The Number of commands to run per client # before rotating to the next client (default: 100) # :retries - The Number of times a command should be retried (default: 5). # :instrumenter - Where to send instrumention (defaults: noop). def initialize(clients, options = {}) @client_index = 0 @command_count = 0 @clients = clients.shuffle @retries = options.fetch(:retries, 5) @commands_per_client = options.fetch(:commands_per_client, 100) @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) @retryable_errors = options.fetch(:retryable_errors, RetryableErrors) end # Public: Iterate through the clients. def each(&block) @clients.each { |client| yield client } end # Public: Add an item to the given queue. # # queue_name - The String name of the queue. # item - The String item to add to the queue. # expiration - The Number of seconds from now to expire the item (default: 0). # # Returns true if successful, false otherwise. def set(queue_name, item, expiration = 0) with_retries { |tries| @instrumenter.instrument("op.twirl") { |payload| payload[:op] = :set payload[:bytes] = item.size payload[:queue_name] = queue_name payload[:retry] = tries != @retries client.set(queue_name, item, expiration) } } end # Public: Retrieve an item from the given queue. # # It is possible to send both :open and :close in the same get operation, # but I would not recommend it. You will end up in a situation where the # client will rotate and the :close then goes to the wrong client. # # We could do two get operations if you pass both options, send the :close # to the current client and send the :open as a second operation to the # rotated client, but that seems sneaky. # # queue_name - The String name of the queue. # options - The Hash of options for retrieving an item. # See KJess::Client#get for all options. # # Returns a Twirl::Item if an item was found, otherwise nil. def get(queue_name, options = {}) client_read_op client, :get, queue_name, options end # Public: Reserve the next item on the queue. # # This is a helper method to get an item from a queue and open it for # reliable read. # # queue_name - The String name of the queue. # options - Additional options. # See KJess::Client#get for all options. # # Returns a Twirl::Item if an item was found, otherwise nil. def reserve(queue_name, options = {}) client_read_op client, :reserve, queue_name, options end # Public: Peek at the top item in the queue. # # queue_name - The String name of the queue. # # Returns a Twirl::Item if an item was found, otherwise nil. def peek(queue_name) client_read_op client, :peek, queue_name end # Public : Remove a queue. # # queue_name - The String name of the queue. # # Returns a Hash of hosts and results. def delete(queue_name) multi_client_queue_op_with_result :delete, queue_name end # Public: Remove all items from a queue. # # queue_name - The String name of the queue. # # Returns a Hash of hosts and results. def flush(queue_name) multi_client_queue_op_with_result :flush, queue_name end # Public: Remove all items from all queues. # # Returns a Hash of hosts and results. def flush_all multi_client_op_with_result :flush_all end # Public: Return the version of each server. # # Returns a Hash of hosts and results. def version multi_client_op_with_result :version do |client| begin client.version rescue KJess::ProtocolError "unavailable" end end end # Public: Which clients can actually reach their server. # # Returns Hash of hosts and results. def ping multi_client_op_with_result :ping end # Public: Reload the config of each client's server. # # Returns Hash of hosts and results. def reload multi_client_op_with_result :reload end # Public: Return stats for each client's server. # # Returns a Hash of stats for each host. def stats multi_client_op_with_result :stats end # Public: Disconnect from each client's server. # # Returns Hash of hosts and results. def quit multi_client_op_with_result :quit end # Public: Disconnect from each client's server. # # Returns nothing. def disconnect multi_client_op :disconnect end # Public: Tells each client to shutdown their server. # # Returns nothing. def shutdown multi_client_op :shutdown end # Private: Returns the client to be used to issue a command. def client rotate if @command_count >= @commands_per_client @command_count += 1 @clients[@client_index] end # Private: Ensures that clients will be rotated by changing the client index # and resetting the command count. def rotate @instrumenter.instrument "op.twirl", { op: :rotate, metric_type: :counter, command_count: @command_count, commands_per_client: @commands_per_client, } @command_count = 0 @client_index = (@client_index + 1) % @clients.size end # Private: Makes it so the client will rotate for the next operation. def rotate_for_next_op @command_count = @commands_per_client end # Private: Perform an operation for a given client. Rotates clients if nil # item is result of op. # # Returns a Twirl::Item if an item was found, otherwise nil. def client_read_op(client, op, queue_name, *args) with_retries { |tries| @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op payload[:queue_name] = queue_name payload[:retry] = tries != @retries if value = client.send(op, queue_name, *args) payload[:bytes] = value.size Item.new queue_name, value, client, @instrumenter else rotate_for_next_op nil end } } end # Private: Perform an op on all the clients. def multi_client_op(op, *args, &block) @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op @clients.each do |client| if block_given? yield client else client.send(op, *args) end end } end def multi_client_queue_op_with_result(op, queue_name, *args, &block) @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op payload[:queue_name] = queue_name result = {} @clients.each { |client| result["#{client.host}:#{client.port}"] = if block_given? yield client else client.send(op, queue_name, *args) end } result } end # Private: Perform an op on all clients. # # Returns a Hash of the servers as keys and the results as values. def multi_client_op_with_result(op, *args, &block) @instrumenter.instrument("op.twirl") { |payload| payload[:op] = op result = {} @clients.each { |client| result["#{client.host}:#{client.port}"] = if block_given? yield client else client.send(op, *args) end } result } end def with_retries tries = @retries begin yield tries rescue *@retryable_errors tries -= 1 tries > 0 ? retry : raise end end end end