/* vim: ft=c et ts=8 sts=4 sw=4 cino=
*
* Copyright 2011, 2012 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "couchbase_ext.h"
/*
 * libcouchbase callback invoked once per arithmetic response.
 *
 * The cookie is the struct cb_context_st that was handed to
 * lcb_arithmetic(). Each invocation decrements the context's pending query
 * counter, records any error on the context, and then either hands a
 * Result object to the user's block (asynchronous bucket) or stores the
 * value in the context's result hash (synchronous bucket).
 */
void
cb_arithmetic_callback(lcb_t handle, const void *cookie, lcb_error_t error, const lcb_arithmetic_resp_t *resp)
{
    struct cb_context_st *ctx = (struct cb_context_st *)cookie;
    struct cb_bucket_st *bucket = ctx->bucket;
    VALUE key, cas, value, failure, result;
    ID operation;

    (void)handle;

    /* one less response outstanding for this context */
    ctx->nqueries--;

    key = STR_NEW((const char*)resp->v.v0.key, resp->v.v0.nkey);
    cb_strip_key_prefix(bucket, key);

    /* a zero CAS is reported to Ruby as nil */
    if (resp->v.v0.cas > 0) {
        cas = ULL2NUM(resp->v.v0.cas);
    } else {
        cas = Qnil;
    }

    if (ctx->arith > 0) {
        operation = cb_sym_increment;
    } else {
        operation = cb_sym_decrement;
    }

    failure = cb_check_error(error, "failed to perform arithmetic operation", key);
    if (!NIL_P(failure)) {
        /* annotate the exception and remember it on the context */
        rb_ivar_set(failure, cb_id_iv_cas, cas);
        rb_ivar_set(failure, cb_id_iv_operation, operation);
        ctx->exception = failure;
    }

    value = ULL2NUM(resp->v.v0.value);

    if (bucket->async) {
        /* asynchronous: deliver a Result to the block, if one was given */
        if (ctx->proc != Qnil) {
            result = rb_class_new_instance(0, NULL, cb_cResult);
            rb_ivar_set(result, cb_id_iv_error, failure);
            rb_ivar_set(result, cb_id_iv_operation, operation);
            rb_ivar_set(result, cb_id_iv_key, key);
            rb_ivar_set(result, cb_id_iv_value, value);
            rb_ivar_set(result, cb_id_iv_cas, cas);
            cb_proc_call(bucket, ctx->proc, 1, result);
        }
    } else if (NIL_P(failure)) {
        /* synchronous: only successful responses land in the result hash */
        if (ctx->extended) {
            rb_hash_aset(ctx->rv, key, rb_ary_new3(2, value, cas));
        } else {
            rb_hash_aset(ctx->rv, key, value);
        }
    }

    if (ctx->nqueries == 0) {
        /* last response: drop the block reference; in asynchronous mode the
         * context is released here as well */
        ctx->proc = Qnil;
        if (bucket->async) {
            cb_context_free(ctx);
        }
    }
}
/*
 * Shared implementation behind the increment and decrement bucket methods.
 *
 * sign  positive to increment, otherwise decrement
 * argc  number of Ruby arguments in argv
 * argv  Ruby arguments; all positional arguments are collected into
 *       params.args and an optional block is captured separately
 * self  the bucket object whose data pointer is a struct cb_bucket_st
 *
 * Returns Qnil when cb_bucket_connected_bang() reports failure and in
 * asynchronous mode. In synchronous mode it returns the result hash when
 * more than one operation was scheduled, otherwise the single entry picked
 * out of that hash.
 *
 * Raises ArgumentError when a block is passed in synchronous mode, and
 * raises the exception produced by cb_check_error() for a scheduling
 * failure or the one recorded on the context by the callback.
 */
static inline VALUE
cb_bucket_arithmetic(int sign, int argc, VALUE *argv, VALUE self)
{
    struct cb_bucket_st *bucket = DATA_PTR(self);
    struct cb_context_st *ctx;
    VALUE rv, proc, exc;
    lcb_error_t err;
    struct cb_params_st params;

    if (!cb_bucket_connected_bang(bucket, sign > 0 ? cb_sym_increment : cb_sym_decrement)) {
        return Qnil;
    }

    memset(&params, 0, sizeof(struct cb_params_st));
    rb_scan_args(argc, argv, "0*&", &params.args, &proc);
    if (!bucket->async && proc != Qnil) {
        rb_raise(rb_eArgError, "synchronous mode doesn't support callbacks");
    }

    params.type = cb_cmd_arith;
    params.bucket = bucket;
    params.cmd.arith.sign = sign;
    cb_params_build(&params);

    ctx = cb_context_alloc_common(bucket, proc, params.cmd.arith.num);
    ctx->extended = params.cmd.arith.extended;
    /* the callback reads ctx->arith to report :increment or :decrement */
    ctx->arith = sign;

    err = lcb_arithmetic(bucket->handle, (const void *)ctx,
                         params.cmd.arith.num, params.cmd.arith.ptr);
    /* NOTE(review): params.npayload and params.cmd.arith.num are still read
     * below, after this call; that relies on cb_params_destroy() leaving
     * those fields intact -- confirm. */
    cb_params_destroy(&params);

    exc = cb_check_error(err, "failed to schedule arithmetic request", Qnil);
    if (exc != Qnil) {
        cb_context_free(ctx);
        rb_exc_raise(exc);
    }

    bucket->nbytes += params.npayload;
    if (bucket->async) {
        cb_maybe_do_loop(bucket);
        return Qnil;
    } else {
        if (ctx->nqueries > 0) {
            /* we have some operations pending */
            lcb_wait(bucket->handle);
        }
        /* copy what we need out of the context before releasing it */
        exc = ctx->exception;
        rv = ctx->rv;
        cb_context_free(ctx);
        if (exc != Qnil) {
            rb_exc_raise(exc);
        }
        if (params.cmd.arith.num > 1) {
            /* return as a hash {key => value, ...}
             * (or {key => [value, cas], ...} in extended mode) */
            return rv;
        } else {
            /* single operation: unwrap the only entry of the hash */
            VALUE vv = Qnil;
            rb_hash_foreach(rv, cb_first_value_i, (VALUE)&vv);
            return vv;
        }
    }
}
/*
* Increment the value of an existing numeric key
*
* @since 1.0.0
*
* The increment methods allow you to increase a given stored integer
* value. These are the incremental equivalent of the decrement operations
* and work on the same basis; updating the value of a key if it can be
* parsed to an integer. The update operation occurs on the server and is
* provided at the protocol level. This simplifies what would otherwise be a
* two-stage get and set operation.
*
* @note Server values are stored and transmitted as unsigned numbers,
* therefore if you store a negative number and then increment or
* decrement it, the value will overflow (see the "Integer overflow"
* example below).
*
* @overload incr(key, delta = 1, options = {})
* @param key [String, Symbol] Key used to reference the value.
* @param delta [Fixnum] Integer (up to 64 bits) value to increment
* @param options [Hash] Options for operation.
* @option options [true, false] :create (false) If set to +true+, it will
* initialize the key with zero value and zero flags (use +:initial+
* option to set another initial value). Note: it won't increment the
* missing value.
* @option options [Fixnum] :initial (0) Integer (up to 64 bits) value for
* missing key initialization. This option implies that the +:create+
* option is +true+.
* @option options [Fixnum] :ttl (self.default_ttl) Expiry time for key.
* Values larger than 30*24*60*60 seconds (30 days) are interpreted as
* absolute times (from the epoch). This option is ignored for existing
* keys.
* @option options [true, false] :extended (false) If set to +true+, the
* operation will return tuple +[value, cas]+, otherwise (by default) it
* returns just value.
*
* @yieldparam ret [Result] the result of operation in asynchronous mode
* (valid attributes: +error+, +operation+, +key+, +value+, +cas+).
*
* @return [Fixnum] the actual value of the key.
*
* @raise [Couchbase::Error::NotFound] if key is missing and +:create+
* option isn't +true+.
*
* @raise [Couchbase::Error::DeltaBadval] if the key contains non-numeric
* value
*
* @raise [Couchbase::Error::Connect] if connection closed (see {Bucket#reconnect})
*
* @raise [ArgumentError] when passing the block in synchronous mode
*
* @example Increment key by one
* c.incr("foo")
*
* @example Increment key by 50
* c.incr("foo", 50)
*
* @example Increment key by one OR initialize with zero
* c.incr("foo", :create => true) #=> will return old+1 or 0
*
* @example Increment key by 50 OR initialize with three
* c.incr("foo", 50, :initial => 3) #=> will return old+50 or 3
*
* @example Increment key and get its CAS value
* val, cas = c.incr("foo", :extended => true)
*
* @example Integer overflow
* c.set("foo", -100)
* c.get("foo") #=> -100
* c.incr("foo") #=> 18446744073709551517
*
* @example Asynchronous invocation
* c.run do
* c.incr("foo") do |ret|
* ret.operation #=> :increment
* ret.success? #=> true
* ret.key #=> "foo"
* ret.value
* ret.cas
* end
* end
*
*/
VALUE
cb_bucket_incr(int argc, VALUE *argv, VALUE self)
{
    /* a positive sign selects the increment flavour of the shared routine */
    const int sign = +1;

    return cb_bucket_arithmetic(sign, argc, argv, self);
}
/*
* Decrement the value of an existing numeric key
*
* @since 1.0.0
*
* The decrement methods reduce the value of a given key if the
* corresponding value can be parsed to an integer value. These operations
* are provided at a protocol level to eliminate the need to get, update,
* and reset a simple integer value in the database. It supports the use of
* an explicit offset value that will be used to reduce the stored value in
* the database.
*
* @note Server values are stored and transmitted as unsigned numbers,
* therefore if you try to decrement a key holding a negative or zero
* value, you will always get zero.
*
* @overload decr(key, delta = 1, options = {})
* @param key [String, Symbol] Key used to reference the value.
* @param delta [Fixnum] Integer (up to 64 bits) value to decrement
* @param options [Hash] Options for operation.
* @option options [true, false] :create (false) If set to +true+, it will
* initialize the key with zero value and zero flags (use +:initial+
* option to set another initial value). Note: it won't decrement the
* missing value.
* @option options [Fixnum] :initial (0) Integer (up to 64 bits) value for
* missing key initialization. This option implies that the +:create+
* option is +true+.
* @option options [Fixnum] :ttl (self.default_ttl) Expiry time for key.
* Values larger than 30*24*60*60 seconds (30 days) are interpreted as
* absolute times (from the epoch). This option is ignored for existing
* keys.
* @option options [true, false] :extended (false) If set to +true+, the
* operation will return tuple +[value, cas]+, otherwise (by default) it
* returns just value.
*
* @yieldparam ret [Result] the result of operation in asynchronous mode
* (valid attributes: +error+, +operation+, +key+, +value+, +cas+).
*
* @return [Fixnum] the actual value of the key.
*
* @raise [Couchbase::Error::NotFound] if key is missing and +:create+
* option isn't +true+.
*
* @raise [Couchbase::Error::DeltaBadval] if the key contains non-numeric
* value
*
* @raise [Couchbase::Error::Connect] if connection closed (see {Bucket#reconnect})
*
* @raise [ArgumentError] when passing the block in synchronous mode
*
* @example Decrement key by one
* c.decr("foo")
*
* @example Decrement key by 50
* c.decr("foo", 50)
*
* @example Decrement key by one OR initialize with zero
* c.decr("foo", :create => true) #=> will return old-1 or 0
*
* @example Decrement key by 50 OR initialize with three
* c.decr("foo", 50, :initial => 3) #=> will return old-50 or 3
*
* @example Decrement key and get its CAS value
* val, cas = c.decr("foo", :extended => true)
*
* @example Decrementing zero
* c.set("foo", 0)
* c.decrement("foo", 100500) #=> 0
*
* @example Decrementing negative value
* c.set("foo", -100)
* c.decrement("foo", 100500) #=> 0
*
* @example Asynchronous invocation
* c.run do
* c.decr("foo") do |ret|
* ret.operation #=> :decrement
* ret.success? #=> true
* ret.key #=> "foo"
* ret.value
* ret.cas
* end
* end
*
*/
VALUE
cb_bucket_decr(int argc, VALUE *argv, VALUE self)
{
    /* a non-positive sign selects the decrement flavour of the shared routine */
    const int sign = -1;

    return cb_bucket_arithmetic(sign, argc, argv, self);
}