New select DB command to RedisArray - Added retry delay on reconnect

Added the possibility to delay each reconnection attempt, including a
random factor to prevent several or many concurrent connections from
trying to reconnect at the same time.
Added the select command to RedisArray to select a DB on every
connections in one instruction.
Also, fixed a compiler warning:
redis_array_impl.c:1115:15: warning: incompatible pointer types
assigning to 'zval **' (aka 'struct _zval_struct **') from 'zval
**(*)[2]' [-Wincompatible-pointer-types]
This commit is contained in:
Emmanuel Merali
2013-01-29 11:47:36 +02:00
parent 9ba000c54c
commit d4c7f64131
9 changed files with 112 additions and 24 deletions
+1
View File
@@ -156,6 +156,7 @@ typedef struct {
char *host;
short port;
double timeout;
long retry_interval;
int failed;
int status;
int persistent;
+11 -3
View File
@@ -38,8 +38,9 @@ PHPAPI int redis_check_eof(RedisSock *redis_sock TSRMLS_DC)
int eof;
int count = 0;
if (!redis_sock->stream)
if (!redis_sock->stream) {
return -1;
}
eof = php_stream_eof(redis_sock->stream);
for (; eof; count++) {
@@ -60,6 +61,12 @@ PHPAPI int redis_check_eof(RedisSock *redis_sock TSRMLS_DC)
redis_sock->mode = ATOMIC;
redis_sock->watching = 0;
}
// Wait for a while before trying to reconnect
if (redis_sock->retry_interval) {
// Random factor to avoid having several (or many) concurrent connections trying to reconnect at the same time
long retry_interval = (count ? redis_sock->retry_interval : (random() % redis_sock->retry_interval));
usleep(retry_interval);
}
redis_sock_connect(redis_sock TSRMLS_CC); /* reconnect */
if(redis_sock->stream) { /* check for EOF again. */
eof = php_stream_eof(redis_sock->stream);
@@ -821,7 +828,8 @@ PHPAPI void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_s
* redis_sock_create
*/
PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short port,
double timeout, int persistent, char *persistent_id)
double timeout, int persistent, char *persistent_id,
long retry_interval)
{
RedisSock *redis_sock;
@@ -831,7 +839,7 @@ PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short por
redis_sock->status = REDIS_SOCK_STATUS_DISCONNECTED;
redis_sock->watching = 0;
redis_sock->dbNumber = 0;
redis_sock->retry_interval = retry_interval * 1000;
redis_sock->persistent = persistent;
if(persistent_id) {
+1 -1
View File
@@ -19,7 +19,7 @@ PHPAPI void redis_string_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis
PHPAPI void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHPAPI void redis_info_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHPAPI void redis_type_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short port, double timeout, int persistent, char *persistent_id);
PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short port, double timeout, int persistent, char *persistent_id, long retry_interval);
PHPAPI int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC);
PHPAPI int redis_sock_server_open(RedisSock *redis_sock, int force_connect TSRMLS_DC);
PHPAPI int redis_sock_disconnect(RedisSock *redis_sock TSRMLS_DC);
+11 -4
View File
@@ -520,7 +520,7 @@ PHP_METHOD(Redis,__destruct) {
}
}
/* {{{ proto boolean Redis::connect(string host, int port [, double timeout])
/* {{{ proto boolean Redis::connect(string host, int port [, double timeout [, long retry_interval]])
*/
PHP_METHOD(Redis, connect)
{
@@ -556,6 +556,7 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) {
int host_len, id;
char *host = NULL;
long port = -1;
long retry_interval = 0;
char *persistent_id = NULL;
int persistent_id_len = -1;
@@ -568,9 +569,10 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) {
persistent = 0;
#endif
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os|lds",
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os|ldsl",
&object, redis_ce, &host, &host_len, &port,
&timeout, &persistent_id, &persistent_id_len) == FAILURE) {
&timeout, &persistent_id, &persistent_id_len,
&retry_interval) == FAILURE) {
return FAILURE;
}
@@ -579,6 +581,11 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) {
return FAILURE;
}
if (retry_interval < 0L || retry_interval > INT_MAX) {
zend_throw_exception(redis_exception_ce, "Invalid retry interval", 0 TSRMLS_CC);
return FAILURE;
}
if(port == -1 && host_len && host[0] != '/') { /* not unix socket, set to default value */
port = 6379;
}
@@ -595,7 +602,7 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) {
zend_clear_exception(TSRMLS_C); /* clear exception triggered by non-existent socket during connect(). */
}
redis_sock = redis_sock_create(host, host_len, port, timeout, persistent, persistent_id);
redis_sock = redis_sock_create(host, host_len, port, timeout, persistent, persistent_id, retry_interval);
if (redis_sock_server_open(redis_sock, 1 TSRMLS_CC) < 0) {
redis_free_socket(redis_sock);
+56 -1
View File
@@ -51,6 +51,7 @@ zend_function_entry redis_array_functions[] = {
PHP_ME(RedisArray, _rehash, NULL, ZEND_ACC_PUBLIC)
/* special implementation for a few functions */
PHP_ME(RedisArray, select, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisArray, info, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisArray, ping, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisArray, mget, NULL, ZEND_ACC_PUBLIC)
@@ -192,6 +193,7 @@ PHP_METHOD(RedisArray, __construct)
RedisArray *ra = NULL;
zend_bool b_index = 0, b_autorehash = 0;
HashTable *hPrev = NULL, *hOpts = NULL;
long l_retry_interval = 0;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|a", &z0, &z_opts) == FAILURE) {
RETURN_FALSE;
@@ -232,6 +234,19 @@ PHP_METHOD(RedisArray, __construct)
if(FAILURE != zend_hash_find(hOpts, "autorehash", sizeof("autorehash"), (void**)&zpData) && Z_TYPE_PP(zpData) == IS_BOOL) {
b_autorehash = Z_BVAL_PP(zpData);
}
/* extract retry_interval option. */
zval **z_retry_interval_pp;
if (FAILURE != zend_hash_find(hOpts, "retry_interval", sizeof("retry_interval"), (void**)&z_retry_interval_pp)) {
if (Z_TYPE_PP(z_retry_interval_pp) == IS_LONG || Z_TYPE_PP(z_retry_interval_pp) == IS_STRING) {
if (Z_TYPE_PP(z_retry_interval_pp) == IS_LONG) {
l_retry_interval = Z_LVAL_PP(z_retry_interval_pp);
}
else {
l_retry_interval = atol(Z_STRVAL_PP(z_retry_interval_pp));
}
}
}
}
/* extract either name of list of hosts from z0 */
@@ -241,7 +256,7 @@ PHP_METHOD(RedisArray, __construct)
break;
case IS_ARRAY:
ra = ra_make_array(Z_ARRVAL_P(z0), z_fun, z_dist, hPrev, b_index TSRMLS_CC);
ra = ra_make_array(Z_ARRVAL_P(z0), z_fun, z_dist, hPrev, b_index, l_retry_interval TSRMLS_CC);
break;
default:
@@ -688,6 +703,46 @@ PHP_METHOD(RedisArray, setOption)
efree(z_args[0]);
efree(z_args[1]);
}
PHP_METHOD(RedisArray, select)
{
zval *object, z_fun, *z_tmp, *z_args[2];
int i;
RedisArray *ra;
long opt;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Ol",
&object, redis_array_ce, &opt) == FAILURE) {
RETURN_FALSE;
}
if (redis_array_get(object, &ra TSRMLS_CC) < 0) {
RETURN_FALSE;
}
/* prepare call */
ZVAL_STRING(&z_fun, "select", 0);
/* copy args */
MAKE_STD_ZVAL(z_args[0]);
ZVAL_LONG(z_args[0], opt);
array_init(return_value);
for(i = 0; i < ra->count; ++i) {
MAKE_STD_ZVAL(z_tmp);
/* Call each node in turn */
call_user_function(&redis_ce->function_table, &ra->redis[i],
&z_fun, z_tmp, 1, z_args TSRMLS_CC);
add_assoc_zval(return_value, ra->hosts[i], z_tmp);
}
/* cleanup */
efree(z_args[0]);
}
#define HANDLE_MULTI_EXEC(cmd) do {\
if (redis_array_get(getThis(), &ra TSRMLS_CC) >= 0 && ra->z_multi_exec) {\
int i, num_varargs;\
+1
View File
@@ -15,6 +15,7 @@ PHP_METHOD(RedisArray, _function);
PHP_METHOD(RedisArray, _distributor);
PHP_METHOD(RedisArray, _rehash);
PHP_METHOD(RedisArray, select);
PHP_METHOD(RedisArray, info);
PHP_METHOD(RedisArray, ping);
PHP_METHOD(RedisArray, mget);
+26 -7
View File
@@ -29,7 +29,7 @@ extern int le_redis_sock;
extern zend_class_entry *redis_ce;
RedisArray*
ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC)
ra_load_hosts(RedisArray *ra, HashTable *hosts, long retry_interval TSRMLS_DC)
{
int i, host_len, id;
int count = zend_hash_num_elements(hosts);
@@ -67,7 +67,7 @@ ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC)
call_user_function(&redis_ce->function_table, &ra->redis[i], &z_cons, &z_ret, 0, NULL TSRMLS_CC);
/* create socket */
redis_sock = redis_sock_create(host, host_len, port, 0, 0, NULL); /* TODO: persistence? */
redis_sock = redis_sock_create(host, host_len, port, 0, 0, NULL, retry_interval); /* TODO: persistence? */
/* connect */
redis_sock_server_open(redis_sock, 1 TSRMLS_CC);
@@ -158,9 +158,11 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
zval *z_params_funs, **z_data_pp, *z_fun = NULL, *z_dist = NULL;
zval *z_params_index;
zval *z_params_autorehash;
zval *z_params_retry_interval;
RedisArray *ra = NULL;
zend_bool b_index = 0, b_autorehash = 0;
long l_retry_interval = 0;
HashTable *hHosts = NULL, *hPrev = NULL;
/* find entry */
@@ -223,8 +225,23 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
}
}
/* find retry interval option */
MAKE_STD_ZVAL(z_params_retry_interval);
array_init(z_params_retry_interval);
sapi_module.treat_data(PARSE_STRING, estrdup(INI_STR("redis.arrays.retryinterval")), z_params_retry_interval TSRMLS_CC);
if (zend_hash_find(Z_ARRVAL_P(z_params_retry_interval), name, strlen(name) + 1, (void **) &z_data_pp) != FAILURE) {
if (Z_TYPE_PP(z_data_pp) == IS_LONG || Z_TYPE_PP(z_data_pp) == IS_STRING) {
if (Z_TYPE_PP(z_data_pp) == IS_LONG) {
l_retry_interval = Z_LVAL_PP(z_data_pp);
}
else {
l_retry_interval = atol(Z_STRVAL_PP(z_data_pp));
}
}
}
/* create RedisArray object */
ra = ra_make_array(hHosts, z_fun, z_dist, hPrev, b_index TSRMLS_CC);
ra = ra_make_array(hHosts, z_fun, z_dist, hPrev, b_index, l_retry_interval TSRMLS_CC);
ra->auto_rehash = b_autorehash;
/* cleanup */
@@ -238,12 +255,14 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) {
efree(z_params_index);
zval_dtor(z_params_autorehash);
efree(z_params_autorehash);
zval_dtor(z_params_retry_interval);
efree(z_params_retry_interval);
return ra;
}
RedisArray *
ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index TSRMLS_DC) {
ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index, long retry_interval TSRMLS_DC) {
int count = zend_hash_num_elements(hosts);
@@ -261,10 +280,10 @@ ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev
/* init array data structures */
ra_init_function_table(ra);
if(NULL == ra_load_hosts(ra, hosts TSRMLS_CC)) {
if(NULL == ra_load_hosts(ra, hosts, retry_interval TSRMLS_CC)) {
return NULL;
}
ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index TSRMLS_CC) : NULL;
ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index, retry_interval TSRMLS_CC) : NULL;
/* copy function if provided */
if(z_fun) {
@@ -1112,7 +1131,7 @@ static void zval_rehash_callback(zend_fcall_info *z_cb, zend_fcall_info_cache *z
zval *z_host, *z_count;
z_cb->retval_ptr_ptr = &z_ret;
z_cb->params = &z_args;
z_cb->params = (struct _zval_struct ***)&z_args;
z_cb->param_count = 2;
z_cb->no_separation = 0;
+2 -2
View File
@@ -5,9 +5,9 @@
#include "common.h"
#include "redis_array.h"
RedisArray* ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC);
RedisArray *ra_load_hosts(RedisArray *ra, HashTable *hosts, long retry_interval TSRMLS_DC);
RedisArray *ra_load_array(const char *name TSRMLS_DC);
RedisArray *ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index TSRMLS_DC);
RedisArray *ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index, long retry_interval TSRMLS_DC);
zval *ra_find_node_by_name(RedisArray *ra, const char *host, int host_len TSRMLS_DC);
zval *ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_DC);
void ra_init_function_table(RedisArray *ra);
+3 -6
View File
@@ -206,6 +206,7 @@ PS_OPEN_FUNC(redis)
int persistent = 0;
int database = -1;
char *prefix = NULL, *auth = NULL, *persistent_id = NULL;
long retry_interval = 0;
/* translate unix: into file: */
if (!strncmp(save_path+i, "unix:", sizeof("unix:")-1)) {
@@ -240,7 +241,6 @@ PS_OPEN_FUNC(redis)
convert_to_long_ex(param);
weight = Z_LVAL_PP(param);
}
if (zend_hash_find(Z_ARRVAL_P(params), "timeout", sizeof("timeout"), (void **) &param) != FAILURE) {
timeout = atof(Z_STRVAL_PP(param));
}
@@ -260,13 +260,10 @@ PS_OPEN_FUNC(redis)
convert_to_long_ex(param);
database = Z_LVAL_PP(param);
}
/* // not supported yet
if (zend_hash_find(Z_ARRVAL_P(params), "retry_interval", sizeof("retry_interval"), (void **) &param) != FAILURE) {
convert_to_long_ex(param);
retry_interval = Z_LVAL_PP(param);
}
*/
zval_ptr_dtor(&params);
}
@@ -280,9 +277,9 @@ PS_OPEN_FUNC(redis)
RedisSock *redis_sock;
if(url->host) {
redis_sock = redis_sock_create(url->host, strlen(url->host), url->port, timeout, persistent, persistent_id);
redis_sock = redis_sock_create(url->host, strlen(url->host), url->port, timeout, persistent, persistent_id, retry_interval);
} else { /* unix */
redis_sock = redis_sock_create(url->path, strlen(url->path), 0, timeout, persistent, persistent_id);
redis_sock = redis_sock_create(url->path, strlen(url->path), 0, timeout, persistent, persistent_id, retry_interval);
}
redis_pool_add(pool, redis_sock, weight, database, prefix, auth TSRMLS_CC);