/* * Copyright 2015 Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "internal.h" lcb_error_t lcb_observe_seqno3(lcb_t instance, const void *cookie, const lcb_CMDOBSEQNO *cmd) { mc_PACKET *pkt; protocol_binary_request_header hdr; lcb_U64 uuid; if (cmd->server_index > LCBT_NSERVERS(instance)) { return LCB_EINVAL; } lcb::Server *server = instance->get_server(cmd->server_index); pkt = mcreq_allocate_packet(server); mcreq_reserve_header(server, pkt, MCREQ_PKT_BASESIZE); mcreq_reserve_value2(server, pkt, 8); /* Set the static fields */ MCREQ_PKT_RDATA(pkt)->cookie = cookie; MCREQ_PKT_RDATA(pkt)->start = gethrtime(); if (cmd->cmdflags & LCB_CMD_F_INTERNAL_CALLBACK) { pkt->flags |= MCREQ_F_PRIVCALLBACK; } memset(&hdr, 0, sizeof hdr); hdr.request.opaque = pkt->opaque; hdr.request.magic = PROTOCOL_BINARY_REQ; hdr.request.opcode = PROTOCOL_BINARY_CMD_OBSERVE_SEQNO; hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES; hdr.request.bodylen = htonl((lcb_U32)8); hdr.request.vbucket = htons(cmd->vbid); memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof hdr.bytes); uuid = lcb_htonll(cmd->uuid); memcpy(SPAN_BUFFER(&pkt->u_value.single), &uuid, sizeof uuid); LCB_SCHED_ADD(instance, server, pkt); LCBTRACE_KV_START(instance->settings, cmd, LCBTRACE_OP_OBSERVE_SEQNO, pkt->opaque, MCREQ_PKT_RDATA(pkt)->span); return LCB_SUCCESS; } const lcb_MUTATION_TOKEN * lcb_get_mutation_token(lcb_t instance, const lcb_KEYBUF *kb, lcb_error_t *errp) { int vbix, srvix; lcb_error_t err_s; const lcb_MUTATION_TOKEN *existing; if (!errp) { errp = &err_s; } if (!LCBT_VBCONFIG(instance)) { *errp = LCB_CLIENT_ETMPFAIL; return NULL; } if (LCBT_VBCONFIG(instance)->dtype != LCBVB_DIST_VBUCKET) { *errp = LCB_NOT_SUPPORTED; return NULL; } if (!LCBT_SETTING(instance, fetch_mutation_tokens)) { *errp = LCB_NOT_SUPPORTED; return NULL; } if (!instance->dcpinfo) { *errp = LCB_DURABILITY_NO_MUTATION_TOKENS; return NULL; } mcreq_map_key(&instance->cmdq, kb, kb, 0, &vbix, &srvix); existing = instance->dcpinfo + vbix; if (existing->uuid_ == 0 && existing->seqno_ == 0) { *errp = LCB_DURABILITY_NO_MUTATION_TOKENS; return NULL; } *errp = LCB_SUCCESS; return existing; }