From 2de0f6e9713fb56c04cfa34ba505cfe3e2cf8d3d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Dec 2023 18:56:15 +0800 Subject: [PATCH] test(stream): lock the update procedure. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 63 +++------------------- 1 file changed, 8 insertions(+), 55 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0d932e6e72..c39e653596 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -106,14 +106,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM return rsp.code; } - streamMetaWUnLock(pMeta); +// streamMetaWUnLock(pMeta); + // todo for test purpose // the following two functions should not be executed within the scope of meta lock to avoid deadlock streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); // continue after lock the meta again - streamMetaWLock(pMeta); +// streamMetaWLock(pMeta); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -166,65 +167,17 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaWUnLock(pMeta); } else { if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", - vgId); + tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); -#if 1 +#if 0 + // for test purpose, to trigger the leader election + taosMSleep(5000); +#endif tqStreamTaskStartAsync(pMeta, cb, true); streamMetaWUnLock(pMeta); -#else - streamMetaWUnLock(pMeta); - - // For debug purpose. - // the following procedure consume many CPU resource, result in the re-election of leader - // with high probability. So we employ it as a test case for the stream processing framework, with - // checkpoint/restart/nodeUpdate etc. - while (1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - streamMetaInitBackend(pMeta); - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d start all stream tasks after all being updated", vgId); - tqStreamTaskResetStatus(pTq->pStreamMeta); - tqStartStreamTaskAsync(pTq, false); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); -#endif } }