Skip to content

[RR-320] Fix delayed blocked client handling #640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions deps/common/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ typedef uint64_t RedisModuleTimerID;
* Use RedisModule_GetModuleOptionsAll instead. */
#define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4)

/* default set of items to flush in an execution unit */
#define REDISMODULE_FLUSH_EXEC_UNIT_FLAG_DEFAULT (1<<0)

/* Definitions for RedisModule_SetCommandInfo. */

typedef enum {
Expand Down Expand Up @@ -976,7 +979,7 @@ REDISMODULE_API int (*RedisModule_GetSelectedDb)(RedisModuleCtx *ctx) REDISMODUL
REDISMODULE_API int (*RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KeyExists)(RedisModuleCtx *ctx, RedisModuleString *keyname) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleKey * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_CloseKey)(RedisModuleKey *kp) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KeyType)(RedisModuleKey *kp) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_ValueLength)(RedisModuleKey *kp) REDISMODULE_ATTR;
Expand All @@ -986,6 +989,7 @@ REDISMODULE_API RedisModuleString * (*RedisModule_ListGet)(RedisModuleKey *key,
REDISMODULE_API int (*RedisModule_ListSet)(RedisModuleKey *key, long index, RedisModuleString *value) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ListInsert)(RedisModuleKey *key, long index, RedisModuleString *value) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ListDelete)(RedisModuleKey *key, long index) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_FlushExecutionUnit)(RedisModuleCtx *ctx, unsigned int flags) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCallReply * (*RedisModule_Call)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) REDISMODULE_ATTR;
REDISMODULE_API const char * (*RedisModule_CallReplyProto)(RedisModuleCallReply *reply, size_t *len) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_FreeCallReply)(RedisModuleCallReply *reply) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1098,7 +1102,7 @@ REDISMODULE_API int (*RedisModule_SetClientNameById)(uint64_t id, RedisModuleStr
REDISMODULE_API int (*RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_PublishMessageShard)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlags)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)(void) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleType * (*RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1201,17 +1205,17 @@ REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeys)(Redi
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, int flags) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)() REDISMODULE_ATTR;
REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_BlockClientGetPrivateData)(RedisModuleBlockedClient *blocked_client) REDISMODULE_ATTR;
Expand All @@ -1234,7 +1238,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx)
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, const char *target_id, uint8_t type, const char *msg, uint32_t len) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1263,7 +1267,7 @@ REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *use
REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)(void) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocSize)(void* ptr) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocUsableSize)(void *ptr) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocSizeString)(RedisModuleString* str) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1379,6 +1383,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringToStreamID);
REDISMODULE_GET_API(Call);
REDISMODULE_GET_API(CallReplyProto);
REDISMODULE_GET_API(FlushExecutionUnit);
REDISMODULE_GET_API(FreeCallReply);
REDISMODULE_GET_API(CallReplyInteger);
REDISMODULE_GET_API(CallReplyDouble);
Expand Down
12 changes: 12 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,18 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr,
exitRedisModuleCall();
rr->entered_eval = old_entered_eval;

/* Process blocked clients immediately after an RM_Call().
*
* e.g. If above RM_Call() does LPUSH to a list, normally, we can reply
* to a blocked client waiting on that list. Though, Redis will not
* serve blocked clients until the end of its event loop iteration. This
* is not deterministic and RedisRaft nodes might diverge. For this
* reason, we enforce blocking client handling after each RM_Call() to
* make it deterministic in the cluster.
*/
int ret = RedisModule_FlushExecutionUnit(ctx, REDISMODULE_FLUSH_EXEC_UNIT_FLAG_DEFAULT);
RedisModule_Assert(ret == 0);

if (RedisModule_CallReplyType(reply) != REDISMODULE_REPLY_PROMISE) {
/* reply to client if we didn't block */
if (req) {
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/test_blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,6 @@ def test_blocking_with_disconnect(cluster):
assert val == [b'1']


@pytest.mark.skip(reason="skipping, as promise/handler isn't atomic "
"and abort is not working as expected")
def test_blocking_with_timeout_after_unblock(cluster):
cluster.create(3)

Expand Down