From 0245d82e54990cd78a9128fe5b99745a941c67f1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 14:26:29 +0800 Subject: [PATCH] fix(stream): make sure that the unit test case can work. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 4 +++- source/dnode/mnode/impl/src/mndStreamUtil.c | 7 ++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ab72996058..2fb0505d86 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2629,7 +2629,7 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { - if (pExecInfo->initTaskList) { + if (pExecInfo->initTaskList || pMnode == NULL) { return; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 2cb4111e97..a79fe0cf0a 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -328,7 +328,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } - mndStreamStartUpdateCheckpointInfo(pMnode); + if (pMnode != NULL) { // make sure that the unit test case can work + mndStreamStartUpdateCheckpointInfo(pMnode); + } taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7a45ce1f2a..915235ab47 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -801,14 +801,15 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); } - if (taosArrayGetSize(pDropped) > 0) { - for (int32_t i = 0; i < taosArrayGetSize(pDropped); ++i) { + int32_t size = taosArrayGetSize(pDropped); + if (size > 0) { + for (int32_t i = 0; i < size; ++i) { int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); } int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", numOfStreams); + mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); } taosArrayDestroy(pDropped);