mirror of
https://github.com/phpredis/phpredis.git
synced 2026-06-19 07:35:31 +00:00
Rehashing callback
This commit is contained in:
+4
-4
@@ -344,11 +344,11 @@ PHP_METHOD(RedisArray, _function)
|
||||
|
||||
PHP_METHOD(RedisArray, _rehash)
|
||||
{
|
||||
zval *object;
|
||||
zval *object, *z_cb = NULL;
|
||||
RedisArray *ra;
|
||||
|
||||
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O",
|
||||
&object, redis_array_ce) == FAILURE) {
|
||||
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|z",
|
||||
&object, redis_array_ce, &z_cb) == FAILURE) {
|
||||
RETURN_FALSE;
|
||||
}
|
||||
|
||||
@@ -356,7 +356,7 @@ PHP_METHOD(RedisArray, _rehash)
|
||||
RETURN_FALSE;
|
||||
}
|
||||
|
||||
ra_rehash(ra);
|
||||
ra_rehash(ra, z_cb);
|
||||
}
|
||||
|
||||
PHP_METHOD(RedisArray, info)
|
||||
|
||||
+26
-4
@@ -690,8 +690,25 @@ ra_move_key(const char *key, int key_len, zval *z_from, zval *z_to) {
|
||||
ra_index_exec(z_to, NULL);
|
||||
}
|
||||
|
||||
/* callback with the current progress, with hostname and count */
|
||||
static void zval_rehash_callback(zval *z_cb, const char *hostname, long count) {
|
||||
|
||||
zval z_ret, *z_args[2];
|
||||
|
||||
/* run cb(hostname, count) */
|
||||
MAKE_STD_ZVAL(z_args[0]);
|
||||
ZVAL_STRING(z_args[0], hostname, 0);
|
||||
MAKE_STD_ZVAL(z_args[1]);
|
||||
ZVAL_LONG(z_args[1], count);
|
||||
call_user_function(EG(function_table), NULL, z_cb, &z_ret, 2, z_args TSRMLS_CC);
|
||||
|
||||
/* cleanup */
|
||||
efree(z_args[0]);
|
||||
efree(z_args[1]);
|
||||
}
|
||||
|
||||
static void
|
||||
ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool b_index) {
|
||||
ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool b_index, zval *z_cb) {
|
||||
|
||||
char **keys;
|
||||
int *key_lens;
|
||||
@@ -706,10 +723,15 @@ ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool
|
||||
count = ra_rehash_scan_keys(z_redis, &keys, &key_lens);
|
||||
}
|
||||
|
||||
/* callback */
|
||||
if(z_cb) {
|
||||
zval_rehash_callback(z_cb, hostname, count);
|
||||
}
|
||||
|
||||
/* for each key, redistribute */
|
||||
for(i = 0; i < count; ++i) {
|
||||
|
||||
/* TODO: check that we're not moving to the same node. */
|
||||
/* check that we're not moving to the same node. */
|
||||
z_target = ra_find_node(ra, keys[i], key_lens[i], &target_pos);
|
||||
|
||||
if(strcmp(hostname, ra->hosts[target_pos])) { /* different host */
|
||||
@@ -727,7 +749,7 @@ ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool
|
||||
}
|
||||
|
||||
void
|
||||
ra_rehash(RedisArray *ra) {
|
||||
ra_rehash(RedisArray *ra, zval *z_cb) {
|
||||
|
||||
int i;
|
||||
|
||||
@@ -736,7 +758,7 @@ ra_rehash(RedisArray *ra) {
|
||||
return; /* TODO: compare the two rings for equality */
|
||||
|
||||
for(i = 0; i < ra->prev->count; ++i) {
|
||||
ra_rehash_server(ra, ra->prev->redis[i], ra->prev->hosts[i], ra->index);
|
||||
ra_rehash_server(ra, ra->prev->redis[i], ra->prev->hosts[i], ra->index, z_cb);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -17,6 +17,6 @@ void ra_index_key(const char *key, int key_len, zval *z_redis TSRMLS_DC);
|
||||
void ra_index_exec(zval *z_redis, zval *return_value);
|
||||
zend_bool ra_is_write_cmd(RedisArray *ra, const char *cmd, int cmd_len);
|
||||
|
||||
void ra_rehash(RedisArray *ra);
|
||||
void ra_rehash(RedisArray *ra, zval *z_cb);
|
||||
|
||||
#endif
|
||||
|
||||
@@ -79,7 +79,11 @@ echo "All key/value pairs read successfuly using the new setup with a fallback\n
|
||||
report_info($ra);
|
||||
|
||||
echo "Rehash array.\n";
|
||||
$ra->_rehash();
|
||||
|
||||
function rehash_callback($host, $count) {
|
||||
echo "Now rehashing $count keys on node $host\n";
|
||||
}
|
||||
$ra->_rehash('rehash_callback');
|
||||
|
||||
echo "Creating a new ring without fallback and reading back all the values.\n";
|
||||
$ra = new RedisArray($secondRing);
|
||||
|
||||
Reference in New Issue
Block a user