fix(stream): do NOT remove the stream if the stream object is not accessiable, since the deploy trans may not completed yet.

This commit is contained in:
Haojun Liao 2024-06-11 17:13:44 +08:00
parent 781b644a8c
commit b22679c941
1 changed files with 1 additions and 13 deletions

View File

@ -1115,30 +1115,18 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
continue;
}
// for stopped stream task entry, we do additional check for it
if (pEntry->status == TASK_STATUS__STOP) {
bool invalid = false;
for(int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
STaskId* pId = taosArrayGet(pInvalidList, j);
if (pEntry->id.streamId == pId->streamId) {
taosArrayPush(pInvalidList, &pEntry->id);
invalid = true;
break;
}
}
if (!invalid) {
SStreamObj *pObj = mndGetStreamObj(pMnode, pEntry->id.streamId);
if (pObj == NULL) {
taosArrayPush(pInvalidList, &pEntry->id);
} else {
mndReleaseStream(pMnode, pObj);
}
}
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
ready = false;
}