/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* * Copyright 2011-2012 Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "internal.h" #include "durability_internal.h" #include "trace.h" typedef struct { mc_REQDATAEX base; lcb_MULTICMD_CTX mctx; lcb_t instance; size_t remaining; unsigned oflags; /* requests array contains one buffer per server. nrequest essentially * says how many elements (and thus how many servers) */ size_t nrequests; lcb_string requests[1]; } OBSERVECTX; typedef enum { F_DURABILITY = 0x01, F_DESTROY = 0x02, F_SCHEDFAILED = 0x04 } obs_flags; static void handle_observe_callback(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_error_t err, const void *arg) { OBSERVECTX *oc = (void *)pkt->u_rdata.exdata; lcb_RESPOBSERVE *resp = (void *)arg; lcb_t instance = oc->instance; (void)pl; if (resp == NULL) { int nfailed = 0; /** We need to fail the request manually.. */ const char *ptr = SPAN_BUFFER(&pkt->u_value.single); const char *end = ptr + pkt->u_value.single.size; while (ptr < end) { lcb_uint16_t nkey; lcb_RESPOBSERVE cur = { 0 }; cur.rflags = LCB_RESP_F_CLIENTGEN; ptr += 2; memcpy(&nkey, ptr, sizeof(nkey)); nkey = ntohs(nkey); ptr += 2; memset(&cur, 0, sizeof(cur)); cur.key = ptr; cur.nkey = nkey; cur.cookie = (void *)oc->base.cookie; cur.rc = err; handle_observe_callback(NULL, pkt, err, &cur); ptr += nkey; nfailed++; } lcb_assert(nfailed); return; } resp->cookie = (void *)oc->base.cookie; resp->rc = err; if (oc->oflags & F_DURABILITY) { resp->ttp = pl ? pl->index : -1; lcbdur_cas_update( instance, (lcb_DURSET *)MCREQ_PKT_COOKIE(pkt), err, resp); } else if ((oc->oflags & F_SCHEDFAILED) == 0) { lcb_RESPCALLBACK callback = lcb_find_callback(instance, LCB_CALLBACK_OBSERVE); callback(instance, LCB_CALLBACK_OBSERVE, (lcb_RESPBASE *)resp); } if (oc->oflags & F_DESTROY) { return; } if (--oc->remaining) { return; } else { lcb_RESPOBSERVE resp2 = { 0 }; resp2.rc = err; resp2.rflags = LCB_RESP_F_CLIENTGEN|LCB_RESP_F_FINAL; oc->oflags |= F_DESTROY; handle_observe_callback(NULL, pkt, err, &resp2); free(oc); } } static void handle_schedfail(mc_PACKET *pkt) { OBSERVECTX *oc = (void *)pkt->u_rdata.exdata; oc->oflags |= F_SCHEDFAILED; handle_observe_callback(NULL, pkt, LCB_SCHEDFAIL_INTERNAL, NULL); } static void destroy_requests(OBSERVECTX *reqs) { size_t ii; for (ii = 0; ii < reqs->nrequests; ii++) { lcb_string_release(reqs->requests + ii); } } #define CTX_FROM_MULTI(mcmd) (void *) ((((char *) (mcmd))) - offsetof(OBSERVECTX, mctx)) static lcb_error_t obs_ctxadd(lcb_MULTICMD_CTX *mctx, const lcb_CMDBASE *cmdbase) { int vbid, srvix_dummy; unsigned ii; const lcb_CMDOBSERVE *cmd = (const lcb_CMDOBSERVE *)cmdbase; OBSERVECTX *ctx = CTX_FROM_MULTI(mctx); lcb_t instance = ctx->instance; mc_CMDQUEUE *cq = &instance->cmdq; lcb_U16 servers_s[4]; const lcb_U16 *servers; size_t nservers; if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) { return LCB_EMPTY_KEY; } if (cq->config == NULL) { return LCB_CLIENT_ETMPFAIL; } if (LCBVB_DISTTYPE(LCBT_VBCONFIG(instance)) != LCBVB_DIST_VBUCKET) { return LCB_NOT_SUPPORTED; } mcreq_map_key(cq, &cmd->key, &cmd->_hashkey, 24, &vbid, &srvix_dummy); if (cmd->servers_) { servers = cmd->servers_; nservers = cmd->nservers_; } else { nservers = 0; servers = servers_s; /* Replicas are always < 4 */ for (ii = 0; ii < LCBVB_NREPLICAS(cq->config) + 1; ii++) { int ix = lcbvb_vbserver(cq->config, vbid, ii); if (ix < 0) { if (ii == 0) { return LCB_NO_MATCHING_SERVER; } else { continue; } } servers_s[nservers++] = ix; if (cmd->cmdflags & LCB_CMDOBSERVE_F_MASTER_ONLY) { break; /* Only a single server! */ } } } if (nservers == 0) { return LCB_NO_MATCHING_SERVER; } for (ii = 0; ii < nservers; ii++) { lcb_string *rr; lcb_U16 vb16, klen16; lcb_U16 ix = servers[ii]; lcb_assert(ix < ctx->nrequests); rr = ctx->requests + ix; if (0 != lcb_string_reserve(rr, 4 + cmd->key.contig.nbytes)) { return LCB_CLIENT_ENOMEM; } vb16 = htons((lcb_U16)vbid); klen16 = htons((lcb_U16)cmd->key.contig.nbytes); lcb_string_append(rr, &vb16, sizeof vb16); lcb_string_append(rr, &klen16, sizeof klen16); lcb_string_append(rr, cmd->key.contig.bytes, cmd->key.contig.nbytes); ctx->remaining++; } return LCB_SUCCESS; } static mc_REQDATAPROCS obs_procs = { handle_observe_callback, handle_schedfail }; static lcb_error_t obs_ctxdone(lcb_MULTICMD_CTX *mctx, const void *cookie) { unsigned ii; OBSERVECTX *ctx = CTX_FROM_MULTI(mctx); mc_CMDQUEUE *cq = &ctx->instance->cmdq; for (ii = 0; ii < ctx->nrequests; ii++) { protocol_binary_request_header hdr; mc_PACKET *pkt; mc_PIPELINE *pipeline; lcb_string *rr = ctx->requests + ii; pipeline = cq->pipelines[ii]; if (!rr->nused) { continue; } pkt = mcreq_allocate_packet(pipeline); lcb_assert(pkt); mcreq_reserve_header(pipeline, pkt, MCREQ_PKT_BASESIZE); mcreq_reserve_value2(pipeline, pkt, rr->nused); hdr.request.magic = PROTOCOL_BINARY_REQ; hdr.request.opcode = PROTOCOL_BINARY_CMD_OBSERVE; hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES; hdr.request.keylen = 0; hdr.request.cas = 0; hdr.request.vbucket = 0; hdr.request.extlen = 0; hdr.request.opaque = pkt->opaque; hdr.request.bodylen = htonl((lcb_uint32_t)rr->nused); memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes)); memcpy(SPAN_BUFFER(&pkt->u_value.single), rr->base, rr->nused); pkt->flags |= MCREQ_F_REQEXT; pkt->u_rdata.exdata = (mc_REQDATAEX *)ctx; mcreq_sched_add(pipeline, pkt); TRACE_OBSERVE_BEGIN(&hdr, SPAN_BUFFER(&pkt->u_value.single)); } destroy_requests(ctx); ctx->base.start = gethrtime(); ctx->base.cookie = cookie; ctx->base.procs = &obs_procs; if (ctx->nrequests == 0 || ctx->remaining == 0) { free(ctx); return LCB_EINVAL; } else { MAYBE_SCHEDLEAVE(ctx->instance); return LCB_SUCCESS; } } static void obs_ctxfail(lcb_MULTICMD_CTX *mctx) { OBSERVECTX *ctx = CTX_FROM_MULTI(mctx); destroy_requests(ctx); free(ctx); } LIBCOUCHBASE_API lcb_MULTICMD_CTX * lcb_observe3_ctxnew(lcb_t instance) { OBSERVECTX *ctx; size_t ii, n_extra = LCBT_NSERVERS(instance)-1; ctx = calloc(1, sizeof(*ctx) + sizeof(ctx->requests) * n_extra); ctx->instance = instance; ctx->nrequests = n_extra + 1; ctx->mctx.addcmd = obs_ctxadd; ctx->mctx.done = obs_ctxdone; ctx->mctx.fail = obs_ctxfail; /* note this block doesn't do anything not done with calloc, but makes for * easier reading/tracking */ for (ii = 0; ii < ctx->nrequests; ii++) { lcb_string_init(ctx->requests + ii); } return &ctx->mctx; } lcb_MULTICMD_CTX * lcb_observe_ctx_dur_new(lcb_t instance) { lcb_MULTICMD_CTX *mctx = lcb_observe3_ctxnew(instance); if (mctx) { OBSERVECTX *ctx = CTX_FROM_MULTI(mctx); ctx->oflags |= F_DURABILITY; } return mctx; } LIBCOUCHBASE_API lcb_error_t lcb_observe(lcb_t instance, const void *command_cookie, lcb_size_t num, const lcb_observe_cmd_t *const *items) { unsigned ii; lcb_error_t err; lcb_MULTICMD_CTX *mctx; lcb_sched_enter(instance); mctx = lcb_observe3_ctxnew(instance); if (mctx == NULL) { return LCB_CLIENT_ENOMEM; } for (ii = 0; ii < num; ii++) { lcb_CMDOBSERVE cmd = { 0 }; const lcb_observe_cmd_t *src = items[ii]; if (src->version == 1 && (src->v.v1.options & LCB_OBSERVE_MASTER_ONLY)) { cmd.cmdflags |= LCB_CMDOBSERVE_F_MASTER_ONLY; } LCB_KREQ_SIMPLE(&cmd.key, src->v.v0.key, src->v.v0.nkey); LCB_KREQ_SIMPLE(&cmd._hashkey, src->v.v0.hashkey, src->v.v0.nhashkey); err = mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd); if (err != LCB_SUCCESS) { mctx->fail(mctx); return err; } } lcb_sched_enter(instance); mctx->done(mctx, command_cookie); lcb_sched_leave(instance); SYNCMODE_INTERCEPT(instance) }