lib/kestrel/client.rb in kestrel-client-0.3.1 vs lib/kestrel/client.rb in kestrel-client-0.4.0

- old
+ new

@@ -5,28 +5,49 @@ autoload :Blocking, 'kestrel/client/blocking' autoload :Partitioning, 'kestrel/client/partitioning' autoload :Unmarshal, 'kestrel/client/unmarshal' autoload :Namespace, 'kestrel/client/namespace' autoload :Json, 'kestrel/client/json' + autoload :Reliable, "kestrel/client/reliable" + autoload :Retrying, "kestrel/client/retrying" - QUEUE_STAT_NAMES = %w{items bytes total_items logsize expired_items mem_items mem_bytes age discarded} + # ==== Parameters + # key<String>:: Queue name + # opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash + # allows collision-avoiding options support. + # + # ==== Options (opts) + # :open<Boolean>:: Begins a reliable read. + # :close<Boolean>:: Ends a reliable read. + # :abort<Boolean>:: Cancels an existing reliable read + # :peek<Boolean>:: Return the head of the queue, without removal + # :timeout<Integer>:: Milliseconds to block for a new item + # :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old + # style" second argument. + # + def get(key, opts = false) + opts = extract_options(opts) + raw = opts.delete(:raw) + commands = extract_queue_commands(opts) + + super key + commands, raw + end + def flush(queue) count = 0 while sizeof(queue) > 0 - while get(queue, true) + while get queue, :raw => true count += 1 end end count end def peek(queue) - val = get(queue) - set(queue, val) - val + get queue, :peek => true end def sizeof(queue) stat_info = stat(queue) stat_info ? stat_info['items'] : 0 @@ -43,9 +64,23 @@ def stat(queue) stats['queues'][queue] end private + + def extract_options(opts) + opts.is_a?(Hash) ? opts : { :raw => !!opts } + end + + def extract_queue_commands(opts) + commands = [:open, :close, :abort, :peek].select do |key| + opts[key] + end + + commands << "t=#{opts[:timeout]}" if opts[:timeout] + + commands.map { |c| "/#{c}" }.join('') + end def stats_for_server(server) server_name, port = server.split(/:/) socket = TCPSocket.new(server_name, port) socket.puts "STATS"