From 29219ba9f47d343d3840cf04628963f71d2300a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Jan 2024 01:32:50 +0800 Subject: [PATCH] fix(stream): add task number check. --- include/dnode/vnode/tqCommon.h | 2 +- source/dnode/snode/src/snode.c | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 14 ++++++++------ source/dnode/vnode/src/vnd/vnodeSync.c | 8 +++++--- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 6c84c3ea09..7265d6f58f 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -32,7 +32,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); -int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta); +int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4ad054be27..613480f4bc 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -145,7 +145,8 @@ FAIL: } int32_t sndInit(SSnode *pSnode) { - tqStreamTaskResetStatus(pSnode->pMeta); + int32_t numOfTasks = 0; + tqStreamTaskResetStatus(pSnode->pMeta, &numOfTasks); streamMetaStartAllTasks(pSnode->pMeta); return 0; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index aed1091c84..6b1c7ccedb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -706,16 +706,16 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) { +int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + *numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); - if (numOfTasks == 0) { + tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, *numOfTasks); + if (*numOfTasks == 0) { return TSDB_CODE_SUCCESS; } - for (int32_t i = 0; i < numOfTasks; ++i) { + for (int32_t i = 0; i < (*numOfTasks); ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; @@ -767,8 +767,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } if (isLeader && !tsDisableStream) { - tqStreamTaskResetStatus(pMeta); + int32_t numOfTasks = 0; + tqStreamTaskResetStatus(pMeta, &numOfTasks); streamMetaWUnLock(pMeta); + streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 34ae6623a7..f0ef351184 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -570,10 +570,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) streamMetaWUnLock(pMeta); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { - vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); - tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta); + vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); - { + int32_t numOfTasks = 0; + tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta, &numOfTasks); + + if (numOfTasks > 0) { if (pMeta->startInfo.taskStarting == 1) { pMeta->startInfo.restartCount += 1; tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,