/* * twemproxy - A fast and lightweight proxy for memcached protocol. * Copyright (C) 2011 Twitter, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include void server_ref(struct conn *conn, void *owner) { struct server *server = owner; ASSERT(!conn->client && !conn->proxy); ASSERT(conn->owner == NULL); conn->family = server->family; conn->addrlen = server->addrlen; conn->addr = server->addr; server->ns_conn_q++; TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe); conn->owner = owner; log_debug(LOG_VVERB, "ref conn %p owner %p into '%.*s", conn, server, server->pname.len, server->pname.data); } void server_unref(struct conn *conn) { struct server *server; ASSERT(!conn->client && !conn->proxy); ASSERT(conn->owner != NULL); server = conn->owner; conn->owner = NULL; ASSERT(server->ns_conn_q != 0); server->ns_conn_q--; TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe); log_debug(LOG_VVERB, "unref conn %p owner %p from '%.*s'", conn, server, server->pname.len, server->pname.data); } int server_timeout(struct conn *conn) { struct server *server; struct server_pool *pool; ASSERT(!conn->client && !conn->proxy); server = conn->owner; pool = server->owner; return pool->timeout; } bool server_active(struct conn *conn) { ASSERT(!conn->client && !conn->proxy); if (!TAILQ_EMPTY(&conn->imsg_q)) { log_debug(LOG_VVERB, "s %d is active", conn->sd); return true; } if (!TAILQ_EMPTY(&conn->omsg_q)) { log_debug(LOG_VVERB, "s %d is active", conn->sd); return true; } if (conn->rmsg != NULL) { log_debug(LOG_VVERB, "s %d is active", conn->sd); return true; } if (conn->smsg != NULL) { log_debug(LOG_VVERB, "s %d is active", conn->sd); return true; } log_debug(LOG_VVERB, "s %d is inactive", conn->sd); return false; } static rstatus_t server_each_set_owner(void *elem, void *data) { struct server *s = elem; struct server_pool *sp = data; s->owner = sp; return NC_OK; } rstatus_t server_init(struct array *server, struct array *conf_server, struct server_pool *sp) { rstatus_t status; uint32_t nserver; nserver = array_n(conf_server); ASSERT(nserver != 0); ASSERT(array_n(server) == 0); status = array_init(server, nserver, sizeof(struct server)); if (status != NC_OK) { return status; } /* transform conf server to server */ status = array_each(conf_server, conf_server_each_transform, server); if (status != NC_OK) { server_deinit(server); return status; } ASSERT(array_n(server) == nserver); /* set server owner */ status = array_each(server, server_each_set_owner, sp); if (status != NC_OK) { server_deinit(server); return status; } log_debug(LOG_DEBUG, "init %"PRIu32" servers in pool %"PRIu32" '%.*s'", nserver, sp->idx, sp->name.len, sp->name.data); return NC_OK; } void server_deinit(struct array *server) { uint32_t i, nserver; for (i = 0, nserver = array_n(server); i < nserver; i++) { struct server *s; s = array_pop(server); ASSERT(TAILQ_EMPTY(&s->s_conn_q) && s->ns_conn_q == 0); } array_deinit(server); } struct conn * server_conn(struct server *server) { struct server_pool *pool; struct conn *conn; pool = server->owner; /* * FIXME: handle multiple server connections per server and do load * balancing on it. Support multiple algorithms for * 'server_connections:' > 0 key */ if (server->ns_conn_q < pool->server_connections) { return conn_get(server, false, pool->redis); } ASSERT(server->ns_conn_q == pool->server_connections); /* * Pick a server connection from the head of the queue and insert * it back into the tail of queue to maintain the lru order */ conn = TAILQ_FIRST(&server->s_conn_q); ASSERT(!conn->client && !conn->proxy); TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe); TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe); return conn; } static rstatus_t server_each_preconnect(void *elem, void *data) { rstatus_t status; struct server *server; struct server_pool *pool; struct conn *conn; server = elem; pool = server->owner; conn = server_conn(server); if (conn == NULL) { return NC_ENOMEM; } status = server_connect(pool->ctx, server, conn); if (status != NC_OK) { log_warn("connect to server '%.*s' failed, ignored: %s", server->pname.len, server->pname.data, strerror(errno)); server_close(pool->ctx, conn); } return NC_OK; } static rstatus_t server_each_disconnect(void *elem, void *data) { struct server *server; struct server_pool *pool; server = elem; pool = server->owner; while (!TAILQ_EMPTY(&server->s_conn_q)) { struct conn *conn; ASSERT(server->ns_conn_q > 0); conn = TAILQ_FIRST(&server->s_conn_q); conn->close(pool->ctx, conn); } return NC_OK; } static void server_failure(struct context *ctx, struct server *server) { struct server_pool *pool = server->owner; int64_t now, next; rstatus_t status; if (!pool->auto_eject_hosts) { return; } server->failure_count++; log_debug(LOG_VERB, "server '%.*s' failure count %"PRIu32" limit %"PRIu32, server->pname.len, server->pname.data, server->failure_count, pool->server_failure_limit); if (server->failure_count < pool->server_failure_limit) { return; } now = nc_usec_now(); if (now < 0) { return; } next = now + pool->server_retry_timeout; log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to delete server '%.*s' " "for next %"PRIu32" secs", pool->idx, pool->name.len, pool->name.data, server->pname.len, server->pname.data, pool->server_retry_timeout / 1000 / 1000); stats_pool_incr(ctx, pool, server_ejects); server->failure_count = 0; server->next_retry = next; status = server_pool_run(pool); if (status != NC_OK) { log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx, pool->name.len, pool->name.data, strerror(errno)); } } static void server_close_stats(struct context *ctx, struct server *server, err_t err, unsigned eof, unsigned connected) { if (connected) { stats_server_decr(ctx, server, server_connections); } if (eof) { stats_server_incr(ctx, server, server_eof); return; } switch (err) { case ETIMEDOUT: stats_server_incr(ctx, server, server_timedout); break; case EPIPE: case ECONNRESET: case ECONNABORTED: case ECONNREFUSED: case ENOTCONN: case ENETDOWN: case ENETUNREACH: case EHOSTDOWN: case EHOSTUNREACH: default: stats_server_incr(ctx, server, server_err); break; } } void server_close(struct context *ctx, struct conn *conn) { rstatus_t status; struct msg *msg, *nmsg; /* current and next message */ struct conn *c_conn; /* peer client connection */ ASSERT(!conn->client && !conn->proxy); server_close_stats(ctx, conn->owner, conn->err, conn->eof, conn->connected); if (conn->sd < 0) { server_failure(ctx, conn->owner); conn->unref(conn); conn_put(conn); return; } for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) { nmsg = TAILQ_NEXT(msg, s_tqe); /* dequeue the message (request) from server inq */ conn->dequeue_inq(ctx, conn, msg); /* * Don't send any error response, if * 1. request is tagged as noreply or, * 2. client has already closed its connection */ if (msg->swallow || msg->noreply) { log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32 " type %d", conn->sd, msg->id, msg->mlen, msg->type); req_put(msg); } else { c_conn = msg->owner; ASSERT(c_conn->client && !c_conn->proxy); msg->done = 1; msg->error = 1; msg->err = conn->err; if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); } log_debug(LOG_INFO, "close s %d schedule error for req %"PRIu64" " "len %"PRIu32" type %d from c %d%c %s", conn->sd, msg->id, msg->mlen, msg->type, c_conn->sd, conn->err ? ':' : ' ', conn->err ? strerror(conn->err): " "); } } ASSERT(TAILQ_EMPTY(&conn->imsg_q)); for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) { nmsg = TAILQ_NEXT(msg, s_tqe); /* dequeue the message (request) from server outq */ conn->dequeue_outq(ctx, conn, msg); if (msg->swallow) { log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32 " type %d", conn->sd, msg->id, msg->mlen, msg->type); req_put(msg); } else { c_conn = msg->owner; ASSERT(c_conn->client && !c_conn->proxy); msg->done = 1; msg->error = 1; msg->err = conn->err; if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); } log_debug(LOG_INFO, "close s %d schedule error for req %"PRIu64" " "len %"PRIu32" type %d from c %d%c %s", conn->sd, msg->id, msg->mlen, msg->type, c_conn->sd, conn->err ? ':' : ' ', conn->err ? strerror(conn->err): " "); } } ASSERT(TAILQ_EMPTY(&conn->omsg_q)); msg = conn->rmsg; if (msg != NULL) { conn->rmsg = NULL; ASSERT(!msg->request); ASSERT(msg->peer == NULL); rsp_put(msg); log_debug(LOG_INFO, "close s %d discarding rsp %"PRIu64" len %"PRIu32" " "in error", conn->sd, msg->id, msg->mlen); } ASSERT(conn->smsg == NULL); server_failure(ctx, conn->owner); conn->unref(conn); status = close(conn->sd); if (status < 0) { log_error("close s %d failed, ignored: %s", conn->sd, strerror(errno)); } conn->sd = -1; conn_put(conn); } rstatus_t server_connect(struct context *ctx, struct server *server, struct conn *conn) { rstatus_t status; ASSERT(!conn->client && !conn->proxy); if (conn->sd > 0) { /* already connected on server connection */ return NC_OK; } log_debug(LOG_VVERB, "connect to server '%.*s'", server->pname.len, server->pname.data); conn->sd = socket(conn->family, SOCK_STREAM, 0); if (conn->sd < 0) { log_error("socket for server '%.*s' failed: %s", server->pname.len, server->pname.data, strerror(errno)); status = NC_ERROR; goto error; } status = nc_set_nonblocking(conn->sd); if (status != NC_OK) { log_error("set nonblock on s %d for server '%.*s' failed: %s", conn->sd, server->pname.len, server->pname.data, strerror(errno)); goto error; } if (server->pname.data[0] != '/') { status = nc_set_tcpnodelay(conn->sd); if (status != NC_OK) { log_warn("set tcpnodelay on s %d for server '%.*s' failed, ignored: %s", conn->sd, server->pname.len, server->pname.data, strerror(errno)); } } status = event_add_conn(ctx->evb, conn); if (status != NC_OK) { log_error("event add conn s %d for server '%.*s' failed: %s", conn->sd, server->pname.len, server->pname.data, strerror(errno)); goto error; } ASSERT(!conn->connecting && !conn->connected); status = connect(conn->sd, conn->addr, conn->addrlen); if (status != NC_OK) { if (errno == EINPROGRESS) { conn->connecting = 1; log_debug(LOG_DEBUG, "connecting on s %d to server '%.*s'", conn->sd, server->pname.len, server->pname.data); return NC_OK; } log_error("connect on s %d to server '%.*s' failed: %s", conn->sd, server->pname.len, server->pname.data, strerror(errno)); goto error; } ASSERT(!conn->connecting); conn->connected = 1; log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd, server->pname.len, server->pname.data); return NC_OK; error: conn->err = errno; return status; } void server_connected(struct context *ctx, struct conn *conn) { struct server *server = conn->owner; ASSERT(!conn->client && !conn->proxy); ASSERT(conn->connecting && !conn->connected); stats_server_incr(ctx, server, server_connections); conn->connecting = 0; conn->connected = 1; log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd, server->pname.len, server->pname.data); } void server_ok(struct context *ctx, struct conn *conn) { struct server *server = conn->owner; ASSERT(!conn->client && !conn->proxy); ASSERT(conn->connected); if (server->failure_count != 0) { log_debug(LOG_VERB, "reset server '%.*s' failure count from %"PRIu32 " to 0", server->pname.len, server->pname.data, server->failure_count); server->failure_count = 0; server->next_retry = 0LL; } } static rstatus_t server_pool_update(struct server_pool *pool) { rstatus_t status; int64_t now; uint32_t pnlive_server; /* prev # live server */ if (!pool->auto_eject_hosts) { return NC_OK; } if (pool->next_rebuild == 0LL) { return NC_OK; } now = nc_usec_now(); if (now < 0) { return NC_ERROR; } if (now <= pool->next_rebuild) { if (pool->nlive_server == 0) { errno = ECONNREFUSED; return NC_ERROR; } return NC_OK; } pnlive_server = pool->nlive_server; status = server_pool_run(pool); if (status != NC_OK) { log_error("updating pool %"PRIu32" with dist %d failed: %s", pool->idx, pool->dist_type, strerror(errno)); return status; } log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to add %"PRIu32" servers", pool->idx, pool->name.len, pool->name.data, pool->nlive_server - pnlive_server); return NC_OK; } static uint32_t server_pool_hash(struct server_pool *pool, uint8_t *key, uint32_t keylen) { ASSERT(array_n(&pool->server) != 0); if (array_n(&pool->server) == 1) { return 0; } ASSERT(key != NULL && keylen != 0); return pool->key_hash((char *)key, keylen); } static struct server * server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) { struct server *server; uint32_t hash, idx; ASSERT(array_n(&pool->server) != 0); ASSERT(key != NULL && keylen != 0); switch (pool->dist_type) { case DIST_KETAMA: hash = server_pool_hash(pool, key, keylen); idx = ketama_dispatch(pool->continuum, pool->ncontinuum, hash); break; case DIST_MODULA: hash = server_pool_hash(pool, key, keylen); idx = modula_dispatch(pool->continuum, pool->ncontinuum, hash); break; case DIST_RANDOM: idx = random_dispatch(pool->continuum, pool->ncontinuum, 0); break; default: NOT_REACHED(); return NULL; } ASSERT(idx < array_n(&pool->server)); server = array_get(&pool->server, idx); log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen, key, pool->dist_type, server->pname.len, server->pname.data); return server; } struct conn * server_pool_conn(struct context *ctx, struct server_pool *pool, uint8_t *key, uint32_t keylen) { rstatus_t status; struct server *server; struct conn *conn; status = server_pool_update(pool); if (status != NC_OK) { return NULL; } /* from a given {key, keylen} pick a server from pool */ server = server_pool_server(pool, key, keylen); if (server == NULL) { return NULL; } /* pick a connection to a given server */ conn = server_conn(server); if (conn == NULL) { return NULL; } status = server_connect(ctx, server, conn); if (status != NC_OK) { server_close(ctx, conn); return NULL; } return conn; } static rstatus_t server_pool_each_preconnect(void *elem, void *data) { rstatus_t status; struct server_pool *sp = elem; if (!sp->preconnect) { return NC_OK; } status = array_each(&sp->server, server_each_preconnect, NULL); if (status != NC_OK) { return status; } return NC_OK; } rstatus_t server_pool_preconnect(struct context *ctx) { rstatus_t status; status = array_each(&ctx->pool, server_pool_each_preconnect, NULL); if (status != NC_OK) { return status; } return NC_OK; } static rstatus_t server_pool_each_disconnect(void *elem, void *data) { rstatus_t status; struct server_pool *sp = elem; status = array_each(&sp->server, server_each_disconnect, NULL); if (status != NC_OK) { return status; } return NC_OK; } void server_pool_disconnect(struct context *ctx) { array_each(&ctx->pool, server_pool_each_disconnect, NULL); } static rstatus_t server_pool_each_set_owner(void *elem, void *data) { struct server_pool *sp = elem; struct context *ctx = data; sp->ctx = ctx; return NC_OK; } rstatus_t server_pool_run(struct server_pool *pool) { ASSERT(array_n(&pool->server) != 0); switch (pool->dist_type) { case DIST_KETAMA: return ketama_update(pool); case DIST_MODULA: return modula_update(pool); case DIST_RANDOM: return random_update(pool); default: NOT_REACHED(); return NC_ERROR; } return NC_OK; } static rstatus_t server_pool_each_run(void *elem, void *data) { return server_pool_run(elem); } rstatus_t server_pool_init(struct array *server_pool, struct array *conf_pool, struct context *ctx) { rstatus_t status; uint32_t npool; npool = array_n(conf_pool); ASSERT(npool != 0); ASSERT(array_n(server_pool) == 0); status = array_init(server_pool, npool, sizeof(struct server_pool)); if (status != NC_OK) { return status; } /* transform conf pool to server pool */ status = array_each(conf_pool, conf_pool_each_transform, server_pool); if (status != NC_OK) { server_pool_deinit(server_pool); return status; } ASSERT(array_n(server_pool) == npool); /* set ctx as the server pool owner */ status = array_each(server_pool, server_pool_each_set_owner, ctx); if (status != NC_OK) { server_pool_deinit(server_pool); return status; } /* update server pool continuum */ status = array_each(server_pool, server_pool_each_run, NULL); if (status != NC_OK) { server_pool_deinit(server_pool); return status; } log_debug(LOG_DEBUG, "init %"PRIu32" pools", npool); return NC_OK; } void server_pool_deinit(struct array *server_pool) { uint32_t i, npool; for (i = 0, npool = array_n(server_pool); i < npool; i++) { struct server_pool *sp; sp = array_pop(server_pool); ASSERT(sp->p_conn == NULL); ASSERT(TAILQ_EMPTY(&sp->c_conn_q) && sp->nc_conn_q == 0); if (sp->continuum != NULL) { nc_free(sp->continuum); sp->ncontinuum = 0; sp->nserver_continuum = 0; sp->nlive_server = 0; } server_deinit(&sp->server); log_debug(LOG_DEBUG, "deinit pool %"PRIu32" '%.*s'", sp->idx, sp->name.len, sp->name.data); } array_deinit(server_pool); log_debug(LOG_DEBUG, "deinit %"PRIu32" pools", npool); }