Refactor PubSub command

This commit is contained in:
Pavlo Yatsukhnenko
2022-10-31 15:39:56 +02:00
committed by Michael Grunder
parent cc2383f076
commit 2a0d1c1e6d
6 changed files with 105 additions and 113 deletions
+22
View File
@@ -455,6 +455,22 @@ redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
}
}
PHP_REDIS_API int
redis_pubsub_response(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab, void *ctx)
{
if (ctx == NULL) {
return redis_long_response(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else if (ctx == PHPREDIS_CTX_PTR) {
return redis_read_variant_reply(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else if (ctx == PHPREDIS_CTX_PTR + 1) {
return redis_mbulk_reply_zipped_keys_int(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
}
static void
ht_free_subs(zval *data)
{
@@ -1354,6 +1370,7 @@ redis_zrandmember_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return redis_mbulk_reply_zipped_keys_dbl(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
}
@@ -1366,6 +1383,7 @@ redis_zdiff_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *
return redis_mbulk_reply_zipped_keys_dbl(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
}
@@ -1378,6 +1396,7 @@ redis_set_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_
return redis_string_response(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
}
@@ -1392,6 +1411,7 @@ redis_hrandfield_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, z
return redis_mbulk_reply_zipped_raw(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
}
@@ -1404,6 +1424,7 @@ redis_pop_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_
return redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL);
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
}
@@ -1447,6 +1468,7 @@ redis_lpos_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z
}
} else {
ZEND_ASSERT(!"memory corruption?");
return FAILURE;
}
if (IS_ATOMIC(redis_sock)) {
+2
View File
@@ -109,6 +109,8 @@ PHP_REDIS_API int redis_xclaim_reply(INTERNAL_FUNCTION_PARAMETERS,
PHP_REDIS_API int redis_xinfo_reply(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API int redis_pubsub_response(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS,
+1 -110
View File
@@ -2707,122 +2707,13 @@ PHP_METHOD(Redis, wait) {
REDIS_PROCESS_RESPONSE(redis_long_response);
}
/* Construct a PUBSUB command */
PHP_REDIS_API int
redis_build_pubsub_cmd(RedisSock *redis_sock, char **ret, PUBSUB_TYPE type,
zval *arg)
{
HashTable *ht_chan;
zval *z_ele;
smart_string cmd = {0};
if (type == PUBSUB_CHANNELS) {
if (arg) {
/* With a pattern */
return REDIS_SPPRINTF(ret, "PUBSUB", "sk", "CHANNELS", sizeof("CHANNELS") - 1,
Z_STRVAL_P(arg), Z_STRLEN_P(arg));
} else {
/* No pattern */
return REDIS_SPPRINTF(ret, "PUBSUB", "s", "CHANNELS", sizeof("CHANNELS") - 1);
}
} else if (type == PUBSUB_NUMSUB) {
ht_chan = Z_ARRVAL_P(arg);
// Add PUBSUB and NUMSUB bits
redis_cmd_init_sstr(&cmd, zend_hash_num_elements(ht_chan)+1, "PUBSUB", sizeof("PUBSUB")-1);
redis_cmd_append_sstr(&cmd, "NUMSUB", sizeof("NUMSUB")-1);
/* Iterate our elements */
ZEND_HASH_FOREACH_VAL(ht_chan, z_ele) {
zend_string *zstr = zval_get_string(z_ele);
redis_cmd_append_sstr_key(&cmd, ZSTR_VAL(zstr), ZSTR_LEN(zstr), redis_sock, NULL);
zend_string_release(zstr);
} ZEND_HASH_FOREACH_END();
/* Set return */
*ret = cmd.c;
return cmd.len;
} else if (type == PUBSUB_NUMPAT) {
return REDIS_SPPRINTF(ret, "PUBSUB", "s", "NUMPAT", sizeof("NUMPAT") - 1);
}
/* Shouldn't ever happen */
return -1;
}
/*
* {{{ proto Redis::pubsub("channels", pattern);
* proto Redis::pubsub("numsub", Array channels);
* proto Redis::pubsub("numpat"); }}}
*/
PHP_METHOD(Redis, pubsub) {
zval *object;
RedisSock *redis_sock;
char *keyword, *cmd;
int cmd_len;
size_t kw_len;
PUBSUB_TYPE type;
zval *arg = NULL;
// Parse arguments
if(zend_parse_method_parameters(ZEND_NUM_ARGS(), getThis(),
"Os|z", &object, redis_ce, &keyword,
&kw_len, &arg)==FAILURE)
{
RETURN_FALSE;
}
/* Validate our sub command keyword, and that we've got proper arguments */
if(!strncasecmp(keyword, "channels", sizeof("channels"))) {
/* One (optional) string argument */
if(arg && Z_TYPE_P(arg) != IS_STRING) {
RETURN_FALSE;
}
type = PUBSUB_CHANNELS;
} else if(!strncasecmp(keyword, "numsub", sizeof("numsub"))) {
/* One array argument */
if(ZEND_NUM_ARGS() < 2 || Z_TYPE_P(arg) != IS_ARRAY ||
zend_hash_num_elements(Z_ARRVAL_P(arg)) == 0)
{
RETURN_FALSE;
}
type = PUBSUB_NUMSUB;
} else if(!strncasecmp(keyword, "numpat", sizeof("numpat"))) {
type = PUBSUB_NUMPAT;
} else {
/* Invalid keyword */
RETURN_FALSE;
}
/* Grab our socket context object */
if ((redis_sock = redis_sock_get(object, 0)) == NULL) {
RETURN_FALSE;
}
/* Construct our "PUBSUB" command */
cmd_len = redis_build_pubsub_cmd(redis_sock, &cmd, type, arg);
REDIS_PROCESS_REQUEST(redis_sock, cmd, cmd_len);
if(type == PUBSUB_NUMSUB) {
if (IS_ATOMIC(redis_sock)) {
if(redis_mbulk_reply_zipped_keys_int(INTERNAL_FUNCTION_PARAM_PASSTHRU,
redis_sock, NULL, NULL) < 0)
{
RETURN_FALSE;
}
}
REDIS_PROCESS_RESPONSE(redis_mbulk_reply_zipped_keys_int);
} else {
if (IS_ATOMIC(redis_sock)) {
if(redis_read_variant_reply(INTERNAL_FUNCTION_PARAM_PASSTHRU,
redis_sock, NULL, NULL) < 0)
{
RETURN_FALSE;
}
}
REDIS_PROCESS_RESPONSE(redis_read_variant_reply);
}
REDIS_PROCESS_CMD(pubsub, redis_pubsub_response);
}
/* {{{ proto variant Redis::eval(string script, [array keys, long num_keys]) */
+75 -1
View File
@@ -1373,7 +1373,81 @@ redis_zinterunionstore_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
/* SUBSCRIBE/PSUBSCRIBE */
int redis_pubsub_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char **cmd, int *cmd_len, short *slot, void **ctx)
{
HashTable *channels = NULL;
smart_string cmdstr = {0};
zend_string *op, *pattern = NULL;
zval *arg = NULL, *z_chan;
ZEND_PARSE_PARAMETERS_START(1, 2)
Z_PARAM_STR(op)
Z_PARAM_OPTIONAL
Z_PARAM_ZVAL(arg)
ZEND_PARSE_PARAMETERS_END_EX(return FAILURE);
if (zend_string_equals_literal_ci(op, "NUMPAT")) {
*ctx = NULL;
} else if (zend_string_equals_literal_ci(op, "CHANNELS") ||
zend_string_equals_literal_ci(op, "SHARDCHANNELS")
) {
if (arg != NULL) {
if (Z_TYPE_P(arg) != IS_STRING) {
php_error_docref(NULL, E_WARNING, "Invalid patern value");
return FAILURE;
}
pattern = zval_get_string(arg);
}
*ctx = PHPREDIS_CTX_PTR;
} else if (zend_string_equals_literal_ci(op, "NUMSUB") ||
zend_string_equals_literal_ci(op, "SHARDNUMSUB")
) {
if (arg != NULL) {
if (Z_TYPE_P(arg) != IS_ARRAY) {
php_error_docref(NULL, E_WARNING, "Invalid channels value");
return FAILURE;
}
channels = Z_ARRVAL_P(arg);
}
*ctx = PHPREDIS_CTX_PTR + 1;
} else {
php_error_docref(NULL, E_WARNING, "Unknown PUBSUB operation '%s'", ZSTR_VAL(op));
return FAILURE;
}
REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, 1 + !!pattern + (channels ? zend_hash_num_elements(channels) : 0), "PUBSUB");
redis_cmd_append_sstr_zstr(&cmdstr, op);
if (pattern != NULL) {
redis_cmd_append_sstr_zstr(&cmdstr, pattern);
zend_string_release(pattern);
} else if (channels != NULL) {
ZEND_HASH_FOREACH_VAL(channels, z_chan) {
// We want to deal with strings here
zend_string *zstr = zval_get_string(z_chan);
// Grab channel name, prefix if required
char *key = ZSTR_VAL(zstr);
size_t key_len = ZSTR_LEN(zstr);
int key_free = redis_key_prefix(redis_sock, &key, &key_len);
// Add this channel
redis_cmd_append_sstr(&cmdstr, key, key_len);
zend_string_release(zstr);
// Free our key if it was prefixed
if (key_free) efree(key);
} ZEND_HASH_FOREACH_END();
}
*cmd = cmdstr.c;
*cmd_len = cmdstr.len;
return SUCCESS;
}
/* SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE */
int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot,
void **ctx)
+3
View File
@@ -139,6 +139,9 @@ int redis_mpop_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw
int redis_restore_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char **cmd, int *cmd_len, short *slot, void **ctx);
int redis_pubsub_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char **cmd, int *cmd_len, short *slot, void **ctx);
int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+2 -2
View File
@@ -202,8 +202,8 @@ class Redis_Test extends TestSuite
$this->assertTrue(is_int($result));
// Invalid calls
$this->assertFalse($this->redis->pubsub("notacommand"));
$this->assertFalse($this->redis->pubsub("numsub", "not-an-array"));
$this->assertFalse(@$this->redis->pubsub("notacommand"));
$this->assertFalse(@$this->redis->pubsub("numsub", "not-an-array"));
}
/* These test cases were generated randomly. We're just trying to test