/* * twemproxy - A fast and lightweight proxy for memcached protocol. * Copyright (C) 2011 Twitter, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include static uint32_t ctx_id; /* context generation */ /* function prototype for use in core_ctx_create() */ static void core_core(void *arg, uint32_t evflags); static struct context * core_ctx_create(struct instance *nci) { rstatus_t status; struct context *ctx; ctx = nc_alloc(sizeof(*ctx)); if (ctx == NULL) { return NULL; } ctx->id = ++ctx_id; ctx->cf = NULL; ctx->stats = NULL; ctx->evb = NULL; array_null(&ctx->pool); ctx->max_timeout = nci->stats_interval; ctx->timeout = ctx->max_timeout; /* parse and create configuration */ ctx->cf = conf_create(nci->conf_filename); if (ctx->cf == NULL) { nc_free(ctx); return NULL; } /* initialize server pool from configuration */ status = server_pool_init(&ctx->pool, &ctx->cf->pool, ctx); if (status != NC_OK) { conf_destroy(ctx->cf); nc_free(ctx); return NULL; } /* create stats per server pool */ ctx->stats = stats_create(nci->stats_port, nci->stats_addr, nci->stats_interval, nci->hostname, &ctx->pool); if (ctx->stats == NULL) { server_pool_deinit(&ctx->pool); conf_destroy(ctx->cf); nc_free(ctx); return NULL; } /* initialize event handling for client, proxy and server */ ctx->evb = evbase_create(NC_EVENT_SIZE, &core_core); if (ctx->evb == NULL) { stats_destroy(ctx->stats); server_pool_deinit(&ctx->pool); conf_destroy(ctx->cf); nc_free(ctx); return NULL; } /* preconnect? servers in server pool */ status = server_pool_preconnect(ctx); if (status != NC_OK) { server_pool_disconnect(ctx); evbase_destroy(ctx->evb); stats_destroy(ctx->stats); server_pool_deinit(&ctx->pool); conf_destroy(ctx->cf); nc_free(ctx); return NULL; } /* initialize proxy per server pool */ status = proxy_init(ctx); if (status != NC_OK) { server_pool_disconnect(ctx); evbase_destroy(ctx->evb); stats_destroy(ctx->stats); server_pool_deinit(&ctx->pool); conf_destroy(ctx->cf); nc_free(ctx); return NULL; } log_debug(LOG_VVERB, "created ctx %p id %"PRIu32"", ctx, ctx->id); return ctx; } static void core_ctx_destroy(struct context *ctx) { log_debug(LOG_VVERB, "destroy ctx %p id %"PRIu32"", ctx, ctx->id); proxy_deinit(ctx); server_pool_disconnect(ctx); evbase_destroy(ctx->evb); stats_destroy(ctx->stats); server_pool_deinit(&ctx->pool); conf_destroy(ctx->cf); nc_free(ctx); } struct context * core_start(struct instance *nci) { struct context *ctx; mbuf_init(nci); msg_init(); conn_init(); ctx = core_ctx_create(nci); if (ctx != NULL) { nci->ctx = ctx; return ctx; } conn_deinit(); msg_deinit(); mbuf_deinit(); return NULL; } void core_stop(struct context *ctx) { conn_deinit(); msg_deinit(); mbuf_deinit(); core_ctx_destroy(ctx); } static rstatus_t core_recv(struct context *ctx, struct conn *conn) { rstatus_t status; status = conn->recv(ctx, conn); if (status != NC_OK) { log_debug(LOG_INFO, "recv on %c %d failed: %s", conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd, strerror(errno)); } return status; } static rstatus_t core_send(struct context *ctx, struct conn *conn) { rstatus_t status; status = conn->send(ctx, conn); if (status != NC_OK) { log_debug(LOG_INFO, "send on %c %d failed: %s", conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd, strerror(errno)); } return status; } static void core_close(struct context *ctx, struct conn *conn) { rstatus_t status; char type, *addrstr; ASSERT(conn->sd > 0); if (conn->client) { type = 'c'; addrstr = nc_unresolve_peer_desc(conn->sd); } else { type = conn->proxy ? 'p' : 's'; addrstr = nc_unresolve_addr(conn->addr, conn->addrlen); } log_debug(LOG_NOTICE, "close %c %d '%s' on event %04"PRIX32" eof %d done " "%d rb %zu sb %zu%c %s", type, conn->sd, addrstr, conn->events, conn->eof, conn->done, conn->recv_bytes, conn->send_bytes, conn->err ? ':' : ' ', conn->err ? strerror(conn->err) : ""); status = event_del_conn(ctx->evb, conn); if (status < 0) { log_warn("event del conn %c %d failed, ignored: %s", type, conn->sd, strerror(errno)); } conn->close(ctx, conn); } static void core_error(struct context *ctx, struct conn *conn) { rstatus_t status; char type = conn->client ? 'c' : (conn->proxy ? 'p' : 's'); status = nc_get_soerror(conn->sd); if (status < 0) { log_warn("get soerr on %c %d failed, ignored: %s", type, conn->sd, strerror(errno)); } conn->err = errno; core_close(ctx, conn); } static void core_timeout(struct context *ctx) { for (;;) { struct msg *msg; struct conn *conn; int64_t now, then; msg = msg_tmo_min(); if (msg == NULL) { ctx->timeout = ctx->max_timeout; return; } /* skip over req that are in-error or done */ if (msg->error || msg->done) { msg_tmo_delete(msg); continue; } /* * timeout expired req and all the outstanding req on the timing * out server */ conn = msg->tmo_rbe.data; then = msg->tmo_rbe.key; now = nc_msec_now(); if (now < then) { int delta = (int)(then - now); ctx->timeout = MIN(delta, ctx->max_timeout); return; } log_debug(LOG_INFO, "req %"PRIu64" on s %d timedout", msg->id, conn->sd); msg_tmo_delete(msg); conn->err = ETIMEDOUT; core_close(ctx, conn); } } static void core_core(void *arg, uint32_t evflags) { rstatus_t status; struct conn *conn = (struct conn *) arg; struct context *ctx; if ((conn->proxy) || (conn->client)) { ctx = ((struct server_pool *) (conn -> owner)) -> ctx; } else { ctx = ((struct server_pool *) (((struct server *) (conn -> owner)) -> owner )) -> ctx; } log_debug(LOG_VVERB, "event %04"PRIX32" on %c %d", evflags, conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd); conn->events = evflags; /* error takes precedence over read | write */ if (evflags & EV_ERR) { core_error(ctx, conn); return; } /* read takes precedence over write */ if (evflags & EV_READ) { status = core_recv(ctx, conn); if (status != NC_OK || conn->done || conn->err) { core_close(ctx, conn); return; } } if (evflags & EV_WRITE) { status = core_send(ctx, conn); if (status != NC_OK || conn->done || conn->err) { core_close(ctx, conn); return; } } } rstatus_t core_loop(struct context *ctx) { int i, nsd; nsd = event_wait(ctx->evb, ctx->timeout); if (nsd < 0) { return nsd; } core_timeout(ctx); stats_swap(ctx->stats); return NC_OK; }