mirror of
https://github.com/phpredis/phpredis.git
synced 2026-06-19 07:35:31 +00:00
Implement consistent hashing algorithm for RedisArray
This commit is contained in:
+81
-5
@@ -24,6 +24,7 @@
|
||||
#include "SAPI.h"
|
||||
#include "ext/standard/url.h"
|
||||
#include "ext/standard/crc32.h"
|
||||
#include "ext/standard/md5.h"
|
||||
|
||||
#define PHPREDIS_INDEX_NAME "__phpredis_array_index__"
|
||||
|
||||
@@ -173,9 +174,10 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
|
||||
zval z_params_connect_timeout;
|
||||
zval z_params_read_timeout;
|
||||
zval z_params_lazy_connect;
|
||||
zval z_params_consistent;
|
||||
RedisArray *ra = NULL;
|
||||
|
||||
zend_bool b_index = 0, b_autorehash = 0, b_pconnect = 0;
|
||||
zend_bool b_index = 0, b_autorehash = 0, b_pconnect = 0, consistent = 0;
|
||||
long l_retry_interval = 0;
|
||||
zend_bool b_lazy_connect = 0;
|
||||
double d_connect_timeout = 0, read_timeout = 0.0;
|
||||
@@ -312,9 +314,20 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
|
||||
}
|
||||
}
|
||||
|
||||
/* find consistent option */
|
||||
array_init(&z_params_consistent);
|
||||
if ((iptr = INI_STR("redis.arrays.consistent")) != NULL) {
|
||||
sapi_module.treat_data(PARSE_STRING, estrdup(iptr), &z_params_consistent TSRMLS_CC);
|
||||
}
|
||||
if ((z_data = zend_hash_str_find(Z_ARRVAL(z_params_consistent), name, name_len)) != NULL) {
|
||||
if (Z_TYPE_P(z_data) == IS_STRING && strncmp(Z_STRVAL_P(z_data), "1", 1) == 0) {
|
||||
consistent = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* create RedisArray object */
|
||||
ra = ra_make_array(hHosts, &z_fun, &z_dist, hPrev, b_index, b_pconnect, l_retry_interval, b_lazy_connect, d_connect_timeout, read_timeout TSRMLS_CC);
|
||||
ra = ra_make_array(hHosts, &z_fun, &z_dist, hPrev, b_index, b_pconnect, l_retry_interval, b_lazy_connect, d_connect_timeout, read_timeout, consistent TSRMLS_CC);
|
||||
if (ra) {
|
||||
ra->auto_rehash = b_autorehash;
|
||||
if(ra->prev) ra->prev->auto_rehash = b_autorehash;
|
||||
@@ -332,14 +345,55 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
|
||||
zval_dtor(&z_params_connect_timeout);
|
||||
zval_dtor(&z_params_read_timeout);
|
||||
zval_dtor(&z_params_lazy_connect);
|
||||
zval_dtor(&z_params_consistent);
|
||||
zval_dtor(&z_dist);
|
||||
zval_dtor(&z_fun);
|
||||
|
||||
return ra;
|
||||
}
|
||||
|
||||
static int
|
||||
ra_points_cmp(const void *v1, const void *v2)
|
||||
{
|
||||
const ContinuumPoint *p1 = v1, *p2 = v2;
|
||||
|
||||
return p1->value < p2->value ? - 1 : p1->value > p2->value;
|
||||
}
|
||||
|
||||
static Continuum *
|
||||
ra_make_continuum(zend_string **hosts, int nb_hosts)
|
||||
{
|
||||
int i, j, k, len, idx = 0;
|
||||
char host[HOST_NAME_MAX];
|
||||
unsigned char digest[16];
|
||||
PHP_MD5_CTX ctx;
|
||||
Continuum *c;
|
||||
|
||||
c = ecalloc(1, sizeof(*c));
|
||||
c->nb_points = nb_hosts * 160; /* 40 hashes, 4 numbers per hash = 160 points per server */
|
||||
c->points = ecalloc(c->nb_points, sizeof(*c->points));
|
||||
|
||||
for (i = 0; i < nb_hosts; ++i) {
|
||||
for (j = 0; j < 40; ++j) {
|
||||
len = snprintf(host, sizeof(host), "%.*s-%u", ZSTR_LEN(hosts[i]), ZSTR_VAL(hosts[i]), j);
|
||||
PHP_MD5Init(&ctx);
|
||||
PHP_MD5Update(&ctx, host, len);
|
||||
PHP_MD5Final(digest, &ctx);
|
||||
for (k = 0; k < 4; ++k) {
|
||||
c->points[idx].index = i;
|
||||
c->points[idx++].value = (digest[3 + k * 4] << 24)
|
||||
| (digest[2 + k * 4] << 16)
|
||||
| (digest[1 + k * 4] << 8)
|
||||
| (digest[k * 4]);
|
||||
}
|
||||
}
|
||||
}
|
||||
qsort(c->points, c->nb_points, sizeof(*c->points), ra_points_cmp);
|
||||
return c;
|
||||
}
|
||||
|
||||
RedisArray *
|
||||
ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index, zend_bool b_pconnect, long retry_interval, zend_bool b_lazy_connect, double connect_timeout, double read_timeout TSRMLS_DC) {
|
||||
ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index, zend_bool b_pconnect, long retry_interval, zend_bool b_lazy_connect, double connect_timeout, double read_timeout, zend_bool consistent TSRMLS_DC) {
|
||||
|
||||
int i, count;
|
||||
RedisArray *ra;
|
||||
@@ -357,6 +411,7 @@ ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev
|
||||
ra->pconnect = b_pconnect;
|
||||
ra->connect_timeout = connect_timeout;
|
||||
ra->read_timeout = read_timeout;
|
||||
ra->continuum = NULL;
|
||||
|
||||
if (ra_load_hosts(ra, hosts, retry_interval, b_lazy_connect TSRMLS_CC) == NULL || !ra->count) {
|
||||
for (i = 0; i < ra->count; ++i) {
|
||||
@@ -368,7 +423,7 @@ ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev
|
||||
efree(ra);
|
||||
return NULL;
|
||||
}
|
||||
ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index, b_pconnect, retry_interval, b_lazy_connect, connect_timeout, read_timeout TSRMLS_CC) : NULL;
|
||||
ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index, b_pconnect, retry_interval, b_lazy_connect, connect_timeout, read_timeout, consistent TSRMLS_CC) : NULL;
|
||||
|
||||
/* init array data structures */
|
||||
ra_init_function_table(ra);
|
||||
@@ -377,6 +432,11 @@ ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev
|
||||
ZVAL_ZVAL(&ra->z_fun, z_fun, 1, 0);
|
||||
ZVAL_ZVAL(&ra->z_dist, z_dist, 1, 0);
|
||||
|
||||
/* init continuum */
|
||||
if (consistent) {
|
||||
ra->continuum = ra_make_continuum(ra->hosts, ra->count);
|
||||
}
|
||||
|
||||
return ra;
|
||||
}
|
||||
|
||||
@@ -480,7 +540,23 @@ ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_D
|
||||
}
|
||||
|
||||
/* get position on ring */
|
||||
pos = (int)((ret ^ 0xffffffff) * ra->count / 0xffffffff);
|
||||
if (ra->continuum) {
|
||||
int left = 0, right = ra->continuum->nb_points;
|
||||
while (left < right) {
|
||||
i = (int)((left + right) / 2);
|
||||
if (ra->continuum->points[i].value < ret) {
|
||||
left = i + 1;
|
||||
} else {
|
||||
right = i;
|
||||
}
|
||||
}
|
||||
if (right == ra->continuum->nb_points) {
|
||||
right = 0;
|
||||
}
|
||||
pos = ra->continuum->points[right].index;
|
||||
} else {
|
||||
pos = (int)((ret ^ 0xffffffff) * ra->count / 0xffffffff);
|
||||
}
|
||||
} else {
|
||||
pos = ra_call_distributor(ra, key, key_len TSRMLS_CC);
|
||||
if (pos < 0 || pos >= ra->count) {
|
||||
|
||||
Reference in New Issue
Block a user