diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index eabf4d8e4a..68856e11f1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -125,7 +125,7 @@ int32_t mndInitStream(SMnode *pMnode) { taosThreadMutexInit(&execNodeList.lock, NULL); execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); - execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); + execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId)); return sdbSetTable(pMnode->pSdb, table); } @@ -1183,10 +1183,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execNodeList.lock); for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); - if (p->status != TASK_STATUS__NORMAL) { + STaskId *p = taosArrayGet(execNodeList.pTaskList, i); + STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", - p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status)); + pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); ready = false; break; } @@ -1557,13 +1562,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char status[20 + VARSTR_HEADER_SIZE] = {0}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); - if (index == NULL) { + STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); - const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status); + const char* pStatus = streamGetTaskStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2254,10 +2258,8 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod if (p == NULL) { STaskStatusEntry entry = { .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; - taosArrayPush(pExecNode->pTaskList, &entry); - - int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; - taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal)); + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + taosArrayPush(pExecNode->pTaskList, &id); } } } @@ -2275,11 +2277,21 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p != NULL) { - taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p); taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + + for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId* pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == id.taskId && pId->streamId == id.streamId) { + taosArrayRemove(pExecNode->pTaskList, k); + break; + } + } + } } } + + ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } // todo: this process should be executed by the write queue worker of the mnode @@ -2308,13 +2320,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); - if (index == NULL) { + STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); + if (pEntry == NULL) { + mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId); continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); - pStatusEntry->status = p->status; + pEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); }