/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* * Copyright 2011-2012 Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define NOMINMAX // Because MS' CRT headers #define min and max :( #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef WIN32 #include #include #else #define usleep(n) Sleep(n/1000) #endif #include #include #include #include "common/options.h" #include "common/histogram.h" #include "contrib/lcb-jsoncpp/lcb-jsoncpp.h" #include "docgen/seqgen.h" #include "docgen/docgen.h" using namespace std; using namespace cbc; using namespace cliopts; using namespace Pillowfight; using std::vector; using std::string; // Deprecated options which still exist from previous versions. struct DeprecatedOptions { UIntOption iterations; UIntOption instances; BoolOption loop; DeprecatedOptions() : iterations("iterations"), instances("num-instances"), loop("loop") { iterations.abbrev('i').hide().setDefault(1000); instances.abbrev('Q').hide().setDefault(1); loop.abbrev('l').hide().setDefault(false); } void addOptions(Parser &p) { p.addOption(instances); p.addOption(loop); p.addOption(iterations); } }; static TemplateSpec parseTemplateSpec(const string& input) { TemplateSpec spec; // Just need to find the path size_t endpos = input.find(','); if (endpos == string::npos) { throw std::runtime_error("invalid template spec: need field,min,max"); } spec.term = input.substr(0, endpos); unsigned is_sequential = 0; int rv = sscanf(input.c_str() + endpos + 1, "%u,%u,%u", &spec.minval, &spec.maxval, &is_sequential); if (rv < 2) { throw std::runtime_error("invalid template spec: need field,min,max"); } spec.sequential = is_sequential; if (spec.minval > spec.maxval) { throw std::runtime_error("min cannot be higher than max"); } return spec; } class Configuration { public: Configuration() : o_multiSize("batch-size"), o_numItems("num-items"), o_keyPrefix("key-prefix"), o_numThreads("num-threads"), o_randSeed("random-seed"), o_randomBody("random-body"), o_setPercent("set-pct"), o_minSize("min-size"), o_maxSize("max-size"), o_noPopulate("no-population"), o_pauseAtEnd("pause-at-end"), o_numCycles("num-cycles"), o_sequential("sequential"), o_startAt("start-at"), o_rateLimit("rate-limit"), o_userdocs("docs"), o_writeJson("json"), o_templatePairs("template"), o_subdoc("subdoc"), o_noop("noop"), o_sdPathCount("pathcount"), o_populateOnly("populate-only"), o_exptime("expiry"), o_collection("collection"), o_separator("separator"), o_persist("persist-to"), o_replicate("replicate-to"), o_lock("lock") { o_multiSize.setDefault(100).abbrev('B').description("Number of operations to batch"); o_numItems.setDefault(1000).abbrev('I').description("Number of items to operate on"); o_keyPrefix.abbrev('p').description("key prefix to use"); o_numThreads.setDefault(1).abbrev('t').description("The number of threads to use"); o_randSeed.setDefault(0).abbrev('s').description("Specify random seed").hide(); o_randomBody.setDefault(false).abbrev('R').description("Randomize document body (otherwise use 'x' and '*' to fill)"); o_setPercent.setDefault(33).abbrev('r').description("The percentage of operations which should be mutations"); o_minSize.setDefault(50).abbrev('m').description("Set minimum payload size"); o_maxSize.setDefault(5120).abbrev('M').description("Set maximum payload size"); o_noPopulate.setDefault(false).abbrev('n').description("Skip population"); o_pauseAtEnd.setDefault(false).abbrev('E').description("Pause at end of run (holding connections open) until user input"); o_numCycles.setDefault(-1).abbrev('c').description("Number of cycles to be run until exiting. Set to -1 to loop infinitely"); o_sequential.setDefault(false).description("Use sequential access (instead of random)"); o_startAt.setDefault(0).description("For sequential access, set the first item"); o_rateLimit.setDefault(0).description("Set operations per second limit (per thread)"); o_userdocs.description("User documents to load (overrides --min-size and --max-size"); o_writeJson.abbrev('J').description("Enable writing JSON values (rather than bytes)"); o_templatePairs.description("Values for templates to be inserted into user documents"); o_templatePairs.argdesc("FIELD,MIN,MAX[,SEQUENTIAL]").hide(); o_subdoc.description("Use subdoc instead of fulldoc operations"); o_noop.description("Use NOOP instead of document operations").setDefault(0); o_sdPathCount.description("Number of subdoc paths per command").setDefault(1); o_populateOnly.description("Exit after documents have been populated"); o_exptime.description("Set TTL for items").abbrev('e'); o_collection.description("Allowed collection name (could be specified multiple times)").hide(); o_separator.setDefault(":").description("Separator for collection prefix in keys").hide(); o_persist.description("Wait until item is persisted to this number of nodes (-1 for master+replicas)").setDefault(0); o_replicate.description("Wait until item is replicated to this number of nodes (-1 for all replicas)").setDefault(0); o_lock.description("Lock keys for updates for given time (will not lock when set to zero)").setDefault(0); params.getTimings().description("Enable command timings (second time to dump timings automatically)"); } void processOptions() { opsPerCycle = o_multiSize.result(); prefix = o_keyPrefix.result(); setprc = o_setPercent.result(); shouldPopulate = !o_noPopulate.result(); persistTo = o_persist.result(); replicateTo = o_replicate.result(); lockTime = o_lock.result(); if (lockTime && o_numItems < opsPerCycle * o_numThreads) { fprintf(stderr, "The --num-items=%d cannot be smaller than --batch-size=%d multiplied to --num-thread=%d when used with --lock=%d\n", (int)o_numItems, (int)opsPerCycle, (int)o_numThreads, (int)lockTime); exit(EXIT_FAILURE); } if (o_keyPrefix.passed() && o_collection.passed()) { throw std::runtime_error("The --collection is not compatible with --key-prefix"); } if (depr.loop.passed()) { fprintf(stderr, "The --loop/-l option is deprecated. Use --num-cycles\n"); maxCycles = -1; } else { maxCycles = o_numCycles.result(); } if (o_populateOnly.passed()) { // Determine how many iterations are required. if (o_numCycles.passed()) { throw std::runtime_error("--num-cycles incompatible with --populate-only"); } size_t est = (o_numItems / o_numThreads) / o_multiSize; while (est * o_numThreads * o_multiSize < o_numItems) { est++; } maxCycles = est; o_sequential.setDefault(true); fprintf(stderr, "Populating using %d cycles\n", maxCycles); } if (depr.iterations.passed()) { fprintf(stderr, "The --num-iterations/-I option is deprecated. Use --batch-size\n"); opsPerCycle = depr.iterations.result(); } vector specs; vector userdocs; if (o_templatePairs.passed()) { vector specs_str = o_templatePairs.result(); for (size_t ii = 0; ii < specs_str.size(); ii++) { specs.push_back(parseTemplateSpec(specs_str[ii])); } } // Set the document sizes.. if (o_userdocs.passed()) { if (o_minSize.passed() || o_maxSize.passed()) { fprintf(stderr, "--min-size/--max-size invalid with userdocs\n"); } vector filenames = o_userdocs.result(); for (size_t ii = 0; ii < filenames.size(); ii++) { std::stringstream ss; std::ifstream ifs(filenames[ii].c_str()); if (!ifs.is_open()) { perror(filenames[ii].c_str()); exit(EXIT_FAILURE); } ss << ifs.rdbuf(); userdocs.push_back(ss.str()); } } if (specs.empty()) { if (o_writeJson.result()) { docgen = new JsonDocGenerator(o_minSize.result(), o_maxSize.result(), o_randomBody.numSpecified()); } else if (!userdocs.empty()) { docgen = new PresetDocGenerator(userdocs); } else { docgen = new RawDocGenerator(o_minSize.result(), o_maxSize.result(), o_randomBody.numSpecified()); } } else { if (o_writeJson.result()) { if (userdocs.empty()) { docgen = new PlaceholderJsonGenerator( o_minSize.result(), o_maxSize.result(), specs, o_randomBody.numSpecified()); } else { docgen = new PlaceholderJsonGenerator(userdocs, specs); } } else { if (userdocs.empty()) { throw std::runtime_error("Must provide documents with placeholders!"); } docgen = new PlaceholderDocGenerator(userdocs, specs); } } sdOpsPerCmd = o_sdPathCount.result(); if (o_sdPathCount.passed()) { o_subdoc.setDefault(true); } if (o_collection.passed()) { string separator = o_separator.result(); if (separator.empty()) { throw std::runtime_error("Collection name separator must not be empty"); } vector names = o_collection.result(); for (size_t ii = 0; ii < names.size(); ii++) { collections.push_back(names[ii] + separator); } } } void addOptions(Parser& parser) { parser.addOption(o_multiSize); parser.addOption(o_numItems); parser.addOption(o_keyPrefix); parser.addOption(o_numThreads); parser.addOption(o_randSeed); parser.addOption(o_randomBody); parser.addOption(o_setPercent); parser.addOption(o_noPopulate); parser.addOption(o_minSize); parser.addOption(o_maxSize); parser.addOption(o_pauseAtEnd); parser.addOption(o_numCycles); parser.addOption(o_sequential); parser.addOption(o_startAt); parser.addOption(o_rateLimit); parser.addOption(o_userdocs); parser.addOption(o_writeJson); parser.addOption(o_templatePairs); parser.addOption(o_subdoc); parser.addOption(o_noop); parser.addOption(o_sdPathCount); parser.addOption(o_populateOnly); parser.addOption(o_exptime); parser.addOption(o_collection); parser.addOption(o_separator); parser.addOption(o_persist); parser.addOption(o_replicate); parser.addOption(o_lock); params.addToParser(parser); depr.addOptions(parser); } int numTimings(void) { return params.numTimings(); } bool isLoopDone(size_t niter) { if (maxCycles == -1) { return false; } return niter >= (size_t)maxCycles; } uint32_t getRandomSeed() { return o_randSeed; } uint32_t getNumThreads() { return o_numThreads; } string& getKeyPrefix() { return prefix; } bool shouldPauseAtEnd() { return o_pauseAtEnd; } bool sequentialAccess() { return o_sequential; } bool isSubdoc() { return o_subdoc; } bool isNoop() { return o_noop.result(); } bool useCollections() { return o_collection.passed(); } bool writeJson() { return o_writeJson.result(); } unsigned firstKeyOffset() { return o_startAt; } uint32_t getNumItems() { return o_numItems; } uint32_t getRateLimit() { return o_rateLimit; } unsigned getExptime() { return o_exptime; } uint32_t opsPerCycle; uint32_t sdOpsPerCmd; unsigned setprc; string prefix; volatile int maxCycles; bool shouldPopulate; bool hasTemplates; ConnParams params; const DocGeneratorBase *docgen; vector collections; int replicateTo; int persistTo; int lockTime; private: UIntOption o_multiSize; UIntOption o_numItems; StringOption o_keyPrefix; UIntOption o_numThreads; UIntOption o_randSeed; BoolOption o_randomBody; UIntOption o_setPercent; UIntOption o_minSize; UIntOption o_maxSize; BoolOption o_noPopulate; BoolOption o_pauseAtEnd; // Should pillowfight pause execution (with // connections open) before exiting? IntOption o_numCycles; BoolOption o_sequential; UIntOption o_startAt; UIntOption o_rateLimit; // List of paths to user documents to load.. They should all be valid JSON ListOption o_userdocs; // Whether generated values should be JSON BoolOption o_writeJson; // List of template ranges for value generation ListOption o_templatePairs; BoolOption o_subdoc; BoolOption o_noop; UIntOption o_sdPathCount; // Compound option BoolOption o_populateOnly; UIntOption o_exptime; ListOption o_collection; StringOption o_separator; IntOption o_persist; IntOption o_replicate; IntOption o_lock; DeprecatedOptions depr; } config; void log(const char *format, ...) { char buffer[512]; va_list args; va_start(args, format); vsprintf(buffer, format, args); if (config.numTimings() > 0) { std::cerr << "[" << std::fixed << lcb_nstime() / 1000000000.0 << "] "; } std::cerr << buffer << std::endl; va_end(args); } extern "C" { static void operationCallback(lcb_t, int, const lcb_RESPBASE*); static void storeCallback(lcb_t, int, const lcb_RESPBASE *); } class ThreadContext; class InstanceCookie { public: InstanceCookie(lcb_t instance) { lcb_set_cookie(instance, this); lastPrint = 0; if (config.numTimings() > 0) { hg.install(instance, stdout); } stats.total = 0; stats.retried = 0; stats.etmpfail = 0; stats.eexist = 0; stats.etimeout = 0; } static InstanceCookie* get(lcb_t instance) { return (InstanceCookie *)lcb_get_cookie(instance); } static void dumpTimings(lcb_t instance, const char *header = NULL, bool force=false) { time_t now = time(NULL); InstanceCookie *ic = get(instance); if (now - ic->lastPrint > 0) { ic->lastPrint = now; } else if (!force) { return; } Histogram &h = ic->hg; if (header) { printf("[%f %s]\n", lcb_nstime() / 1000000000.0, header); } printf(" +---------+---------+---------+---------+\n"); h.write(); printf(" +----------------------------------------\n"); } void setContext(ThreadContext *context) { m_context = context; } ThreadContext * getContext() { return m_context; } struct { size_t total; size_t retried; size_t etmpfail; size_t eexist; size_t etimeout; } stats; private: time_t lastPrint; Histogram hg; ThreadContext *m_context; }; struct NextOp { NextOp() : m_seqno(0), m_mode(GET), m_cas(0) {} string m_key; uint32_t m_seqno; vector m_valuefrags; vector m_specs; // The mode here is for future use with subdoc enum Mode { STORE, GET, SDSTORE, SDGET, NOOP }; Mode m_mode; uint64_t m_cas; }; class OpGenerator { public: OpGenerator(int id): m_id(id) {} virtual ~OpGenerator() {}; virtual void setNextOp(NextOp& op) = 0; virtual void setValue(NextOp& op) = 0; virtual void populateIov(uint32_t, vector&) = 0; virtual bool inPopulation() const = 0; virtual void checkin(uint32_t) = 0; virtual const char *getStageString() const = 0; protected: int m_id; }; class NoopGenerator : public OpGenerator { public: NoopGenerator(int ix) : OpGenerator(ix) {} void setNextOp(NextOp& op) { op.m_mode = NextOp::NOOP; } void setValue(NextOp&) {} void populateIov(uint32_t, vector&) {} bool inPopulation() const { return false; } void checkin(uint32_t) {} const char *getStageString() const { return "Run"; } }; /** Stateful, per-thread generator */ class KeyGenerator : public OpGenerator { public: KeyGenerator(int ix) : OpGenerator(ix), m_gencount(0), m_force_sequential(false), m_in_population(config.shouldPopulate) { srand(config.getRandomSeed()); m_genrandom = new SeqGenerator( config.firstKeyOffset(), config.getNumItems() + config.firstKeyOffset()); m_gensequence = new SeqGenerator( config.firstKeyOffset(), config.getNumItems() + config.firstKeyOffset(), config.getNumThreads(), ix); if (m_in_population) { m_force_sequential = true; } else { m_force_sequential = config.sequentialAccess(); } m_local_genstate = config.docgen->createState(config.getNumThreads(), ix); if (config.isSubdoc()) { m_mode_read = NextOp::SDGET; m_mode_write = NextOp::SDSTORE; m_sdgenstate = config.docgen->createSubdocState(config.getNumThreads(), ix); if (!m_sdgenstate) { std::cerr << "Current generator does not support subdoc. Did you try --json?" << std::endl; exit(EXIT_FAILURE); } } else { m_mode_read = NextOp::GET; m_mode_write = NextOp::STORE; } } void setValue(NextOp& op) { m_local_genstate->populateIov(op.m_seqno, op.m_valuefrags); } void populateIov(uint32_t seq, vector& iov_out) { m_local_genstate->populateIov(seq, iov_out); } void setNextOp(NextOp& op) { bool store_override = false; if (m_in_population) { if (m_gencount++ < m_gensequence->maxItems()) { store_override = true; } else { printf("Thread %d has finished populating.\n", m_id); m_in_population = false; m_force_sequential = config.sequentialAccess(); } } if (m_in_population || !config.lockTime) { op.m_seqno = (m_force_sequential ? m_gensequence : m_genrandom)->next(); } else { op.m_seqno = (m_force_sequential ? m_gensequence : m_genrandom)->checkout(); } if (store_override) { // Populate op.m_mode = NextOp::STORE; setValue(op); } else if (shouldStore(op.m_seqno)) { op.m_mode = m_mode_write; if (op.m_mode == NextOp::STORE) { setValue(op); } else if (op.m_mode == NextOp::SDSTORE) { op.m_specs.resize(config.sdOpsPerCmd); m_sdgenstate->populateMutate(op.m_seqno, op.m_specs); } else { fprintf(stderr, "Invalid mode for op: %d\n", op.m_mode); abort(); } } else { op.m_mode = m_mode_read; if (op.m_mode == NextOp::SDGET) { op.m_specs.resize(config.sdOpsPerCmd); m_sdgenstate->populateLookup(op.m_seqno, op.m_specs); } } generateKey(op); } bool inPopulation() const { return m_in_population; } void checkin(uint32_t seqno) { (m_force_sequential ? m_gensequence : m_genrandom)->checkin(seqno); } const char *getStageString() const { if (m_in_population) { return "Populate"; } else { return "Run"; } } private: bool shouldStore(uint32_t seqno) { if (config.setprc == 0) { return false; } float seqno_f = seqno % 100; float pct_f = seqno_f / config.setprc; return pct_f < 1; } void generateKey(NextOp& op) { uint32_t seqno = op.m_seqno; char buffer[21]; snprintf(buffer, sizeof(buffer), "%020d", seqno); string &prefix = config.useCollections() ? config.collections[seqno % config.collections.size()] : config.getKeyPrefix(); op.m_key.assign(prefix + buffer); } SeqGenerator *m_genrandom; SeqGenerator *m_gensequence; size_t m_gencount; bool m_force_sequential; bool m_in_population; NextOp::Mode m_mode_read; NextOp::Mode m_mode_write; GeneratorState *m_local_genstate; SubdocGeneratorState *m_sdgenstate; }; #define OPFLAGS_LOCKED 0x01 class ThreadContext { public: ThreadContext(lcb_t handle, int ix) : niter(0), instance(handle) { if (config.isNoop()) { gen = new NoopGenerator(ix); } else { gen = new KeyGenerator(ix); } } ~ThreadContext() { delete gen; gen = NULL; } bool inPopulation() { return gen && (gen->inPopulation() || !retryq.empty()); } void checkin(uint32_t seqno) { if (gen) { gen->checkin(seqno); } } void singleLoop() { bool hasItems = false; lcb_sched_enter(instance); for (size_t ii = 0; ii < config.opsPerCycle; ++ii) { hasItems = scheduleNextOperation(); } if (hasItems) { error = LCB_SUCCESS; lcb_sched_leave(instance); lcb_wait(instance); } else { lcb_sched_fail(instance); } purgeRetryQueue(); } void purgeRetryQueue() { NextOp opinfo; InstanceCookie *cookie = InstanceCookie::get(instance); while (!retryq.empty()) { unsigned exptime = config.getExptime(); lcb_sched_enter(instance); while (!retryq.empty()) { opinfo = retryq.front(); retryq.pop(); lcb_CMDSTOREDUR scmd = { 0 }; scmd.operation = LCB_SET; scmd.exptime = exptime; LCB_CMD_SET_KEY(&scmd, opinfo.m_key.c_str(), opinfo.m_key.size()); LCB_CMD_SET_VALUEIOV(&scmd, &opinfo.m_valuefrags[0], opinfo.m_valuefrags.size()); if (config.persistTo > 0 || config.replicateTo > 0) { scmd.persist_to = config.persistTo; scmd.replicate_to = config.replicateTo; error = lcb_storedur3(instance, NULL, &scmd); } else { error = lcb_store3(instance, NULL, reinterpret_cast(&scmd)); } cookie->stats.retried++; } lcb_sched_leave(instance); lcb_wait(instance); if (error != LCB_SUCCESS) { log("Operation(s) failed: [0x%x] %s", error, lcb_strerror(instance, error)); } } } bool scheduleNextOperation() { NextOp opinfo; unsigned exptime = config.getExptime(); gen->setNextOp(opinfo); switch (opinfo.m_mode) { case NextOp::STORE: { if (!gen->inPopulation() && config.lockTime > 0) { lcb_CMDGET gcmd = { 0 }; LCB_CMD_SET_KEY(&gcmd, opinfo.m_key.c_str(), opinfo.m_key.size()); gcmd.lock = config.lockTime; error = lcb_get3(instance, (void *)OPFLAGS_LOCKED, &gcmd); } else { lcb_CMDSTOREDUR scmd = { 0 }; scmd.operation = LCB_SET; scmd.exptime = exptime; if (config.writeJson()) { scmd.datatype = LCB_VALUE_F_JSON; } LCB_CMD_SET_KEY(&scmd, opinfo.m_key.c_str(), opinfo.m_key.size()); LCB_CMD_SET_VALUEIOV(&scmd, &opinfo.m_valuefrags[0], opinfo.m_valuefrags.size()); if (config.persistTo > 0 || config.replicateTo > 0) { scmd.persist_to = config.persistTo; scmd.replicate_to = config.replicateTo; error = lcb_storedur3(instance, NULL, &scmd); } else { error = lcb_store3(instance, NULL, reinterpret_cast(&scmd)); } } break; } case NextOp::GET: { lcb_CMDGET gcmd = { 0 }; LCB_CMD_SET_KEY(&gcmd, opinfo.m_key.c_str(), opinfo.m_key.size()); gcmd.exptime = exptime; error = lcb_get3(instance, this, &gcmd); break; } case NextOp::SDSTORE: case NextOp::SDGET: { lcb_CMDSUBDOC sdcmd = { 0 }; if (opinfo.m_mode == NextOp::SDSTORE) { sdcmd.exptime = exptime; } LCB_CMD_SET_KEY(&sdcmd, opinfo.m_key.c_str(), opinfo.m_key.size()); sdcmd.specs = &opinfo.m_specs[0]; sdcmd.nspecs = opinfo.m_specs.size(); error = lcb_subdoc3(instance, NULL, &sdcmd); break; } case NextOp::NOOP: { lcb_CMDNOOP ncmd = { 0 }; error = lcb_noop3(instance, NULL, &ncmd); break; } } if (error != LCB_SUCCESS) { log("Failed to schedule operation: [0x%x] %s", error, lcb_strerror(instance, error)); return false; } else { return true; } } bool run() { do { singleLoop(); if (config.numTimings() > 1) { InstanceCookie::dumpTimings(instance, gen->getStageString()); } if (config.params.shouldDump()) { lcb_dump(instance, stderr, LCB_DUMP_ALL); } if (config.getRateLimit() > 0) { rateLimitThrottle(); } } while (!config.isLoopDone(++niter)); if (config.numTimings() > 1) { InstanceCookie::dumpTimings(instance, gen->getStageString(), true); } return true; } void retry(NextOp &op) { if (op.m_mode == NextOp::STORE) { gen->setValue(op); } retryq.push(op); } void populateIov(uint32_t seq, vector& iov_out) { gen->populateIov(seq, iov_out); } #ifndef WIN32 pthread_t thr; #endif lcb_t getInstance() { return instance; } protected: // the callback methods needs to be able to set the error handler.. friend void operationCallback(lcb_t, int, const lcb_RESPBASE*); friend void storeCallback(lcb_t, int, const lcb_RESPBASE *); Histogram histogram; void setError(lcb_error_t e) { error = e; } private: void rateLimitThrottle() { lcb_U64 now = lcb_nstime(); static lcb_U64 previous_time = now; const lcb_U64 elapsed_ns = now - previous_time; const lcb_U64 wanted_duration_ns = (config.getNumThreads() * config.opsPerCycle * 1e9) / config.getRateLimit(); // On first invocation no previous_time, so skip attempting to sleep. if (elapsed_ns > 0 && elapsed_ns < wanted_duration_ns) { // Dampen the sleep time by averaging with the previous // sleep time. static lcb_U64 last_sleep_ns = 0; const lcb_U64 sleep_ns = (last_sleep_ns + wanted_duration_ns - elapsed_ns) / 2; usleep(sleep_ns / 1000); now += sleep_ns; last_sleep_ns = sleep_ns; } previous_time = now; } OpGenerator *gen; size_t niter; lcb_error_t error; lcb_t instance; std::queue retryq; }; static void updateOpsPerSecDisplay() { static time_t start_time = time(NULL); static int is_tty = #ifdef WIN32 0; #else isatty(STDERR_FILENO); #endif static volatile unsigned long nops = 0; time_t now = time(NULL); time_t nsecs = now - start_time; if (!nsecs) { nsecs = 1; } unsigned long ops_sec = nops / nsecs; if (++nops % 10000 == 0) { fprintf(stderr, "OPS/SEC: %10lu%c", ops_sec, is_tty ? '\r' : '\n'); } } static void updateStats(InstanceCookie *cookie, lcb_error_t rc) { cookie->stats.total++; switch (rc) { case LCB_ETMPFAIL: cookie->stats.etmpfail++; break; case LCB_KEY_EEXISTS: cookie->stats.eexist++; break; case LCB_ETIMEDOUT: cookie->stats.etimeout++; break; default: break; } } static void operationCallback(lcb_t instance, int cbtype, const lcb_RESPBASE *resp) { InstanceCookie *cookie = InstanceCookie::get(instance); ThreadContext *tc = cookie->getContext(); tc->setError(resp->rc); updateStats(cookie, resp->rc); uintptr_t flags = 0; if (resp->cookie) { flags = (uintptr_t)resp->cookie; } bool done = true; string key((const char*)resp->key, resp->nkey); uint32_t seqno = atoi(key.c_str()); if (cbtype == LCB_CALLBACK_GET && (flags & OPFLAGS_LOCKED)) { if (resp->rc == LCB_SUCCESS) { lcb_CMDSTOREDUR scmd = { 0 }; vector valuefrags; scmd.operation = LCB_SET; scmd.exptime = config.getExptime(); scmd.cas = resp->cas; tc->populateIov(seqno, valuefrags); LCB_CMD_SET_KEY(&scmd, resp->key, resp->nkey); LCB_CMD_SET_VALUEIOV(&scmd, &valuefrags[0], valuefrags.size()); if (config.persistTo > 0 || config.replicateTo > 0) { scmd.persist_to = config.persistTo; scmd.replicate_to = config.replicateTo; lcb_storedur3(instance, NULL, &scmd); } else { lcb_store3(instance, NULL, reinterpret_cast(&scmd)); } done = false; } else if (resp->rc == LCB_ETMPFAIL) { NextOp op; op.m_mode = NextOp::STORE; op.m_key = key; op.m_seqno = seqno; tc->retry(op); done = false; } } if (done) { tc->checkin(seqno); } updateOpsPerSecDisplay(); } static void storeCallback(lcb_t instance, int, const lcb_RESPBASE *resp) { InstanceCookie *cookie = InstanceCookie::get(instance); ThreadContext *tc = cookie->getContext(); tc->setError(resp->rc); updateStats(cookie, resp->rc); string key((const char*)resp->key, resp->nkey); uint32_t seqno = atoi(key.c_str()); if (resp->rc != LCB_SUCCESS && tc->inPopulation()) { NextOp op; op.m_mode = NextOp::STORE; op.m_key = key; op.m_seqno = seqno; tc->retry(op); } else { tc->checkin(seqno); } updateOpsPerSecDisplay(); } std::list contexts; extern "C" { typedef void (*handler_t)(int); static void dump_metrics(void) { std::list::iterator it; for (it = contexts.begin(); it != contexts.end(); ++it) { lcb_t instance = (*it)->getInstance(); lcb_CMDDIAG req = {}; req.options = LCB_PINGOPT_F_JSONPRETTY; lcb_diag(instance, NULL, &req); if (config.numTimings() > 0) { InstanceCookie::dumpTimings(instance); } } } #ifndef WIN32 static void diag_callback(lcb_t instance, int, const lcb_RESPBASE *rb) { const lcb_RESPDIAG *resp = (const lcb_RESPDIAG *)rb; if (resp->rc != LCB_SUCCESS) { fprintf(stderr, "%p, diag failed: %s\n", (void *)instance, lcb_strerror(NULL, resp->rc)); } else { if (resp->njson) { fprintf(stderr, "\n%.*s", (int)resp->njson, resp->json); } { InstanceCookie *cookie = InstanceCookie::get(instance); lcb_METRICS* metrics; size_t ii; lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_METRICS, &metrics); fprintf(stderr, "%p: total: %lu, etmpfail: %lu, eexist: %lu, etimeout: %lu, retried: %lu, rq: %lu\n", (void *)instance, (unsigned long)cookie->stats.total, (unsigned long)cookie->stats.etmpfail, (unsigned long)cookie->stats.eexist, (unsigned long)cookie->stats.etimeout, (unsigned long)cookie->stats.retried, (unsigned long)metrics->packets_retried); for (ii = 0; ii < metrics->nservers; ii++) { fprintf(stderr, " [srv-%d] snt: %lu, rcv: %lu, q: %lu, err: %lu, tmo: %lu, nmv: %lu, orph: %lu\n", (int)ii, (unsigned long)metrics->servers[ii]->packets_sent, (unsigned long)metrics->servers[ii]->packets_read, (unsigned long)metrics->servers[ii]->packets_queued, (unsigned long)metrics->servers[ii]->packets_errored, (unsigned long)metrics->servers[ii]->packets_timeout, (unsigned long)metrics->servers[ii]->packets_nmv, (unsigned long)metrics->servers[ii]->packets_ownerless); } } } } static void sigquit_handler(int) { dump_metrics(); signal(SIGQUIT, sigquit_handler); // Reinstall } static void setup_sigquit_handler() { struct sigaction action; sigemptyset(&action.sa_mask); action.sa_handler = sigquit_handler; action.sa_flags = 0; sigaction(SIGQUIT, &action, NULL); } static void sigint_handler(int) { static int ncalled = 0; ncalled++; if (ncalled < 2) { log("\nTermination requested. Waiting threads to finish. Ctrl-C to force termination."); signal(SIGINT, sigint_handler); // Reinstall config.maxCycles = 0; return; } std::list::iterator it; for (it = contexts.begin(); it != contexts.end(); ++it) { delete *it; } contexts.clear(); exit(EXIT_FAILURE); } static void setup_sigint_handler() { struct sigaction action; sigemptyset(&action.sa_mask); action.sa_handler = sigint_handler; action.sa_flags = 0; sigaction(SIGINT, &action, NULL); } static void* thread_worker(void*); static void start_worker(ThreadContext *ctx) { pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); int rc = pthread_create(&ctx->thr, &attr, thread_worker, ctx); if (rc != 0) { log("Couldn't create thread: (%d)", errno); exit(EXIT_FAILURE); } } static void join_worker(ThreadContext *ctx) { void *arg = NULL; int rc = pthread_join(ctx->thr, &arg); if (rc != 0) { log("Couldn't join thread (%d)", errno); exit(EXIT_FAILURE); } } #else static void setup_sigquit_handler() {} static void setup_sigint_handler() {} static void start_worker(ThreadContext *ctx) { ctx->run(); } static void join_worker(ThreadContext *ctx) { (void)ctx; } #endif static void *thread_worker(void *arg) { ThreadContext *ctx = static_cast(arg); ctx->run(); return NULL; } } int main(int argc, char **argv) { int exit_code = EXIT_SUCCESS; setup_sigint_handler(); setup_sigquit_handler(); Parser parser("cbc-pillowfight"); try { config.addOptions(parser); parser.parse(argc, argv, false); config.processOptions(); } catch (std::string& e) { std::cerr << e << std::endl; exit(EXIT_FAILURE); } catch (std::exception& e) { std::cerr << e.what() << std::endl; exit(EXIT_FAILURE); } size_t nthreads = config.getNumThreads(); log("Running. Press Ctrl-C to terminate..."); #ifdef WIN32 if (nthreads > 1) { log("WARNING: More than a single thread on Windows not supported. Forcing 1"); nthreads = 1; } #endif struct lcb_create_st options; ConnParams& cp = config.params; lcb_error_t error; for (uint32_t ii = 0; ii < nthreads; ++ii) { cp.fillCropts(options); lcb_t instance = NULL; error = lcb_create(&instance, &options); if (error != LCB_SUCCESS) { log("Failed to create instance: %s", lcb_strerror(NULL, error)); exit(EXIT_FAILURE); } lcb_install_callback3(instance, LCB_CALLBACK_STOREDUR, storeCallback); lcb_install_callback3(instance, LCB_CALLBACK_STORE, storeCallback); lcb_install_callback3(instance, LCB_CALLBACK_GET, operationCallback); lcb_install_callback3(instance, LCB_CALLBACK_SDMUTATE, operationCallback); lcb_install_callback3(instance, LCB_CALLBACK_SDLOOKUP, operationCallback); lcb_install_callback3(instance, LCB_CALLBACK_NOOP, operationCallback); #ifndef WIN32 lcb_install_callback3(instance, LCB_CALLBACK_DIAG, diag_callback); { int activate = 1; lcb_cntl(instance, LCB_CNTL_SET, LCB_CNTL_METRICS, &activate); } #endif cp.doCtls(instance); if (config.useCollections()) { int use = 1; lcb_cntl(instance, LCB_CNTL_SET, LCB_CNTL_USE_COLLECTIONS, &use); } InstanceCookie *cookie = new InstanceCookie(instance); lcb_connect(instance); lcb_wait(instance); error = lcb_get_bootstrap_status(instance); if (error != LCB_SUCCESS) { std::cout << std::endl; log("Failed to connect: %s", lcb_strerror(instance, error)); exit(EXIT_FAILURE); } ThreadContext *ctx = new ThreadContext(instance, ii); cookie->setContext(ctx); contexts.push_back(ctx); start_worker(ctx); } for (std::list::iterator it = contexts.begin(); it != contexts.end(); ++it) { join_worker(*it); } if (config.numTimings() > 0) { dump_metrics(); } return exit_code; }