fix(stream): make sure that the unit test case can work.

This commit is contained in:
Haojun Liao 2024-06-11 14:26:29 +08:00
parent 00eb621825
commit 0245d82e54
3 changed files with 8 additions and 5 deletions

View File

@ -2629,7 +2629,7 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
}
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pExecInfo->initTaskList) {
if (pExecInfo->initTaskList || pMnode == NULL) {
return;
}

View File

@ -328,7 +328,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mndDropOrphanTasks(pMnode, pOrphanTasks);
}
mndStreamStartUpdateCheckpointInfo(pMnode);
if (pMnode != NULL) { // make sure that the unit test case can work
mndStreamStartUpdateCheckpointInfo(pMnode);
}
taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req);

View File

@ -801,14 +801,15 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
sdbRelease(pMnode->pSdb, pStream);
}
if (taosArrayGetSize(pDropped) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pDropped); ++i) {
int32_t size = taosArrayGetSize(pDropped);
if (size > 0) {
for (int32_t i = 0; i < size; ++i) {
int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i);
taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId));
}
int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", numOfStreams);
mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
}
taosArrayDestroy(pDropped);