fix(stream): remove invalid cached task info in buffer.
This commit is contained in:
parent
6633fb971b
commit
92f18989df
|
@ -2179,6 +2179,56 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t doRemoveFromTask(SStreamExecNodeInfo* pExecNode, STaskId* pRemovedId) {
|
||||
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||
|
||||
if (p != NULL) {
|
||||
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||
|
||||
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
|
||||
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
|
||||
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
|
||||
taosArrayRemove(pExecNode->pTaskList, k);
|
||||
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
|
||||
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
|
||||
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||
|
||||
int32_t numOfTask = taosArrayGetSize(execNodeList.pTaskList);
|
||||
int32_t numOfVgroups = taosArrayGetSize(pNodeSnapshot);
|
||||
for(int32_t i = 0; i < numOfTask; ++i) {
|
||||
STaskId* pId = taosArrayGet(execNodeList.pTaskList, i);
|
||||
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, pId, sizeof(*pId));
|
||||
|
||||
bool existed = false;
|
||||
for(int32_t j = 0; j < numOfVgroups; ++j) {
|
||||
SNodeEntry* pNodeEntry = taosArrayGet(pNodeSnapshot, j);
|
||||
if (pNodeEntry->nodeId == pEntry->nodeId) {
|
||||
existed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!existed) {
|
||||
taosArrayPush(pRemoveTaskList, pId);
|
||||
}
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) {
|
||||
STaskId* pId = taosArrayGet(pRemoveTaskList, i);
|
||||
doRemoveFromTask(&execNodeList, pId);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// this function runs by only one thread, so it is not multi-thread safe
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
@ -2210,6 +2260,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||
|
||||
taosThreadMutexLock(&execNodeList.lock);
|
||||
removeInvalidStreamTask(pNodeSnapshot);
|
||||
|
||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
||||
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
||||
|
|
|
@ -155,6 +155,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
|
|||
if (code == 0) {
|
||||
streamDispatchStreamBlock(pTask);
|
||||
} else {
|
||||
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue