/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2014 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "packetutils.h"
#include "retryq.h"
#include "config.h"
#include "logging.h"
#include "internal.h"
#include "bucketconfig/clconfig.h"

#define LOGARGS(rq, lvl) (rq)->settings, "retryq", LCB_LOG_##lvl, __FILE__, __LINE__
#define RETRY_PKT_KEY "retry_queue"

typedef struct {
    mc_EPKTDATUM epd;
    lcb_list_t ll_sched;
    lcb_list_t ll_tmo;
    /** Cache the actual start time of the command, since the start time may
     * change if read_ts_wait is enabled and we don't want to end up looping
     * on a command forever. */
    hrtime_t start;
    hrtime_t trytime; /**< Next retry time */
    mc_PACKET *pkt;
    lcb_error_t origerr;
} lcb_RETRYOP;

#define RETRY_INTERVAL_NS(q) LCB_US2NS((q)->settings->retry_interval)

/**
 * Fuzz offset. When the callback is received to schedule an operation, we may
 * retry commands whose scheduled retry time is up to this far in the future.
 * This avoids excessive callbacks into the timer function.
 */
#define TIMEFUZZ_NS LCB_US2NS(LCB_MS2US(5))

static void
update_trytime(lcb_RETRYQ *rq, lcb_RETRYOP *op, hrtime_t now)
{
    /**
     * Estimate the next retry timestamp. This is:
     * Base interval x Number of retries x Backoff factor
     */
    if (!now) {
        now = gethrtime();
    }
    op->trytime = now + (hrtime_t) (
            (float)RETRY_INTERVAL_NS(rq) *
            (float)op->pkt->retries *
            (float)rq->settings->retry_backoff);
}
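
/* Worked example of the progression above (a sketch; the numbers are
 * hypothetical, not libcouchbase defaults). With retry_interval=100000us
 * (100ms) and retry_backoff=2.0:
 *
 *   retries=1 -> next attempt in 100ms * 1 * 2.0 = 200ms
 *   retries=2 -> next attempt in 100ms * 2 * 2.0 = 400ms
 *   retries=3 -> next attempt in 100ms * 3 * 2.0 = 600ms
 *
 * i.e. the delay grows linearly with the retry count, scaled by the backoff
 * factor; it is not exponential.
 */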

/** Comparison routine for sorting by timeout */
static int
cmpfn_tmo(lcb_list_t *ll_a, lcb_list_t *ll_b)
{
    lcb_RETRYOP *a = LCB_LIST_ITEM(ll_a, lcb_RETRYOP, ll_tmo);
    lcb_RETRYOP *b = LCB_LIST_ITEM(ll_b, lcb_RETRYOP, ll_tmo);
    if (a->start == b->start) {
        return 0;
    } else if (a->start > b->start) {
        return 1;
    } else {
        return -1;
    }
}

/** Comparison routine for sorting by next retry time */
static int
cmpfn_retry(lcb_list_t *ll_a, lcb_list_t *ll_b)
{
    lcb_RETRYOP *a = LCB_LIST_ITEM(ll_a, lcb_RETRYOP, ll_sched);
    lcb_RETRYOP *b = LCB_LIST_ITEM(ll_b, lcb_RETRYOP, ll_sched);
    if (a->trytime == b->trytime) {
        return 0;
    } else if (a->trytime > b->trytime) {
        return 1;
    } else {
        return -1;
    }
}

static void
assign_error(lcb_RETRYOP *op, lcb_error_t err)
{
    if (err == LCB_NOT_MY_VBUCKET) {
        err = LCB_ETIMEDOUT; /* :( */
    }

    if (op->origerr == LCB_SUCCESS) {
        op->origerr = err;
    }

    if (err == LCB_ETIMEDOUT) {
        return; /* Ignore timeout errors */
    }

    if (LCB_EIFNET(op->origerr) && op->origerr != LCB_ETIMEDOUT &&
            (err == LCB_NETWORK_ERROR || err == LCB_CONNECT_ERROR)) {
        /* The previous error was a more specific network error */
        return;
    }

    op->origerr = err;
}

static void
clean_op(lcb_RETRYOP *op)
{
    lcb_list_delete(&op->ll_sched);
    lcb_list_delete(&op->ll_tmo);
}

static void
bail_op(lcb_RETRYQ *rq, lcb_RETRYOP *op, lcb_error_t err)
{
    packet_info info;
    protocol_binary_request_header hdr;
    protocol_binary_response_header *res = &info.res;
    mc_SERVER tmpsrv; /** Temporary pipeline */
    mc_PIPELINE *pltmp = &tmpsrv.pipeline;

    memset(&tmpsrv, 0, sizeof tmpsrv);
    tmpsrv.instance = rq->cq->cqdata;
    tmpsrv.pipeline.parent = rq->cq;

    /* Synthesize a response header so the error can be delivered through the
     * normal dispatch path; the real failure code travels in op->origerr */
    memset(&info, 0, sizeof(info));
    mcreq_read_hdr(op->pkt, &hdr);
    res->response.opcode = hdr.request.opcode;
    res->response.status = ntohs(PROTOCOL_BINARY_RESPONSE_EINVAL);
    res->response.opaque = hdr.request.opaque;

    assign_error(op, err);
    lcb_log(LOGARGS(rq, WARN), "Failing command (seq=%u) from retry queue with error code 0x%x", op->pkt->opaque, op->origerr);
    mcreq_dispatch_response(pltmp, op->pkt, &info, op->origerr);
    op->pkt->flags |= MCREQ_F_FLUSHED|MCREQ_F_INVOKED;
    clean_op(op);
    mcreq_packet_done(pltmp, op->pkt);
    lcb_maybe_breakout(rq->cq->cqdata);
}

static void
do_schedule(lcb_RETRYQ *q, hrtime_t now)
{
    hrtime_t tmonext, schednext, diff, selected;
    uint32_t us_interval;
    lcb_RETRYOP *first_tmo, *first_sched;

    if (!now) {
        now = gethrtime();
    }

    if (LCB_LIST_IS_EMPTY(&q->schedops)) {
        lcbio_timer_disarm(q->timer);
        return;
    }

    /** Figure out which deadline comes first: the next timeout or the next retry */
    first_tmo = LCB_LIST_ITEM(LCB_LIST_HEAD(&q->tmoops), lcb_RETRYOP, ll_tmo);
    first_sched = LCB_LIST_ITEM(LCB_LIST_HEAD(&q->schedops), lcb_RETRYOP, ll_sched);
    schednext = first_sched->trytime;
    tmonext = first_tmo->start;
    tmonext += LCB_US2NS(q->settings->operation_timeout);
    selected = schednext > tmonext ? tmonext : schednext;

    if (selected <= now) {
        diff = 0;
    } else {
        diff = selected - now;
    }

    us_interval = LCB_NS2US(diff);
    lcb_log(LOGARGS(q, TRACE), "Next tick in %u ms", (unsigned)us_interval/1000);
    lcbio_timer_rearm(q->timer, us_interval);
}
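
/* Worked example for do_schedule() above (a sketch; the values are
 * hypothetical, not defaults). Suppose operation_timeout=2500000us (2.5s),
 * the oldest entry started at t=0, and the earliest retry is due at t=300ms:
 *
 *   tmonext   = 0 + 2.5s = 2.5s
 *   schednext = 300ms
 *   selected  = min(2.5s, 300ms) = 300ms
 *
 * so at now=t=0 the timer is rearmed to fire in 300ms.
 */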

/**
 * Flush the queue
 * @param rq The queue to flush
 * @param throttle Whether to throttle operations to be retried. If this is
 * set to false then all operations will be attempted (assuming they have
 * not timed out)
 */
static void
rq_flush(lcb_RETRYQ *rq, int throttle)
{
    hrtime_t now = gethrtime();
    lcb_list_t *ll, *ll_next;
    lcb_list_t resched_next;

    /** Check timeouts first */
    LCB_LIST_SAFE_FOR(ll, ll_next, &rq->tmoops) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(ll, lcb_RETRYOP, ll_tmo);
        hrtime_t curtmo =
                op->start + LCB_US2NS(rq->settings->operation_timeout);

        if (curtmo <= now) {
            bail_op(rq, op, LCB_ETIMEDOUT);
        } else {
            /* The list is sorted by start time, so nothing later in it can
             * have expired yet */
            break;
        }
    }

    lcb_list_init(&resched_next);

    LCB_LIST_SAFE_FOR(ll, ll_next, &rq->schedops) {
        protocol_binary_request_header hdr;
        int vbid, srvix;
        hrtime_t curnext;

        lcb_RETRYOP *op = LCB_LIST_ITEM(ll, lcb_RETRYOP, ll_sched);
        curnext = op->trytime - TIMEFUZZ_NS;

        if (curnext > now && throttle) {
            break;
        }

        mcreq_read_hdr(op->pkt, &hdr);
        vbid = ntohs(hdr.request.vbucket);
        srvix = lcbvb_vbmaster(rq->cq->config, vbid);

        if (srvix < 0 || (unsigned)srvix >= rq->cq->npipelines) {
            /* No server found to map to */
            lcb_t instance = rq->cq->cqdata;
            assign_error(op, LCB_NO_MATCHING_SERVER);

            /* Request a new configuration. If it's time to request a new
             * configuration (i.e. the attempt has not been throttled) then
             * keep the command in the queue until it has a chance to be
             * scheduled. */
            lcb_bootstrap_common(instance, LCB_BS_REFRESH_THROTTLE);
            if (lcb_confmon_is_refreshing(instance->confmon) ||
                    rq->settings->retry[LCB_RETRY_ON_MISSINGNODE]) {
                lcb_list_delete(&op->ll_sched);
                lcb_list_delete(&op->ll_tmo);
                lcb_list_append(&resched_next, &op->ll_sched);
                op->pkt->retries++;
                update_trytime(rq, op, now);
            } else {
                bail_op(rq, op, LCB_NO_MATCHING_SERVER);
            }
        } else {
            mc_PIPELINE *newpl = rq->cq->pipelines[srvix];
            mcreq_enqueue_packet(newpl, op->pkt);
            newpl->flush_start(newpl);
            clean_op(op);
        }
    }

    /* Reinsert any rescheduled operations in sorted order */
    LCB_LIST_SAFE_FOR(ll, ll_next, &resched_next) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(ll, lcb_RETRYOP, ll_sched);
        lcb_list_add_sorted(&rq->schedops, &op->ll_sched, cmpfn_retry);
        lcb_list_add_sorted(&rq->tmoops, &op->ll_tmo, cmpfn_tmo);
    }

    do_schedule(rq, now);
}

static void
rq_tick(void *arg)
{
    rq_flush(arg, 1);
}

void
lcb_retryq_signal(lcb_RETRYQ *rq)
{
    rq_flush(rq, 0);
}

static void
op_dtorfn(mc_EPKTDATUM *d)
{
    free(d);
}

#define RETRY_SCHED_IMM 0x01

static void
add_op(lcb_RETRYQ *rq, mc_EXPACKET *pkt, const lcb_error_t err, int options)
{
    lcb_RETRYOP *op;
    mc_EPKTDATUM *d;

    d = mcreq_epkt_find(pkt, RETRY_PKT_KEY);
    if (d) {
        op = (lcb_RETRYOP *)d;
    } else {
        op = calloc(1, sizeof *op);
        op->epd.dtorfn = op_dtorfn;
        op->epd.key = RETRY_PKT_KEY;
        op->start = MCREQ_PKT_RDATA(&pkt->base)->start;
        mcreq_epkt_insert(pkt, &op->epd);
    }

    op->pkt = &pkt->base;
    pkt->base.retries++;
    assign_error(op, err);
    if (options & RETRY_SCHED_IMM) {
        op->trytime = gethrtime(); /* now */
    } else if (err == LCB_NOT_MY_VBUCKET) {
        op->trytime = gethrtime() + LCB_US2NS(rq->settings->retry_nmv_interval);
    } else {
        update_trytime(rq, op, 0);
    }

    lcb_list_add_sorted(&rq->schedops, &op->ll_sched, cmpfn_retry);
    lcb_list_add_sorted(&rq->tmoops, &op->ll_tmo, cmpfn_tmo);

    lcb_log(LOGARGS(rq, DEBUG), "Adding PKT=%p to retry queue. Try count=%u", (void*)pkt, pkt->base.retries);
    do_schedule(rq, 0);
}

void
lcb_retryq_add(lcb_RETRYQ *rq, mc_EXPACKET *pkt, lcb_error_t err)
{
    add_op(rq, pkt, err, 0);
}

void
lcb_retryq_nmvadd(lcb_RETRYQ *rq, mc_EXPACKET *detchpkt)
{
    int flags = 0;
    if (rq->settings->nmv_retry_imm) {
        flags = RETRY_SCHED_IMM;
    }
    add_op(rq, detchpkt, LCB_NOT_MY_VBUCKET, flags);
}

static void
fallback_handler(mc_CMDQUEUE *cq, mc_PACKET *pkt)
{
    lcb_t instance = cq->cqdata;
    mc_PACKET *copy = mcreq_renew_packet(pkt);
    add_op(instance->retryq, (mc_EXPACKET*)copy,
        LCB_NO_MATCHING_SERVER, RETRY_SCHED_IMM);
}

void
lcb_retryq_reset_timeouts(lcb_RETRYQ *rq, lcb_U64 now)
{
    lcb_list_t *ll;
    LCB_LIST_FOR(ll, &rq->schedops) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(ll, lcb_RETRYOP, ll_sched);
        op->start = now;
    }
}

lcb_RETRYQ *
lcb_retryq_new(mc_CMDQUEUE *cq, lcbio_pTABLE table, lcb_settings *settings)
{
    lcb_RETRYQ *rq = calloc(1, sizeof(*rq));

    rq->settings = settings;
    rq->cq = cq;
    rq->timer = lcbio_timer_new(table, rq, rq_tick);

    lcb_settings_ref(settings);
    lcb_list_init(&rq->tmoops);
    lcb_list_init(&rq->schedops);
    mcreq_set_fallback_handler(cq, fallback_handler);
    return rq;
}

void
lcb_retryq_destroy(lcb_RETRYQ *rq)
{
    lcb_list_t *llcur, *llnext;

    LCB_LIST_SAFE_FOR(llcur, llnext, &rq->schedops) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(llcur, lcb_RETRYOP, ll_sched);
        bail_op(rq, op, LCB_ERROR);
    }

    lcbio_timer_destroy(rq->timer);
    lcb_settings_unref(rq->settings);
    free(rq);
}
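
/* Usage sketch for lcb_retryq_origerr() below (illustrative only; `pkt` is a
 * hypothetical detached packet seen by a response handler):
 *
 *   lcb_error_t orig = lcb_retryq_origerr(pkt);
 *   if (orig != LCB_SUCCESS) {
 *       // The packet went through the retry queue; `orig` is the error
 *       // which first put it there, which may be more specific than the
 *       // final result (e.g. LCB_NO_MATCHING_SERVER rather than
 *       // LCB_ETIMEDOUT).
 *   }
 */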
lcb_error_t
lcb_retryq_origerr(const mc_PACKET *packet)
{
    mc_EPKTDATUM *d;
    lcb_RETRYOP *op;

    if (!(packet->flags & MCREQ_F_DETACHED)) {
        return LCB_SUCCESS; /* Not detached */
    }

    d = mcreq_epkt_find((mc_EXPACKET*)packet, RETRY_PKT_KEY);
    if (!d) {
        return LCB_SUCCESS;
    }

    op = (lcb_RETRYOP *)d;
    return op->origerr;
}

void
lcb_retryq_dump(lcb_RETRYQ *rq, FILE *fp, mcreq_payload_dump_fn dumpfn)
{
    lcb_list_t *cur;
    LCB_LIST_FOR(cur, &rq->schedops) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(cur, lcb_RETRYOP, ll_sched);
        mcreq_dump_packet(op->pkt, fp, dumpfn);
    }
}
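
/* Usage sketch for lcb_retryq_dump() above (illustrative; `instance` and
 * `my_dump_fn` are hypothetical, and my_dump_fn must match the
 * mcreq_payload_dump_fn typedef). Dumps every packet still waiting in the
 * retry queue to stderr:
 *
 *   lcb_retryq_dump(instance->retryq, stderr, my_dump_fn);
 */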