lib/kestrel/client.rb in kestrel-client-0.3.1 vs lib/kestrel/client.rb in kestrel-client-0.4.0
- old
+ new
@@ -5,28 +5,49 @@
autoload :Blocking, 'kestrel/client/blocking'
autoload :Partitioning, 'kestrel/client/partitioning'
autoload :Unmarshal, 'kestrel/client/unmarshal'
autoload :Namespace, 'kestrel/client/namespace'
autoload :Json, 'kestrel/client/json'
+ autoload :Reliable, "kestrel/client/reliable"
+ autoload :Retrying, "kestrel/client/retrying"
-
QUEUE_STAT_NAMES = %w{items bytes total_items logsize expired_items mem_items mem_bytes age discarded}
+ # ==== Parameters
+ # key<String>:: Queue name
+ # opts<Boolean,Hash>:: True/false toggles Marshalling. A Hash
+ # allows collision-avoiding options support.
+ #
+ # ==== Options (opts)
+ # :open<Boolean>:: Begins a reliable read.
+ # :close<Boolean>:: Ends a reliable read.
+ # :abort<Boolean>:: Cancels an existing reliable read
+ # :peek<Boolean>:: Return the head of the queue, without removal
+ # :timeout<Integer>:: Milliseconds to block for a new item
+ # :raw<Boolean>:: Toggles Marshalling. Equivalent to the "old
+ # style" second argument.
+ #
+ def get(key, opts = false)
+ opts = extract_options(opts)
+ raw = opts.delete(:raw)
+ commands = extract_queue_commands(opts)
+
+ super key + commands, raw
+ end
+
def flush(queue)
count = 0
while sizeof(queue) > 0
- while get(queue, true)
+ while get queue, :raw => true
count += 1
end
end
count
end
def peek(queue)
- val = get(queue)
- set(queue, val)
- val
+ get queue, :peek => true
end
def sizeof(queue)
stat_info = stat(queue)
stat_info ? stat_info['items'] : 0
@@ -43,9 +64,23 @@
def stat(queue)
stats['queues'][queue]
end
private
+
+ def extract_options(opts)
+ opts.is_a?(Hash) ? opts : { :raw => !!opts }
+ end
+
+ def extract_queue_commands(opts)
+ commands = [:open, :close, :abort, :peek].select do |key|
+ opts[key]
+ end
+
+ commands << "t=#{opts[:timeout]}" if opts[:timeout]
+
+ commands.map { |c| "/#{c}" }.join('')
+ end
def stats_for_server(server)
server_name, port = server.split(/:/)
socket = TCPSocket.new(server_name, port)
socket.puts "STATS"