From ea549364d490f82fe7c6bbcf0ae9bc04c5c8e7b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Feb 2025 16:38:26 +0800 Subject: [PATCH] fix(stream): free array and check the return values. --- source/dnode/mnode/impl/src/mndStream.c | 6 ++++++ source/dnode/mnode/impl/src/mndStreamTrans.c | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4cd4721bc4..18e404564d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1328,6 +1328,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t size = taosArrayGetSize(pList); if (size == 0) { taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; } @@ -1340,6 +1342,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { "checkpoint trans are not allowed, wait for 30s", numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint); taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; } @@ -1379,6 +1383,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index fb990c6fe9..f4f7c65a00 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -73,7 +73,10 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, mInfo("long chkpt transId:%d, start:%" PRId64 " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status", pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0)); - taosArrayPush(pLongChkptTrans, pEntry); + void* p = taosArrayPush(pLongChkptTrans, pEntry); + if (p == NULL) { + mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno)); + } } } mndReleaseTrans(pMnode, pTrans); @@ -399,7 +402,10 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name, pTrans->streamId, pTrans->transId, p->checkpointId); - mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + if (code) { + mError("stream:%s 0x%"PRIx64" failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code)); + } sdbRelease(pMnode->pSdb, p); } }