From e013ba679562aab1e8a69cbf827209300f0f372d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Aug 2023 17:32:34 +0800 Subject: [PATCH] fix(stream): fix the counter. --- source/dnode/vnode/src/tq/tq.c | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dac5838db0..aed1588a3a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1868,19 +1868,28 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamTaskStop(pTask); tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); - bool allStopped = true; - int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); - for(int32_t i = 0; i < numOfCount; ++i) { - SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + pMeta->closedTask += 1; - int64_t keys1[2] = {pId->streamId, pId->taskId}; - SStreamTask* p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1)); - if (p->status.taskStatus != TASK_STATUS__STOP) { - allStopped = false; - tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId); - break; - } + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + bool allStopped = (pMeta->closedTask == numOfTasks); + if (allStopped) { + pMeta->closedTask = 0; + } else { + tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); } +// bool allStopped = true; +// int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); +// for(int32_t i = 0; i < numOfCount; ++i) { +// SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); +// +// int64_t keys1[2] = {pId->streamId, pId->taskId}; +// SStreamTask** p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1)); +// if ((*p)->status.taskStatus != TASK_STATUS__STOP) { +// allStopped = false; +// tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId); +// break; +// } +// } taosWUnLockLatch(&pMeta->lock); @@ -1888,6 +1897,7 @@ _end: tDecoderClear(&decoder); if (allStopped) { + if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); } else {