mirror of
https://github.com/phpredis/phpredis.git
synced 2026-06-19 07:35:31 +00:00
Initial attempt at weighted session failover
This commit modifies our weighted session selection algorithm by adding a scratch array so we can mark servers that we've picked but were unable to connect to. All we really do is treat any server that has failed as if it had a weight of zero meaning it'll never be picked. Addresses #1214
This commit is contained in:
+89
-52
@@ -67,49 +67,43 @@ typedef struct {
|
||||
} redis_session_lock_status;
|
||||
|
||||
typedef struct redis_pool_member_ {
|
||||
|
||||
RedisSock *redis_sock;
|
||||
int weight;
|
||||
int database;
|
||||
struct redis_pool_member_ *next;
|
||||
|
||||
} redis_pool_member;
|
||||
|
||||
typedef struct {
|
||||
redis_pool_member *entry; /* Member array */
|
||||
unsigned char *etest; /* Scratch storage (for failover) */
|
||||
int entries; /* Member count */
|
||||
|
||||
int totalWeight;
|
||||
int count;
|
||||
|
||||
redis_pool_member *head;
|
||||
redis_session_lock_status lock_status;
|
||||
|
||||
} redis_pool;
|
||||
|
||||
PHP_REDIS_API void
|
||||
redis_pool_add(redis_pool *pool, RedisSock *redis_sock, int weight, int database)
|
||||
{
|
||||
redis_pool_member *rpm = ecalloc(1, sizeof(redis_pool_member));
|
||||
rpm->redis_sock = redis_sock;
|
||||
rpm->weight = weight;
|
||||
rpm->database = database;
|
||||
redis_pool_add(redis_pool *pool, RedisSock *redis_sock, int weight, int database) {
|
||||
/* Reallocate our member array and scratch storage */
|
||||
pool->entry = erealloc(pool->entry, sizeof(*pool->entry) * (pool->entries + 1));
|
||||
pool->etest = erealloc(pool->etest, sizeof(*pool->etest) * (pool->entries + 1));
|
||||
|
||||
rpm->next = pool->head;
|
||||
pool->head = rpm;
|
||||
/* Set member info */
|
||||
pool->entry[pool->entries].redis_sock = redis_sock;
|
||||
pool->entry[pool->entries].weight = weight;
|
||||
pool->entry[pool->entries].database = database;
|
||||
|
||||
/* Increment weight and bump count */
|
||||
pool->totalWeight += weight;
|
||||
pool->entries++;
|
||||
}
|
||||
|
||||
PHP_REDIS_API void
|
||||
redis_pool_free(redis_pool *pool) {
|
||||
int i;
|
||||
|
||||
redis_pool_member *rpm, *next;
|
||||
rpm = pool->head;
|
||||
while (rpm) {
|
||||
next = rpm->next;
|
||||
redis_sock_disconnect(rpm->redis_sock, 0);
|
||||
redis_free_socket(rpm->redis_sock);
|
||||
efree(rpm);
|
||||
rpm = next;
|
||||
for (i = 0; i < pool->entries; i++) {
|
||||
redis_sock_disconnect(pool->entry[i].redis_sock, 0);
|
||||
redis_free_socket(pool->entry[i].redis_sock);
|
||||
}
|
||||
|
||||
/* Cleanup after our lock */
|
||||
@@ -117,7 +111,11 @@ redis_pool_free(redis_pool *pool) {
|
||||
if (pool->lock_status.lock_secret) zend_string_release(pool->lock_status.lock_secret);
|
||||
if (pool->lock_status.lock_key) zend_string_release(pool->lock_status.lock_key);
|
||||
|
||||
/* Cleanup pool itself */
|
||||
/* Cleanup members */
|
||||
efree(pool->entry);
|
||||
efree(pool->etest);
|
||||
|
||||
/* Free pool itself */
|
||||
efree(pool);
|
||||
}
|
||||
|
||||
@@ -150,43 +148,82 @@ redis_pool_member_select(redis_pool_member *rpm) {
|
||||
efree(cmd);
|
||||
}
|
||||
|
||||
PHP_REDIS_API redis_pool_member *
|
||||
redis_pool_get_sock(redis_pool *pool, const char *key) {
|
||||
|
||||
/*
|
||||
* Pick a weighted random server from our list. This algorithm is slightly
|
||||
* modified in that we use a scratch array to keep track of servers we picked
|
||||
* but were unable to connect to for whatever reason so we can failover to
|
||||
* different servers in the event one is down. */
|
||||
static int
|
||||
redis_pool_select(redis_pool *pool, const char *key, int total_weight) {
|
||||
redis_pool_member *rpm;
|
||||
size_t counter = 0, index;
|
||||
unsigned int pos, i;
|
||||
int weight;
|
||||
|
||||
/* Make a guess based on the key */
|
||||
memcpy(&pos, key, sizeof(pos));
|
||||
pos %= pool->totalWeight;
|
||||
pos %= total_weight;
|
||||
|
||||
redis_pool_member *rpm = pool->head;
|
||||
/* Perform weighted random selection */
|
||||
for(i = 0; i < total_weight;) {
|
||||
index = counter % pool->entries;
|
||||
rpm = &pool->entry[index];
|
||||
weight = !pool->etest[index] ? rpm->weight : 0;
|
||||
|
||||
for(i = 0; i < pool->totalWeight;) {
|
||||
if (pos >= i && pos < i + rpm->weight) {
|
||||
int needs_auth = 0;
|
||||
if (rpm->redis_sock->auth && rpm->redis_sock->status != REDIS_SOCK_STATUS_CONNECTED) {
|
||||
needs_auth = 1;
|
||||
}
|
||||
if (redis_sock_server_open(rpm->redis_sock) == 0) {
|
||||
if (needs_auth) {
|
||||
redis_sock_auth(rpm->redis_sock);
|
||||
}
|
||||
if (rpm->database >= 0) { /* default is -1 which leaves the choice to redis. */
|
||||
redis_pool_member_select(rpm);
|
||||
}
|
||||
|
||||
return rpm;
|
||||
}
|
||||
if (pos >= i && pos < i + weight) {
|
||||
return index;
|
||||
}
|
||||
i += rpm->weight;
|
||||
rpm = rpm->next;
|
||||
|
||||
/* Add this weight */
|
||||
i += weight;
|
||||
counter++;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* A couple of helper macros for authenticating when needed */
|
||||
#define SESSION_SOCK_NEEDS_AUTH(rs) \
|
||||
((rs)->auth && (rs)->status != REDIS_SOCK_STATUS_CONNECTED)
|
||||
#define SESSION_SOCK_AUTH(rs) \
|
||||
(!SESSION_SOCK_NEEDS_AUTH(rs) || redis_sock_auth(rs) == 0)
|
||||
|
||||
PHP_REDIS_API
|
||||
redis_pool_member *redis_pool_get_sock(redis_pool *pool, const char *key) {
|
||||
redis_pool_member *rpm;
|
||||
int i, total_weight;
|
||||
|
||||
/* Initialize overall weight and clean fail array */
|
||||
total_weight = pool->totalWeight;
|
||||
memset(pool->etest, 0, sizeof(*pool->etest) * pool->entries);
|
||||
|
||||
/* Iterate until we run out of nodes to check */
|
||||
while (total_weight > 0 && (i = redis_pool_select(pool, key, total_weight)) != -1) {
|
||||
rpm = &pool->entry[i];
|
||||
|
||||
/* If we can't connect to the server we picked, flag it so it's treated as having
|
||||
* weight = 0 and will never be picked again (for this request) */
|
||||
if (redis_sock_server_open(rpm->redis_sock) != 0 || !SESSION_SOCK_AUTH(rpm->redis_sock)) {
|
||||
total_weight -= rpm->weight;
|
||||
pool->etest[i] = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Select database if specified */
|
||||
if (rpm->database >= 0) {
|
||||
redis_pool_member_select(rpm);
|
||||
}
|
||||
|
||||
/* Success */
|
||||
return rpm;
|
||||
}
|
||||
|
||||
/* Failure */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Helper to set our session lock key */
|
||||
static int set_session_lock_key(RedisSock *redis_sock, char *cmd, int cmd_len
|
||||
)
|
||||
{
|
||||
static int set_session_lock_key(RedisSock *redis_sock, char *cmd, int cmd_len) {
|
||||
char *reply;
|
||||
int sent_len, reply_len;
|
||||
|
||||
@@ -492,7 +529,7 @@ PS_OPEN_FUNC(redis)
|
||||
}
|
||||
}
|
||||
|
||||
if (pool->head) {
|
||||
if (pool->entries > 0) {
|
||||
PS_SET_MOD_DATA(pool);
|
||||
return SUCCESS;
|
||||
}
|
||||
@@ -573,7 +610,7 @@ PS_CREATE_SID_FUNC(redis)
|
||||
|
||||
if (pool->lock_status.session_key) zend_string_release(pool->lock_status.session_key);
|
||||
pool->lock_status.session_key = redis_session_key(redis_sock, ZSTR_VAL(sid), ZSTR_LEN(sid));
|
||||
|
||||
|
||||
if (lock_acquire(redis_sock, &pool->lock_status) == SUCCESS) {
|
||||
return sid;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user