diff --git a/libmemcached-0.32/libmemcached/memcached_response.c b/libmemcached/libmemcached/memcached_response.c --- a/libmemcached-0.32/libmemcached/memcached_response.c +++ b/libmemcached/libmemcached/memcached_response.c @@ -496,6 +496,8 @@ case PROTOCOL_BINARY_RESPONSE_E2BIG: case PROTOCOL_BINARY_RESPONSE_EINVAL: case PROTOCOL_BINARY_RESPONSE_NOT_STORED: + rc= MEMCACHED_NOTSTORED; + break; case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: case PROTOCOL_BINARY_RESPONSE_ENOMEM: default: diff --git a/libmemcached-0.32/libmemcached/memcached.c b/libmemcached/libmemcached/memcached.c --- a/libmemcached-0.32/libmemcached/memcached.c +++ b/libmemcached/libmemcached/memcached.c @@ -50,6 +50,9 @@ void memcached_free(memcached_st *ptr) if (ptr->continuum) ptr->call_free(ptr, ptr->continuum); + if (ptr->live_host_indices) + ptr->call_free(ptr, ptr->live_host_indices); + if (ptr->is_allocated) ptr->call_free(ptr, ptr); else diff --git a/libmemcached-0.32/libmemcached/memcached.h b/libmemcached/libmemcached/memcached.h --- a/libmemcached-0.32/libmemcached/memcached.h +++ b/libmemcached/libmemcached/memcached.h @@ -112,6 +112,9 @@ struct memcached_st { memcached_trigger_delete_key delete_trigger; char prefix_key[MEMCACHED_PREFIX_KEY_MAX_SIZE]; uint32_t number_of_replicas; + uint32_t number_of_live_hosts; + uint32_t *live_host_indices; + uint32_t live_host_indices_size; }; LIBMEMCACHED_API diff --git a/libmemcached-0.32/libmemcached/memcached_connect.c b/libmemcached/libmemcached/memcached_connect.c --- a/libmemcached-0.32/libmemcached/memcached_connect.c +++ b/libmemcached/libmemcached/memcached_connect.c @@ -273,7 +272,6 @@ static memcached_return network_connect(memcached_server_st *ptr) (void)fcntl(ptr->fd, F_SETFL, flags & ~O_NONBLOCK); WATCHPOINT_ASSERT(ptr->cursor_active == 0); - ptr->server_failure_counter= 0; return MEMCACHED_SUCCESS; } use = use->ai_next; @@ -282,21 +280,13 @@ static memcached_return network_connect(memcached_server_st *ptr) if (ptr->fd == -1) { - /* Failed to connect. schedule next retry */ - if (ptr->root->retry_timeout) - { - struct timeval next_time; - - if (gettimeofday(&next_time, NULL) == 0) - ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; - } - ptr->server_failure_counter+= 1; + ptr->server_failure_counter ++; if (ptr->cached_errno == 0) return MEMCACHED_TIMEOUT; + return MEMCACHED_ERRNO; /* The last error should be from connect() */ } - ptr->server_failure_counter= 0; return MEMCACHED_SUCCESS; /* The last error should be from connect() */ } @@ -315,7 +305,7 @@ memcached_return memcached_connect(memcached_server_st *ptr) gettimeofday(&next_time, NULL); /* if we've had too many consecutive errors on this server, mark it dead. */ - if (ptr->server_failure_counter > ptr->root->server_failure_limit) + if (ptr->server_failure_counter >= ptr->root->server_failure_limit) { ptr->next_retry= next_time.tv_sec + ptr->root->retry_timeout; ptr->server_failure_counter= 0; diff --git a/libmemcached-0.32/libmemcached/memcached_get.h b/libmemcached/libmemcached/memcached_get.h diff --git a/libmemcached-0.32/libmemcached/memcached_hash.c b/libmemcached/libmemcached/memcached_hash.c index 129d761..3a7fa55 100644 --- a/libmemcached-0.32/libmemcached/memcached_hash.c +++ b/libmemcached/libmemcached/memcached_hash.c @@ -11,6 +11,7 @@ static uint32_t FNV_32_PRIME= 16777619; /* Prototypes */ static uint32_t internal_generate_hash(const char *key, size_t key_length); static uint32_t internal_generate_md5(const char *key, size_t key_length); +uint32_t memcached_live_host_index(memcached_st *ptr, uint32_t num); uint32_t memcached_generate_hash_value(const char *key, size_t key_length, memcached_hash hash_algorithm) { @@ -146,10 +144,10 @@ static uint32_t dispatch_host(memcached_st *ptr, uint32_t hash) right= begin; return right->index; } - case MEMCACHED_DISTRIBUTION_MODULA: - return hash % ptr->number_of_hosts; + case MEMCACHED_DISTRIBUTION_MODULA: + return memcached_live_host_index(ptr, hash); case MEMCACHED_DISTRIBUTION_RANDOM: - return (uint32_t) random() % ptr->number_of_hosts; + return memcached_live_host_index(ptr, (uint32_t) random()); default: WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */ return hash % ptr->number_of_hosts; @@ -201,6 +199,19 @@ uint32_t memcached_generate_hash(memcached_st *ptr, const char *key, size_t key_ return dispatch_host(ptr, hash); } +uint32_t memcached_live_host_index(memcached_st *ptr, uint32_t num) +{ + if (memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS)) { + if (ptr->number_of_live_hosts > 0) { + return ptr->live_host_indices[num % ptr->number_of_live_hosts]; + } else { + return 0; /* FIXME: we should do something different if every server's dead, but I dunno what. */ + } + } else { + return num % ptr->number_of_hosts; + } +} + static uint32_t internal_generate_hash(const char *key, size_t key_length) { const char *ptr= key; diff --git a/libmemcached-0.32/libmemcached/memcached_hosts.c b/libmemcached/libmemcached/memcached_hosts.c --- a/libmemcached-0.32/libmemcached/memcached_hosts.c +++ b/libmemcached/libmemcached/memcached_hosts.c @@ -7,6 +7,7 @@ static memcached_return server_add(memcached_st *ptr, const char *hostname, uint32_t weight, memcached_connection type); memcached_return update_continuum(memcached_st *ptr); +memcached_return update_live_host_indices(memcached_st *ptr); static int compare_servers(const void *p1, const void *p2) { @@ -44,8 +45,12 @@ memcached_return run_distribution(memcached_st *ptr) case MEMCACHED_DISTRIBUTION_MODULA: if (ptr->flags & MEM_USE_SORT_HOSTS) sort_hosts(ptr); + if (memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS)) + update_live_host_indices(ptr); break; case MEMCACHED_DISTRIBUTION_RANDOM: + if (memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_AUTO_EJECT_HOSTS)) + update_live_host_indices(ptr); break; default: WATCHPOINT_ASSERT(0); /* We have added a distribution without extending the logic */ @@ -85,6 +90,46 @@ static uint32_t ketama_server_hash(const char *key, unsigned int key_length, int | (results[0 + alignment * 4] & 0xFF); } +memcached_return update_live_host_indices(memcached_st *ptr) +{ + uint32_t host_index; + struct timeval now; + uint32_t i = 0; + memcached_server_st *list; + + if (gettimeofday(&now, NULL) != 0) + { + ptr->cached_errno = errno; + return MEMCACHED_ERRNO; + } + + if (ptr->live_host_indices == NULL) { + ptr->live_host_indices = (uint32_t *)ptr->call_malloc(ptr, sizeof(uint32_t) * ptr->number_of_hosts); + ptr->live_host_indices_size = ptr->number_of_live_hosts; + } + + /* somehow we added some hosts. Shouldn't get here much, if at all. */ + if (ptr->live_host_indices_size < ptr->number_of_hosts ) { + ptr->live_host_indices = (uint32_t *)ptr->call_realloc(ptr, ptr->live_host_indices, sizeof(uint32_t) * ptr->number_of_hosts); + ptr->live_host_indices_size = ptr->number_of_live_hosts; + } + + if (ptr->live_host_indices == NULL) + return MEMCACHED_FAILURE; + + list = ptr->hosts; + for (host_index= 0; host_index < ptr->number_of_hosts; ++host_index) + { + if (list[host_index].next_retry <= now.tv_sec) { + ptr->live_host_indices[i++] = host_index; + } else if (ptr->next_distribution_rebuild == 0 || list[host_index].next_retry < ptr->next_distribution_rebuild) { + ptr->next_distribution_rebuild= list[host_index].next_retry; + } + } + ptr->number_of_live_hosts = i; + return MEMCACHED_SUCCESS; +} + static int continuum_item_cmp(const void *t1, const void *t2) { memcached_continuum_item_st *ct1= (memcached_continuum_item_st *)t1; @@ -111,8 +156,8 @@ memcached_return update_continuum(memcached_st *ptr) uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER; uint32_t pointer_per_hash= 1; uint64_t total_weight= 0; - uint64_t is_ketama_weighted= 0; - uint64_t is_auto_ejecting= 0; + uint32_t is_ketama_weighted= 0; + uint32_t is_auto_ejecting= 0; uint32_t points_per_server= 0; uint32_t live_servers= 0; struct timeval now; diff --git a/libmemcached-0.32/libmemcached/memcached_storage.c b/libmemcached/libmemcached/memcached_storage.c index ecefc56..ee79a7e 100644 --- a/libmemcached-0.32/libmemcached/memcached_storage.c +++ b/libmemcached/libmemcached/memcached_storage.c @@ -135,22 +135,16 @@ static inline memcached_return memcached_send(memcached_st *ptr, } if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) - { - rc= MEMCACHED_WRITE_FAILURE; - goto error; - } + return MEMCACHED_WRITE_FAILURE; /* Send command header */ rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0); if (rc != MEMCACHED_SUCCESS) - goto error; + return rc; /* Send command body */ if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1) - { - rc= MEMCACHED_WRITE_FAILURE; - goto error; - } + return MEMCACHED_WRITE_FAILURE; if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) to_write= 0; @@ -158,10 +152,7 @@ static inline memcached_return memcached_send(memcached_st *ptr, to_write= 1; if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1) - { - rc= MEMCACHED_WRITE_FAILURE; - goto error; - } + return MEMCACHED_WRITE_FAILURE; if (ptr->flags & MEM_NOREPLY) return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; @@ -170,15 +161,8 @@ static inline memcached_return memcached_send(memcached_st *ptr, return MEMCACHED_BUFFERED; rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); - if (rc == MEMCACHED_STORED) - return MEMCACHED_SUCCESS; - else - return rc; - -error: - memcached_io_reset(&ptr->hosts[server_key]); - + return MEMCACHED_SUCCESS; return rc; }