/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2014 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

/**
 * This file contains the CCCP (Cluster Carrier Configuration Protocol)
 * implementation of the confmon provider. It utilizes a memcached connection
 * to retrieve configuration information.
 */

#include "internal.h"
#include "clconfig.h"
#include "packetutils.h"
/* NOTE(review): the targets of the next four includes were not legible in the
 * text under review; they are restored from the facilities this file uses
 * (SessionRequest, lcbio contexts, lcb::io::Timer, SSL options). Confirm the
 * exact paths against the build before merging. */
#include <mcserver/negotiate.h>
#include <lcbio/lcbio.h>
#include <lcbio/timer-cxx.h>
#include <lcbio/ssl.h>
#include <cstdio>
#include <string>
#include "ctx-log-inl.h"

#define LOGARGS(cccp, lvl) cccp->parent->settings, "cccp", LCB_LOG_##lvl, __FILE__, __LINE__
#define LOGFMT "<%s:%s> "
#define LOGID(cccp) get_ctx_host(cccp->ioctx), get_ctx_port(cccp->ioctx)

struct CccpCookie;

using namespace lcb::clconfig;

/**
 * Configuration provider which fetches the cluster map with the memcached
 * GET_CLUSTER_CONFIG command.
 *
 * A request is carried either by an existing server object of the owning
 * instance (tracked through `cmdcookie`) or by a dedicated connection obtained
 * from the instance's socket pool (tracked through `creq` while connecting and
 * `ioctx` once connected).
 */
struct CccpProvider : public Provider {
    CccpProvider(Confmon *);
    ~CccpProvider();

    /**
     * Stops the current request.
     * @param is_clean Whether the state of the current request is 'clean',
     * i.e. whether we are stopping because of an error condition, or
     * because we have received a successful response. The dedicated
     * connection is only handed back to the socket pool when this is true.
     */
    void stop_current_request(bool is_clean);

    /**
     * Issues a request to the next node in `nodes`.
     * @param why error reported to the parent if no further node is available
     * @param can_rollover forwarded to Hostlist::next()
     * @return LCB_SUCCESS if a request was issued, otherwise `why`
     */
    lcb_error_t schedule_next_request(lcb_error_t why, bool can_rollover);

    /**
     * Handles a failure of the current request: logs it, tears the request
     * down and moves on to the next node.
     */
    lcb_error_t mcio_error(lcb_error_t why);

    /** Timer callback; a timeout is treated like any other request failure. */
    void on_timeout() { mcio_error(LCB_ETIMEDOUT); }

    /**
     * Parses `data` as a JSON cluster map received from `host` and, on
     * success, publishes it to the parent monitor.
     */
    lcb_error_t update(const char *host, const char* data);

    /** Writes a GET_CLUSTER_CONFIG request to the dedicated connection. */
    void request_config();

    /** Consumes the response arriving on the dedicated connection. */
    void on_io_read();

    bool pause(); // Override
    void configure_nodes(const lcb::Hostlist&); // Override
    void config_updated(lcbvb_CONFIG *); // Override;
    void dump(FILE*) const; // Override
    lcb_error_t refresh(); // Override

    ConfigInfo *get_cached() /* Override */ { return config; }
    const lcb::Hostlist* get_nodes() const /* Override */ { return nodes; }

    /** Enables the provider; `arg` is stored as the owning lcb_t handle. */
    void enable(void *arg) {
        instance = reinterpret_cast<lcb_t>(arg);
        Provider::enable();
    }

    // Whether there is a pending CCCP config request.
    bool has_pending_request() const {
        return creq != NULL || cmdcookie != NULL || ioctx != NULL;
    }

    lcb::Hostlist *nodes;               /**< Nodes to query; owned by this object */
    ConfigInfo *config;                 /**< Last configuration received; holds a reference */
    /* NOTE(review): the template arguments were not legible in the text under
     * review; on_timeout() is the only candidate callback in this class.
     * Confirm against the Timer declaration. */
    lcb::io::Timer<CccpProvider, &CccpProvider::on_timeout> timer;
    lcb_t instance;                     /**< Set by enable() */
    lcb::io::ConnectionRequest *creq;   /**< Pending connection or session request */
    lcbio_CTX *ioctx;                   /**< Dedicated connection context, if any */
    CccpCookie *cmdcookie;              /**< Cookie of a request issued via a server object */
};

/**
 * Cookie attached to a configuration request issued through an existing
 * server object. `active` is cleared when the provider abandons the request,
 * so that a late response does not touch the provider's request state.
 */
struct CccpCookie {
    CccpProvider *parent;
    bool active;
    explicit CccpCookie(CccpProvider *parent_) : parent(parent_), active(true) {
    }
};

static void io_error_handler(lcbio_CTX *, lcb_error_t);
static void io_read_handler(lcbio_CTX *, unsigned nr);
static void on_connected(lcbio_SOCKET *, void*, lcb_error_t, lcbio_OSERR);

/**
 * Close callback passed to lcbio_ctx_close().
 * @param sock the socket being detached from its context
 * @param reusable whether the I/O layer considers the socket reusable
 * @param arg pointer to a bool telling whether the request ended cleanly
 *
 * The socket is returned to the pool only if both conditions hold; otherwise
 * it is discarded.
 */
static void
pooled_close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
{
    bool *ru_ex = reinterpret_cast<bool*>(arg);
    lcbio_ref(sock);
    if (reusable && *ru_ex) {
        lcb::io::Pool::put(sock);
    } else {
        lcb::io::Pool::discard(sock);
    }
}

void
CccpProvider::stop_current_request(bool is_clean)
{
    if (cmdcookie) {
        /* The cookie is freed when its response arrives (see cccp_update);
         * here it is only detached. */
        cmdcookie->active = false;
        cmdcookie = NULL;
    }

    lcb::io::ConnectionRequest::cancel(&creq);

    if (ioctx) {
        lcbio_ctx_close(ioctx, pooled_close_cb, &is_clean);
        ioctx = NULL;
    }
}

lcb_error_t
CccpProvider::schedule_next_request(lcb_error_t err, bool can_rollover)
{
    lcb_host_t *next_host = nodes->next(can_rollover);
    if (!next_host) {
        /* Node list exhausted: report the failure to the monitor. */
        timer.cancel();
        parent->provider_failed(this, err);
        return err;
    }

    lcb::Server* server = instance->find_server(*next_host);
    if (server) {
        /* The instance already has a server object for this node; piggyback
         * on it instead of opening a dedicated connection. */
        cmdcookie = new CccpCookie(this);
        lcb_log(LOGARGS(this, INFO), "Re-Issuing CCCP Command on server struct %p (%s:%s)", (void*)server, next_host->host, next_host->port);
        timer.rearm(settings().config_node_timeout);
        instance->request_config(cmdcookie, server);
    } else {
        lcb_log(LOGARGS(this, INFO), "Requesting connection to node %s:%s for CCCP configuration", next_host->host, next_host->port);
        creq = instance->memd_sockpool->get(*next_host,
                                            settings().config_node_timeout,
                                            on_connected, this);
    }

    return LCB_SUCCESS;
}

lcb_error_t
CccpProvider::mcio_error(lcb_error_t err)
{
    /* "Not supported" / "unknown command" responses are not logged as errors. */
    if (err != LCB_NOT_SUPPORTED && err != LCB_UNKNOWN_COMMAND) {
        lcb_log(LOGARGS(this, ERR), LOGFMT "Could not get configuration: %s", LOGID(this), lcb_strerror_short(err));
    }

    /* Only an LCB_NOT_SUPPORTED failure counts as a clean stop. */
    stop_current_request(err == LCB_NOT_SUPPORTED);
    return schedule_next_request(err, false);
}

/** Update the configuration from a server. */
lcb_error_t
lcb::clconfig::cccp_update(Provider *provider, const char *host, const char *data)
{
    return static_cast<CccpProvider*>(provider)->update(host, data);
}

lcb_error_t
CccpProvider::update(const char *host, const char *data)
{
    lcbvb_CONFIG* vbc;
    int rv;
    ConfigInfo *new_config;

    vbc = lcbvb_create();
    if (!vbc) {
        return LCB_CLIENT_ENOMEM;
    }

    rv = lcbvb_load_json(vbc, data);
    if (rv) {
        lcb_log(LOGARGS(this, ERROR), LOGFMT "Failed to parse config", LOGID(this));
        lcb_log_badconfig(LOGARGS(this, ERROR), vbc, data);
        lcbvb_destroy(vbc);
        return LCB_PROTOCOL_ERROR;
    }

    lcbvb_replace_host(vbc, host);
    new_config = ConfigInfo::create(vbc, CLCONFIG_CCCP);

    if (!new_config) {
        lcbvb_destroy(vbc);
        return LCB_CLIENT_ENOMEM;
    }

    /* Drop the reference to the previous configuration, if any. */
    if (config) {
        config->decref();
    }

    /** TODO: Figure out the comparison vector */
    config = new_config;
    parent->provider_got_config(this, new_config);
    return LCB_SUCCESS;
}

/**
 * Completion handler for a configuration request issued through a server
 * object.
 * @param cookie_ the CccpCookie passed with the request; freed here
 * @param err status of the request
 * @param bytes response payload (not necessarily NUL-terminated)
 * @param nbytes length of `bytes`
 * @param origin host the response came from
 */
void
lcb::clconfig::cccp_update(
    const void *cookie_, lcb_error_t err,
    const void *bytes, size_t nbytes, const lcb_host_t *origin)
{
    CccpCookie *cookie = reinterpret_cast<CccpCookie*>(const_cast<void*>(cookie_));
    CccpProvider *cccp = cookie->parent;

    /* Remember whether the provider was still waiting on this cookie; the
     * cookie itself is freed below. */
    bool was_active = cookie->active;
    if (cookie->active) {
        cookie->active = false;
        cccp->timer.cancel();
        cccp->cmdcookie = NULL;
    }
    delete cookie;

    if (err == LCB_SUCCESS) {
        /* Copy into a string so update() receives a NUL-terminated buffer. */
        std::string ss(reinterpret_cast<const char *>(bytes), nbytes);
        err = cccp->update(origin->host, ss.c_str());
    }

    /* A failure only advances to the next node if the request had not
     * already been abandoned. */
    if (err != LCB_SUCCESS && was_active) {
        cccp->mcio_error(err);
    }
}

/**
 * Connection callback for the dedicated connection. It is also used as the
 * completion callback of the session request started below, so it may be
 * invoked twice for the same socket.
 */
static void
on_connected(lcbio_SOCKET *sock, void *data, lcb_error_t err, lcbio_OSERR)
{
    lcbio_CTXPROCS ioprocs;
    CccpProvider *cccp = reinterpret_cast<CccpProvider*>(data);
    lcb_settings *settings = cccp->parent->settings;

    cccp->creq = NULL;

    if (err != LCB_SUCCESS) {
        if (sock) {
            lcb::io::Pool::discard(sock);
        }
        cccp->mcio_error(err);
        return;
    }

    /* No session info attached to the socket yet: start a session request
     * first and come back here when it completes. */
    if (lcbio_protoctx_get(sock, LCBIO_PROTOCTX_SESSINFO) == NULL) {
        cccp->creq = lcb::SessionRequest::start(
                sock, settings, settings->config_node_timeout, on_connected, cccp);
        return;
    }

    ioprocs.cb_err = io_error_handler;
    ioprocs.cb_read = io_read_handler;
    cccp->ioctx = lcbio_ctx_new(sock, data, &ioprocs);
    cccp->ioctx->subsys = "bc_cccp";
    cccp->request_config();
}

/**
 * Starts a new configuration request unless one is already pending.
 * @return LCB_BUSY if a request is pending, otherwise the result of
 * schedule_next_request()
 */
lcb_error_t
CccpProvider::refresh()
{
    if (has_pending_request()) {
        return LCB_BUSY;
    }

    return schedule_next_request(LCB_SUCCESS, true);
}

/** Abandons any pending request and cancels the timer. Always returns true. */
bool
CccpProvider::pause()
{
    if (!has_pending_request()) {
        return true;
    }

    stop_current_request(false);
    timer.cancel();
    return true;
}

CccpProvider::~CccpProvider()
{
    stop_current_request(false);

    if (config) {
        config->decref();
    }
    delete nodes;
    timer.release();
}

/** Replaces the node list with `nodes_`, shuffling it if so configured. */
void
CccpProvider::configure_nodes(const lcb::Hostlist& nodes_)
{
    nodes->assign(nodes_);
    if (parent->settings->randomize_bootstrap_nodes) {
        nodes->randomize();
    }
}

/**
 * Rebuilds the node list from the data-service addresses of `vbc`. A
 * configuration without servers leaves the current list untouched.
 */
void
CccpProvider::config_updated(lcbvb_CONFIG *vbc)
{
    lcbvb_SVCMODE mode;
    if (LCBVB_NSERVERS(vbc) < 1) {
        return;
    }

    nodes->clear();
    if (settings().sslopts & LCB_SSL_ENABLED) {
        mode = LCBVB_SVCMODE_SSL;
    } else {
        mode = LCBVB_SVCMODE_PLAIN;
    }

    for (size_t ii = 0; ii < LCBVB_NSERVERS(vbc); ii++) {
        const char *mcaddr = lcbvb_get_hostport(vbc, ii, LCBVB_SVCTYPE_DATA, mode);
        if (!mcaddr) {
            /* %lu expects unsigned long; size_t may be a different type. */
            lcb_log(LOGARGS(this, DEBUG), "Node %lu has no data service", (unsigned long)ii);
            continue;
        }
        nodes->add(mcaddr, LCB_CONFIG_MCD_PORT);
    }

    if (settings().randomize_bootstrap_nodes) {
        nodes->randomize();
    }
}

/** I/O error callback of the dedicated connection. */
static void
io_error_handler(lcbio_CTX *ctx, lcb_error_t err)
{
    CccpProvider *cccp = reinterpret_cast<CccpProvider*>(lcbio_ctx_data(ctx));
    cccp->mcio_error(err);
}

/** Read callback of the dedicated connection. */
static void
io_read_handler(lcbio_CTX *ioctx, unsigned)
{
    reinterpret_cast<CccpProvider*>(lcbio_ctx_data(ioctx))->on_io_read();
}

void
CccpProvider::on_io_read()
{
    unsigned required;
    lcb::MemcachedResponse resp;

    /* Packet not complete yet: ask for the missing bytes and wait. */
    if (!resp.load(ioctx, &required)) {
        lcbio_ctx_rwant(ioctx, required);
        lcbio_ctx_schedule(ioctx);
        return;
    }

    if (resp.status() != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
        lcb_log(LOGARGS(this, WARN), LOGFMT "CCCP Packet responded with 0x%x; nkey=%d, nbytes=%lu, cmd=0x%x, seq=0x%x", LOGID(this),
                resp.status(), resp.keylen(), (unsigned long)resp.bodylen(),
                resp.opcode(), resp.opaque());

        lcb_error_t failure;
        switch (resp.status()) {
        case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
        case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
            failure = LCB_NOT_SUPPORTED;
            break;
        default:
            failure = LCB_PROTOCOL_ERROR;
            break;
        }

        /* Release the packet before mcio_error() closes the context. */
        resp.release(ioctx);
        mcio_error(failure);
        return;
    }

    if (!resp.bodylen()) {
        resp.release(ioctx);
        mcio_error(LCB_PROTOCOL_ERROR);
        return;
    }

    /* Copy everything needed out of the packet and the socket before the
     * request (and with it the context) is torn down. */
    std::string jsonstr(resp.body(), resp.bodylen());
    std::string hoststr(lcbio_get_host(lcbio_ctx_sock(ioctx))->host);

    resp.release(ioctx);
    stop_current_request(true);

    lcb_error_t err = update(hoststr.c_str(), jsonstr.c_str());

    if (err == LCB_SUCCESS) {
        timer.cancel();
    } else {
        schedule_next_request(LCB_PROTOCOL_ERROR, false);
    }
}

void CccpProvider::request_config()
{
    lcb::MemcachedRequest req(PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG);
    req.opaque(0xF00D);
    lcbio_ctx_put(ioctx, req.data(), req.size());
    /* Ask for 24 bytes first; on_io_read() requests more if the packet is
     * still incomplete. */
    lcbio_ctx_rwant(ioctx, 24);
    lcbio_ctx_schedule(ioctx);
    timer.rearm(settings().config_node_timeout);
}

/** Writes a human-readable description of the provider state to `fp`. */
void CccpProvider::dump(FILE *fp) const
{
    if (!enabled) {
        return;
    }

    fprintf(fp, "## BEGIN CCCP PROVIDER DUMP ##\n");
    fprintf(fp, "TIMER ACTIVE: %s\n", timer.is_armed() ? "YES" : "NO");
    fprintf(fp, "PIPELINE RESPONSE COOKIE: %p\n", (void*)cmdcookie);
    if (ioctx) {
        fprintf(fp, "CCCP Owns connection:\n");
        lcbio_ctx_dump(ioctx, fp);
    } else if (creq) {
        fprintf(fp, "CCCP Is connecting\n");
    } else {
        fprintf(fp, "CCCP does not have a dedicated connection\n");
    }

    for (size_t ii = 0; ii < nodes->size(); ii++) {
        const lcb_host_t &curhost = (*nodes)[ii];
        fprintf(fp, "CCCP NODE: %s:%s\n", curhost.host, curhost.port);
    }
    fprintf(fp, "## END CCCP PROVIDER DUMP ##\n");
}

CccpProvider::CccpProvider(Confmon *mon)
    : Provider(mon, CLCONFIG_CCCP),
      nodes(new lcb::Hostlist()),
      config(NULL),
      timer(mon->iot, this),
      instance(NULL),
      creq(NULL),
      ioctx(NULL),
      cmdcookie(NULL)
{
}

/** Factory used by the configuration monitor to create the CCCP provider. */
Provider* lcb::clconfig::new_cccp_provider(Confmon *mon) {
    return new CccpProvider(mon);
}