fix(stream): fix the counter.

This commit is contained in:
Haojun Liao 2023-08-25 17:32:34 +08:00
parent f951635602
commit e013ba6795
1 changed files with 21 additions and 11 deletions

View File

@ -1868,19 +1868,28 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamTaskStop(pTask);
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
bool allStopped = true;
int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
for(int32_t i = 0; i < numOfCount; ++i) {
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
pMeta->closedTask += 1;
int64_t keys1[2] = {pId->streamId, pId->taskId};
SStreamTask* p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1));
if (p->status.taskStatus != TASK_STATUS__STOP) {
allStopped = false;
tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId);
break;
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
bool allStopped = (pMeta->closedTask == numOfTasks);
if (allStopped) {
pMeta->closedTask = 0;
} else {
tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask));
}
// bool allStopped = true;
// int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
// for(int32_t i = 0; i < numOfCount; ++i) {
// SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
//
// int64_t keys1[2] = {pId->streamId, pId->taskId};
// SStreamTask** p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1));
// if ((*p)->status.taskStatus != TASK_STATUS__STOP) {
// allStopped = false;
// tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId);
// break;
// }
// }
taosWUnLockLatch(&pMeta->lock);
@ -1888,6 +1897,7 @@ _end:
tDecoderClear(&decoder);
if (allStopped) {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
} else {