fix(stream): remove invalid cached task info in buffer.

This commit is contained in:
Haojun Liao 2023-09-20 19:19:04 +08:00
parent 1f45bd82d2
commit a3d13d6634
2 changed files with 53 additions and 0 deletions

View File

@ -2179,6 +2179,56 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
}
}
static int32_t doRemoveFromTask(SStreamExecNodeInfo* pExecNode, STaskId* pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
break;
}
}
}
return 0;
}
static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execNodeList.pTaskList);
int32_t numOfVgroups = taosArrayGetSize(pNodeSnapshot);
for(int32_t i = 0; i < numOfTask; ++i) {
STaskId* pId = taosArrayGet(execNodeList.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, pId, sizeof(*pId));
bool existed = false;
for(int32_t j = 0; j < numOfVgroups; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(pNodeSnapshot, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
existed = true;
break;
}
}
if (!existed) {
taosArrayPush(pRemoveTaskList, pId);
}
}
for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) {
STaskId* pId = taosArrayGet(pRemoveTaskList, i);
doRemoveFromTask(&execNodeList, pId);
}
return 0;
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
@ -2210,6 +2260,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
taosThreadMutexLock(&execNodeList.lock);
removeInvalidStreamTask(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
code = mndProcessVgroupChange(pMnode, &changeInfo);

View File

@ -155,6 +155,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
streamFreeQitem((SStreamQueueItem*)pBlock);
}