/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2011-2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "internal.h"
#include "packetutils.h"
#include "bucketconfig/clconfig.h"
#include "vbucket/aliases.h"
#include "sllist-inl.h"

#define LOGARGS(instance, lvl) (instance)->settings, "newconfig", LCB_LOG_##lvl, __FILE__, __LINE__
#define LOG(instance, lvlbase, msg) lcb_log((instance)->settings, "newconfig", LCB_LOG_##lvlbase, __FILE__, __LINE__, msg)

#define SERVER_FMT "%s:%s (%p)"
#define SERVER_ARGS(s) (s)->curhost->host, (s)->curhost->port, (void *)(s)

/**
 * One entry per vBucket recording a heuristically learned master index.
 *
 * NOTE(review): the indices are stored as plain `char`, so server indices
 * above 127 (or 255) cannot be represented faithfully -- confirm that the
 * cluster sizes in use stay below that.
 */
typedef struct lcb_GUESSVB_st {
    time_t last_update; /**< Last time this vBucket was heuristically set */
    char newix; /**< New index, heuristically determined */
    char oldix; /**< Original index, according to map */
    char used; /**< Flag indicating whether or not this entry has been used */
} lcb_GUESSVB;

/* Ignore configuration updates for heuristically guessed vBuckets for a
 * maximum amount of [n] seconds */
#define MAX_KEEP_GUESS 20

/**
 * Decides whether a previously guessed vBucket master should override the
 * master stated by a new configuration.
 *
 * @param guess the recorded guess for the vBucket
 * @param vb the vBucket entry from the new configuration
 * @return 1 if the guess should be kept, i.e. the guess differs from the
 *         index it replaced, the new config still names that replaced index
 *         as the master (servers[0]), and the guess is no older than
 *         MAX_KEEP_GUESS seconds; 0 otherwise.
 */
static int
should_keep_guess(lcb_GUESSVB *guess, lcbvb_VBUCKET *vb)
{
    if (guess->newix == guess->oldix) {
        /* Heuristic position is the same as starting position */
        return 0;
    }
    if (vb->servers[0] != guess->oldix) {
        /* Previous master changed */
        return 0;
    }

    if (time(NULL) - guess->last_update > MAX_KEEP_GUESS) {
        /* Last usage too old */
        return 0;
    }

    return 1;
}

/**
 * Re-applies heuristically learned vBucket masters on top of a freshly
 * received configuration.
 *
 * For every vBucket whose guess entry is marked as used, the guessed index is
 * written into the new config's servers[0] if should_keep_guess() approves;
 * otherwise the guess is discarded (its `used` flag is cleared) and the new
 * config's mapping is left as-is.
 *
 * @param instance the library handle (used for logging)
 * @param cfg the new configuration; its vBucket map is modified in place
 * @param guesses guess array indexed by vBucket id, or NULL when no guess has
 *        been recorded yet (in which case this is a no-op)
 *
 * NOTE(review): the guess array is sized from the config that was current
 * when it was allocated (see lcb_vbguess_remap); this loop assumes @p cfg
 * does not have more vBuckets than that config.
 */
void
lcb_vbguess_newconfig(lcb_t instance, lcbvb_CONFIG *cfg, lcb_GUESSVB *guesses)
{
    unsigned ii;

    if (!guesses) {
        return;
    }

    for (ii = 0; ii < cfg->nvb; ii++) {
        lcb_GUESSVB *guess = guesses + ii;
        lcbvb_VBUCKET *vb = cfg->vbuckets + ii;

        if (!guess->used) {
            continue;
        }

        /* IF: Heuristically learned a new index, _and_ the old index (which is
         * known to be bad) is the same index stated by the new config */
        if (should_keep_guess(guess, vb)) {
            lcb_log(LOGARGS(instance, TRACE), "Keeping heuristically guessed index. VBID=%d. Current=%d. Old=%d.", (int)ii, guess->newix, guess->oldix);
            vb->servers[0] = guess->newix;

        } else {
            /* We don't reassign to the guess structure here. The idea is that
             * we will simply use the new config. If this gives us problems, the
             * config will re-learn again. */
            lcb_log(LOGARGS(instance, TRACE), "Ignoring heuristically guessed index. VBID=%d. Current=%d. Old=%d. New=%d", (int)ii, guess->newix, guess->oldix, vb->servers[0]);
            guess->used = 0;
        }
    }
}

/**
 * Picks a new master index for vBucket @p vbid after server index @p bad was
 * found to be wrong for it.
 *
 * When the `vb_noguess` setting is enabled, lcbvb_nmv_remap_ex() is called
 * with a final argument of 0 and nothing is recorded. Otherwise it is called
 * with a final argument of 1, and a usable result (non-negative and different
 * from @p bad) is stored in `instance->vbguess` so that
 * lcb_vbguess_newconfig() can re-apply it on later config updates.
 *
 * @param instance the library handle
 * @param vbid the vBucket id being remapped
 * @param bad the server index known to be wrong for @p vbid
 * @return the value returned by lcbvb_nmv_remap_ex() (negative when no new
 *         index was found)
 */
int
lcb_vbguess_remap(lcb_t instance, int vbid, int bad)
{
    lcb_GUESSVB *guess;
    int newix;

    if (LCBT_SETTING(instance, vb_noguess)) {
        newix = lcbvb_nmv_remap_ex(LCBT_VBCONFIG(instance), vbid, bad, 0);
        if (newix > -1 && newix != bad) {
            lcb_log(LOGARGS(instance, TRACE), "Got new index from ffmap. VBID=%d. Old=%d. New=%d", vbid, bad, newix);
        }
        return newix;
    }

    newix = lcbvb_nmv_remap_ex(LCBT_VBCONFIG(instance), vbid, bad, 1);
    if (newix < 0 || newix == bad) {
        /* Nothing new was learned, so there is no guess to record */
        return newix;
    }

    if (!instance->vbguess) {
        /* The guess array is allocated lazily (one entry per vBucket). It
         * must exist *before* a pointer into it is computed; indexing a NULL
         * array here was previously undefined behavior. */
        instance->vbguess = calloc(LCBT_VBCONFIG(instance)->nvb, sizeof *instance->vbguess);
        if (!instance->vbguess) {
            /* Out of memory: the remapped index is still usable for this
             * request, it just can't be remembered across config updates. */
            return newix;
        }
    }

    guess = instance->vbguess + vbid;
    guess->newix = newix;
    guess->oldix = bad;
    guess->used = 1;
    guess->last_update = time(NULL);
    lcb_log(LOGARGS(instance, TRACE), "Guessed new heuristic index VBID=%d. Old=%d. New=%d", vbid, bad, newix);
    return newix;
}

/**
 * Finds the index of an older server using the current config.
 *
 * This function is used to help reuse the server structures for memcached
 * packets.
 *
 * @param oldconfig The old configuration.
This is the configuration the * old server is bound to * @param newconfig The new configuration. This will be inspected for new * nodes which may have been added, and ones which may have been removed. * @param server The server to match * @return The new index, or -1 if the current server is not present in the new * config. */ static int find_new_data_index(lcbvb_CONFIG *oldconfig, lcbvb_CONFIG* newconfig, mc_SERVER *server) { size_t ii; const char *old_datahost = lcbvb_get_hostport(oldconfig, server->pipeline.index, LCBVB_SVCTYPE_DATA, LCBVB_SVCMODE_PLAIN); if (!old_datahost) { /* Old server had no data service */ return -1; } for (ii = 0; ii < LCBVB_NSERVERS(newconfig); ii++) { const char *new_datahost = lcbvb_get_hostport(newconfig, ii, LCBVB_SVCTYPE_DATA, LCBVB_SVCMODE_PLAIN); if (new_datahost && strcmp(new_datahost, old_datahost) == 0) { return ii; } } return -1; } static void log_vbdiff(lcb_t instance, lcbvb_CONFIGDIFF *diff) { char **curserver; lcb_log(LOGARGS(instance, INFO), "Config Diff: [ vBuckets Modified=%d ], [Sequence Changed=%d]", diff->n_vb_changes, diff->sequence_changed); if (diff->servers_added) { for (curserver = diff->servers_added; *curserver; curserver++) { lcb_log(LOGARGS(instance, INFO), "Detected server %s added", *curserver); } } if (diff->servers_removed) { for (curserver = diff->servers_removed; *curserver; curserver++) { lcb_log(LOGARGS(instance, INFO), "Detected server %s removed", *curserver); } } } /** * This callback is invoked for packet relocation twice. It tries to relocate * commands to their destination server. Some commands may not be relocated * either because they have no explicit "Relocation Information" (i.e. no * specific vbucket) or because the command is tied to a specific server (i.e. * CMD_STAT). 
* * Note that `KEEP_PACKET` here doesn't mean to "Save" the packet, but rather * to keep the packet in the current queue (so that if the server ends up * being removed, the command will fail); rather than being relocated to * another server. */ static int iterwipe_cb(mc_CMDQUEUE *cq, mc_PIPELINE *oldpl, mc_PACKET *oldpkt, void *arg) { protocol_binary_request_header hdr; mc_SERVER *srv = (mc_SERVER *)oldpl; mc_PIPELINE *newpl; mc_PACKET *newpkt; int newix; (void)arg; mcreq_read_hdr(oldpkt, &hdr); if (!lcb_should_retry(srv->settings, oldpkt, LCB_MAX_ERROR)) { return MCREQ_KEEP_PACKET; } if (LCBVB_DISTTYPE(cq->config) == LCBVB_DIST_VBUCKET) { newix = lcbvb_vbmaster(cq->config, ntohs(hdr.request.vbucket)); } else { const void *key = NULL; lcb_SIZE nkey = 0; int tmpid; /* XXX: We ignore hashkey. This is going away soon, and is probably * better than simply failing the items. */ mcreq_get_key(oldpkt, &key, &nkey); lcbvb_map_key(cq->config, key, nkey, &tmpid, &newix); } if (newix < 0 || newix > (int)cq->npipelines-1) { return MCREQ_KEEP_PACKET; } newpl = cq->pipelines[newix]; if (newpl == oldpl || newpl == NULL) { return MCREQ_KEEP_PACKET; } lcb_log(LOGARGS((lcb_t)cq->cqdata, DEBUG), "Remapped packet %p (SEQ=%u) from "SERVER_FMT " to " SERVER_FMT, (void*)oldpkt, oldpkt->opaque, SERVER_ARGS((mc_SERVER*)oldpl), SERVER_ARGS((mc_SERVER*)newpl)); /** Otherwise, copy over the packet and find the new vBucket to map to */ newpkt = mcreq_renew_packet(oldpkt); newpkt->flags &= ~MCREQ_STATE_FLAGS; mcreq_reenqueue_packet(newpl, newpkt); mcreq_packet_handled(oldpl, oldpkt); return MCREQ_REMOVE_PACKET; } static int replace_config(lcb_t instance, lcbvb_CONFIG *oldconfig, lcbvb_CONFIG *newconfig) { mc_CMDQUEUE *cq = &instance->cmdq; mc_PIPELINE **ppold, **ppnew; unsigned ii, nold, nnew; assert(LCBT_VBCONFIG(instance) == newconfig); nnew = LCBVB_NSERVERS(newconfig); ppnew = calloc(nnew, sizeof(*ppnew)); ppold = mcreq_queue_take_pipelines(cq, &nold); /** * Determine which existing servers 
are still part of the new cluster config * and place it inside the new list. */ for (ii = 0; ii < nold; ii++) { mc_SERVER *cur = (mc_SERVER *)ppold[ii]; int newix = find_new_data_index(oldconfig, newconfig, cur); if (newix > -1) { cur->pipeline.index = newix; ppnew[newix] = &cur->pipeline; ppold[ii] = NULL; lcb_log(LOGARGS(instance, INFO), "Reusing server "SERVER_FMT". OldIndex=%d. NewIndex=%d", SERVER_ARGS(cur), ii, newix); } } /** * Once we've moved the kept servers to the new list, allocate new mc_SERVER * structures for slots that don't have an existing mc_SERVER. We must do * this before add_pipelines() is called, so that there are no holes inside * ppnew */ for (ii = 0; ii < nnew; ii++) { if (!ppnew[ii]) { ppnew[ii] = (mc_PIPELINE *)mcserver_alloc(instance, ii); ppnew[ii]->index = ii; } } /** * Once we have all the server structures in place for the new config, * transfer the new config along with the new list over to the CQ structure. */ mcreq_queue_add_pipelines(cq, ppnew, nnew, newconfig); for (ii = 0; ii < nnew; ii++) { mcreq_iterwipe(cq, ppnew[ii], iterwipe_cb, NULL); } /** * Go through all the servers that are to be removed and relocate commands * from their queues into the new queues */ for (ii = 0; ii < nold; ii++) { if (!ppold[ii]) { continue; } mcreq_iterwipe(cq, ppold[ii], iterwipe_cb, NULL); mcserver_fail_chain((mc_SERVER *)ppold[ii], LCB_MAP_CHANGED); mcserver_close((mc_SERVER *)ppold[ii]); } for (ii = 0; ii < nnew; ii++) { if (mcserver_has_pending((mc_SERVER*)ppnew[ii])) { ppnew[ii]->flush_start(ppnew[ii]); } } free(ppnew); free(ppold); return LCB_CONFIGURATION_CHANGED; } void lcb_update_vbconfig(lcb_t instance, clconfig_info *config) { lcb_size_t ii; int change_status; clconfig_info *old_config; mc_CMDQUEUE *q = &instance->cmdq; old_config = instance->cur_configinfo; instance->cur_configinfo = config; lcb_clconfig_incref(config); q->config = instance->cur_configinfo->vbc; q->cqdata = instance; if (old_config) { lcbvb_CONFIGDIFF *diff = 
lcbvb_compare(old_config->vbc, config->vbc); if (diff) { log_vbdiff(instance, diff); lcbvb_free_diff(diff); } /* Apply the vb guesses */ lcb_vbguess_newconfig(instance, config->vbc, instance->vbguess); change_status = replace_config(instance, old_config->vbc, config->vbc); if (change_status == -1) { LOG(instance, ERR, "Couldn't replace config"); return; } lcb_clconfig_decref(old_config); } else { unsigned nservers; mc_PIPELINE **servers; nservers = VB_NSERVERS(config->vbc); if ((servers = malloc(sizeof(*servers) * nservers)) == NULL) { assert(servers); lcb_log(LOGARGS(instance, FATAL), "Couldn't allocate memory for new server list! (n=%u)", nservers); return; } for (ii = 0; ii < nservers; ii++) { mc_SERVER *srv; if ((srv = mcserver_alloc(instance, ii)) == NULL) { assert(srv); lcb_log(LOGARGS(instance, FATAL), "Couldn't allocate memory for server instance!"); return; } servers[ii] = &srv->pipeline; } mcreq_queue_add_pipelines(q, servers, nservers, config->vbc); change_status = LCB_CONFIGURATION_NEW; free(servers); } /* Update the list of nodes here for server list */ hostlist_clear(instance->ht_nodes); for (ii = 0; ii < LCBVB_NSERVERS(config->vbc); ++ii) { const char *hp = lcbvb_get_hostport(config->vbc, ii, LCBVB_SVCTYPE_MGMT, LCBVB_SVCMODE_PLAIN); if (hp) { hostlist_add_stringz(instance->ht_nodes, hp, LCB_CONFIG_HTTP_PORT); } } instance->callbacks.configuration(instance, change_status); lcb_maybe_breakout(instance); }