/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2014 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

/**
 * This file contains the CCCP (Cluster Carrier Configuration Protocol)
 * implementation of the confmon provider. It utilizes a memcached connection
 * to retrieve configuration information.
 */

#include "internal.h"
#include "clconfig.h"
#include "packetutils.h"
/* NOTE(review): the next four directives have lost their header names in this
 * copy of the file; restore them from the upstream source before building. */
#include
#include
#include
#include
#include "ctx-log-inl.h"

#define LOGARGS(cccp, lvl) cccp->parent->settings, "cccp", LCB_LOG_##lvl, __FILE__, __LINE__
#define LOGFMT "<%s:%s> "
#define LOGID(cccp) get_ctx_host(cccp->ioctx), get_ctx_port(cccp->ioctx)

struct CccpCookie;

using namespace lcb::clconfig;

/**
 * Configuration provider which obtains the cluster map over a memcached
 * connection: either by piggy-backing on a server object the instance already
 * has (tracked through @c cmdcookie), or over a pooled socket owned by this
 * provider (tracked through @c creq while connecting and @c ioctx afterwards).
 */
struct CccpProvider : public Provider {
    CccpProvider(Confmon *);
    ~CccpProvider();

    void release_socket(bool can_reuse);
    lcb_error_t schedule_next_request(lcb_error_t why, bool can_rollover);
    lcb_error_t mcio_error(lcb_error_t why);
    /* NOTE(review): nothing in this file calls on_timeout() directly;
     * presumably the timer member is parameterised to invoke it - confirm. */
    void on_timeout() { mcio_error(LCB_ETIMEDOUT); }
    lcb_error_t update(const char *host, const char* data);
    void request_config();
    void on_io_read();

    bool pause(); // Override
    void configure_nodes(const lcb::Hostlist&); // Override
    void config_updated(lcbvb_CONFIG *); // Override
    void dump(FILE*) const; // Override
    lcb_error_t refresh(); // Override

    ConfigInfo *get_cached() /* Override */ { return config; }
    const lcb::Hostlist* get_nodes() const /* Override */ { return nodes; }

    /** @param arg the owning library handle (stored in @c instance) */
    void enable(void *arg) {
        instance = reinterpret_cast<lcb_t>(arg);
        Provider::enable();
    }

    lcb::Hostlist *nodes;     /**< Candidate nodes; owned (deleted in dtor) */
    ConfigInfo *config;       /**< Last config received; holds one reference */
    bool server_active;       /**< True while a request cycle is in progress */
    /* NOTE(review): template arguments, if any, appear to be missing from
     * this declaration in this copy of the file - confirm against the
     * definition of lcb::io::Timer. */
    lcb::io::Timer timer;
    lcb_t instance;           /**< Set by enable() */
    lcbio_CONNREQ creq;       /**< Pending connection/session request */
    lcbio_CTX *ioctx;         /**< I/O context for a dedicated socket, or NULL */
    CccpCookie *cmdcookie;    /**< Cookie of a request issued on a server */
};

/**
 * Cookie attached to a configuration request issued on an existing server.
 * Allocated with @c new in schedule_next_request() and destroyed in the
 * cookie overload of cccp_update().
 */
struct CccpCookie {
    CccpProvider *parent;
    /** Set once the provider no longer wants error notifications for it */
    bool ignore_errors;
    CccpCookie(CccpProvider *parent_) : parent(parent_), ignore_errors(false) {
    }
};

static void io_error_handler(lcbio_CTX *, lcb_error_t);
static void io_read_handler(lcbio_CTX *, unsigned nr);
static void on_connected(lcbio_SOCKET *, void*, lcb_error_t, lcbio_OSERR);

/**
 * Close callback handed to lcbio_ctx_close(): returns the socket to the pool
 * when both the context and the caller consider it reusable, and discards it
 * otherwise.
 *
 * @param sock the socket being released
 * @param reusable non-zero if the I/O context reports the socket as reusable
 * @param arg pointer to the caller's @c bool "can reuse" flag
 *
 * NOTE(review): @c arg points at release_socket()'s by-value parameter, so
 * this assumes lcbio_ctx_close() invokes the callback before returning -
 * confirm.
 */
static void
pooled_close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
{
    bool *ru_ex = reinterpret_cast<bool*>(arg);
    lcbio_ref(sock);
    if (reusable && *ru_ex) {
        lcbio_mgr_put(sock);
    } else {
        lcbio_mgr_discard(sock);
    }
}

/**
 * Detach from whatever is currently servicing the request.
 *
 * If a request is outstanding on a server (@c cmdcookie), the cookie is told
 * to ignore errors and forgotten, and nothing else is touched. Otherwise any
 * pending connection request is cancelled and the I/O context, if present, is
 * closed.
 *
 * @param can_reuse whether the socket may be returned to the pool
 */
void
CccpProvider::release_socket(bool can_reuse)
{
    if (cmdcookie) {
        cmdcookie->ignore_errors = true;
        cmdcookie = NULL;
        return;
    }

    lcbio_connreq_cancel(&creq);

    if (ioctx) {
        lcbio_ctx_close(ioctx, pooled_close_cb, &can_reuse);
        ioctx = NULL;
    }
}

/**
 * Issue a configuration request to the next node in the list.
 *
 * @param err error to report to the parent if no further node is available
 * @param can_rollover passed to Hostlist::next()
 * @return LCB_SUCCESS if a request or connection attempt was started,
 *         otherwise @c err (after notifying the parent of the failure)
 */
lcb_error_t
CccpProvider::schedule_next_request(lcb_error_t err, bool can_rollover)
{
    lcb::Server *server;
    lcb_host_t *next_host = nodes->next(can_rollover);
    if (!next_host) {
        timer.cancel();
        parent->provider_failed(this, err);
        server_active = false;
        return err;
    }

    server = instance->find_server(*next_host);
    if (server) {
        /* Reuse the instance's existing server; the reply arrives through
         * the cookie overload of cccp_update(), which owns the cookie. */
        CccpCookie *cookie = new CccpCookie(this);
        cmdcookie = cookie;
        lcb_log(LOGARGS(this, INFO), "Re-Issuing CCCP Command on server struct %p (%s:%s)", (void*)server, next_host->host, next_host->port);
        timer.rearm(settings().config_node_timeout);
        instance->request_config(cookie, server);

    } else {
        /* No server for this host: ask the pool for a socket of our own */
        lcbio_pMGRREQ preq;

        lcb_log(LOGARGS(this, INFO), "Requesting connection to node %s:%s for CCCP configuration", next_host->host, next_host->port);
        preq = lcbio_mgr_get(
                instance->memd_sockpool, next_host,
                settings().config_node_timeout,
                on_connected, this);
        LCBIO_CONNREQ_MKPOOLED(&creq, preq);
    }

    server_active = true;
    return LCB_SUCCESS;
}

/**
 * Handle a failure of the current request: log it (except for the
 * "not supported"/"unknown command" codes), release the current socket or
 * cookie, and move on to the next node without rolling over.
 *
 * @param err the failure code
 * @return the result of schedule_next_request()
 */
lcb_error_t
CccpProvider::mcio_error(lcb_error_t err)
{
    if (err != LCB_NOT_SUPPORTED && err != LCB_UNKNOWN_COMMAND) {
        lcb_log(LOGARGS(this, ERR), LOGFMT "Could not get configuration: %s", LOGID(this), lcb_strerror_short(err));
    }

    /* Only a LCB_NOT_SUPPORTED failure leaves the socket eligible for reuse */
    release_socket(err == LCB_NOT_SUPPORTED);
    return schedule_next_request(err, false);
}

/** Update the configuration from a server. */
lcb_error_t
lcb::clconfig::cccp_update(Provider *provider, const char *host, const char *data)
{
    return static_cast<CccpProvider*>(provider)->update(host, data);
}

/**
 * Parse a JSON configuration and, if valid, install it as the cached config
 * and hand it to the parent.
 *
 * @param host host name passed to lcbvb_replace_host()
 * @param data NUL-terminated JSON text
 * @return LCB_SUCCESS, LCB_CLIENT_ENOMEM on allocation failure, or
 *         LCB_PROTOCOL_ERROR if the JSON could not be loaded
 */
lcb_error_t
CccpProvider::update(const char *host, const char *data)
{
    /** TODO: replace this with lcbvb_ names */
    lcbvb_CONFIG* vbc;
    int rv;
    ConfigInfo *new_config;

    vbc = lcbvb_create();

    if (!vbc) {
        return LCB_CLIENT_ENOMEM;
    }
    rv = lcbvb_load_json(vbc, data);

    if (rv) {
        lcb_log(LOGARGS(this, ERROR), LOGFMT "Failed to parse config", LOGID(this));
        lcb_log_badconfig(LOGARGS(this, ERROR), vbc, data);
        lcbvb_destroy(vbc);
        return LCB_PROTOCOL_ERROR;
    }

    lcbvb_replace_host(vbc, host);
    new_config = ConfigInfo::create(vbc, CLCONFIG_CCCP);

    if (!new_config) {
        lcbvb_destroy(vbc);
        return LCB_CLIENT_ENOMEM;
    }

    /* Drop our reference to the previous config before replacing it */
    if (config) {
        config->decref();
    }

    /** TODO: Figure out the comparison vector */
    config = new_config;
    parent->provider_got_config(this, new_config);
    return LCB_SUCCESS;
}

/**
 * Completion handler for a configuration request issued on an existing
 * server (see schedule_next_request()). Consumes and destroys the cookie.
 *
 * @param cookie the CccpCookie passed with the request
 * @param err status of the request
 * @param bytes response payload (not NUL-terminated)
 * @param nbytes payload length
 * @param origin host the response came from
 *
 * NOTE(review): a cookie detached by release_socket() still dereferences
 * @c parent here; this assumes the provider outlives every in-flight cookie -
 * confirm.
 */
void
lcb::clconfig::cccp_update(
    const void *cookie, lcb_error_t err,
    const void *bytes, size_t nbytes, const lcb_host_t *origin)
{
    CccpCookie *ck = reinterpret_cast<CccpCookie*>(const_cast<void*>(cookie));
    CccpProvider *cccp = ck->parent;

    if (ck == cccp->cmdcookie) {
        cccp->timer.cancel();
        cccp->cmdcookie = NULL;
    }

    if (err == LCB_SUCCESS) {
        /* Copy into a std::string to obtain a NUL-terminated buffer */
        std::string ss(reinterpret_cast<const char*>(bytes), nbytes);
        err = cccp->update(origin->host, ss.c_str());
    }

    if (err != LCB_SUCCESS && !ck->ignore_errors) {
        cccp->mcio_error(err);
    }

    /* The cookie was allocated with `new`; it must be released with
     * `delete`, not free(). */
    delete ck;
}

/**
 * Connection (and session negotiation) callback for the pooled socket
 * requested in schedule_next_request().
 *
 * On failure the socket, if any, is discarded and the error is routed through
 * mcio_error(). If the socket has no session info yet, a session request is
 * started with this same function as its callback. Otherwise an I/O context
 * is created and the configuration request is sent.
 */
static void
on_connected(lcbio_SOCKET *sock, void *data, lcb_error_t err, lcbio_OSERR)
{
    lcbio_CTXPROCS ioprocs;
    CccpProvider *cccp = reinterpret_cast<CccpProvider*>(data);
    lcb_settings *settings = cccp->parent->settings;

    LCBIO_CONNREQ_CLEAR(&cccp->creq);
    if (err != LCB_SUCCESS) {
        if (sock) {
            lcbio_mgr_discard(sock);
        }
        cccp->mcio_error(err);
        return;
    }

    if (lcbio_protoctx_get(sock, LCBIO_PROTOCTX_SESSINFO) == NULL) {
        lcb::SessionRequest *sreq = lcb::SessionRequest::start(
                sock, settings, settings->config_node_timeout, on_connected, cccp);
        LCBIO_CONNREQ_MKGENERIC(&cccp->creq, sreq, lcb::sessreq_cancel);
        return;
    }

    ioprocs.cb_err = io_error_handler;
    ioprocs.cb_read = io_read_handler;
    cccp->ioctx = lcbio_ctx_new(sock, data, &ioprocs);
    cccp->ioctx->subsys = "bc_cccp";
    cccp->request_config();
}

/**
 * Start a refresh cycle unless one is already in progress.
 *
 * @return LCB_BUSY if a connection request, active cycle or server cookie is
 *         outstanding; otherwise the result of schedule_next_request()
 */
lcb_error_t
CccpProvider::refresh()
{
    if (creq.u.p_generic || server_active || cmdcookie) {
        return LCB_BUSY;
    }

    return schedule_next_request(LCB_SUCCESS, true);
}

/**
 * Stop any request cycle in progress.
 *
 * @return always true
 */
bool
CccpProvider::pause()
{
    if (!server_active) {
        return true;
    }

    server_active = false;
    release_socket(false);
    timer.cancel();
    return true;
}

CccpProvider::~CccpProvider()
{
    /* Also detaches any in-flight cookie (marks it ignore_errors and clears
     * cmdcookie), so no further cookie handling is needed here. */
    release_socket(false);

    if (config) {
        config->decref();
    }
    delete nodes;
    timer.release();
}

/** Replace the node list, shuffling it if the settings request that. */
void
CccpProvider::configure_nodes(const lcb::Hostlist& nodes_)
{
    nodes->assign(nodes_);
    if (parent->settings->randomize_bootstrap_nodes) {
        nodes->randomize();
    }
}

/**
 * Rebuild the node list from a new vBucket configuration. A configuration
 * without servers leaves the current list untouched; servers without a data
 * service address for the selected mode (SSL or plain) are skipped.
 */
void
CccpProvider::config_updated(lcbvb_CONFIG *vbc)
{
    lcbvb_SVCMODE mode;
    if (LCBVB_NSERVERS(vbc) < 1) {
        return;
    }

    nodes->clear();
    if (settings().sslopts & LCB_SSL_ENABLED) {
        mode = LCBVB_SVCMODE_SSL;
    } else {
        mode = LCBVB_SVCMODE_PLAIN;
    }
    for (size_t ii = 0; ii < LCBVB_NSERVERS(vbc); ii++) {
        const char *mcaddr = lcbvb_get_hostport(vbc,
            ii, LCBVB_SVCTYPE_DATA, mode);
        if (!mcaddr) {
            /* %lu expects unsigned long; size_t is not that on every ABI */
            lcb_log(LOGARGS(this, DEBUG), "Node %lu has no data service", (unsigned long)ii);
            continue;
        }
        nodes->add(mcaddr, LCB_CONFIG_MCD_PORT);
    }

    if (settings().randomize_bootstrap_nodes) {
        nodes->randomize();
    }
}

/** I/O context error callback: forwards to CccpProvider::mcio_error(). */
static void
io_error_handler(lcbio_CTX *ctx, lcb_error_t err)
{
    CccpProvider *cccp = reinterpret_cast<CccpProvider*>(lcbio_ctx_data(ctx));
    cccp->mcio_error(err);
}

/** I/O context read callback: forwards to CccpProvider::on_io_read(). */
static void
io_read_handler(lcbio_CTX *ioctx, unsigned)
{
    reinterpret_cast<CccpProvider*>(lcbio_ctx_data(ioctx))->on_io_read();
}
void CccpProvider::on_io_read() { unsigned required; #define return_error(e) \ resp.release(ioctx); \ mcio_error(e); \ return lcb::MemcachedResponse resp; if (!resp.load(ioctx, &required)) { lcbio_ctx_rwant(ioctx, required); lcbio_ctx_schedule(ioctx); return; } if (resp.status() != PROTOCOL_BINARY_RESPONSE_SUCCESS) { lcb_log(LOGARGS(this, WARN), LOGFMT "CCCP Packet responded with 0x%x; nkey=%d, nbytes=%lu, cmd=0x%x, seq=0x%x", LOGID(this), resp.status(), resp.keylen(), (unsigned long)resp.bodylen(), resp.opcode(), resp.opaque()); switch (resp.status()) { case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED: case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: return_error(LCB_NOT_SUPPORTED); default: return_error(LCB_PROTOCOL_ERROR); } return; } if (!resp.bodylen()) { return_error(LCB_PROTOCOL_ERROR); } std::string jsonstr(resp.body(), resp.bodylen()); std::string hoststr(lcbio_get_host(lcbio_ctx_sock(ioctx))->host); resp.release(ioctx); release_socket(true); lcb_error_t err = update(hoststr.c_str(), jsonstr.c_str()); if (err == LCB_SUCCESS) { timer.cancel(); server_active = false; } else { schedule_next_request(LCB_PROTOCOL_ERROR, 0); } #undef return_error } void CccpProvider::request_config() { lcb::MemcachedRequest req(PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG); req.opaque(0xF00D); lcbio_ctx_put(ioctx, req.data(), req.size()); lcbio_ctx_rwant(ioctx, 24); lcbio_ctx_schedule(ioctx); timer.rearm(settings().config_node_timeout); } void CccpProvider::dump(FILE *fp) const { if (!enabled) { return; } fprintf(fp, "## BEGIN CCCP PROVIDER DUMP ##\n"); fprintf(fp, "TIMER ACTIVE: %s\n", timer.is_armed() ? 
"YES" : "NO"); fprintf(fp, "PIPELINE RESPONSE COOKIE: %p\n", (void*)cmdcookie); if (ioctx) { fprintf(fp, "CCCP Owns connection:\n"); lcbio_ctx_dump(ioctx, fp); } else if (creq.u.p_generic) { fprintf(fp, "CCCP Is connecting\n"); } else { fprintf(fp, "CCCP does not have a dedicated connection\n"); } for (size_t ii = 0; ii < nodes->size(); ii++) { const lcb_host_t &curhost = (*nodes)[ii]; fprintf(fp, "CCCP NODE: %s:%s\n", curhost.host, curhost.port); } fprintf(fp, "## END CCCP PROVIDER DUMP ##\n"); } CccpProvider::CccpProvider(Confmon *mon) : Provider(mon, CLCONFIG_CCCP), nodes(new lcb::Hostlist()), config(NULL), server_active(false), timer(mon->iot, this), instance(NULL), ioctx(NULL), cmdcookie(NULL) { std::memset(&creq, 0, sizeof creq); } Provider* lcb::clconfig::new_cccp_provider(Confmon *mon) { return new CccpProvider(mon); }