fix(stream): free array and check the return values.

This commit is contained in:
Haojun Liao 2025-02-21 16:38:26 +08:00
parent 4edc21e446
commit ea549364d4
2 changed files with 14 additions and 2 deletions

View File

@ -1328,6 +1328,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t size = taosArrayGetSize(pList);
if (size == 0) {
taosArrayDestroy(pList);
taosArrayDestroy(pLongChkpts);
return code;
}
@ -1340,6 +1342,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
"checkpoint trans are not allowed, wait for 30s",
numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
taosArrayDestroy(pList);
taosArrayDestroy(pLongChkpts);
return code;
}
@ -1379,6 +1383,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
}
taosArrayDestroy(pList);
taosArrayDestroy(pLongChkpts);
return code;
}

View File

@ -73,7 +73,10 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt,
mInfo("long chkpt transId:%d, start:%" PRId64
" exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0));
taosArrayPush(pLongChkptTrans, pEntry);
void* p = taosArrayPush(pLongChkptTrans, pEntry);
if (p == NULL) {
mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno));
}
}
}
mndReleaseTrans(pMnode, pTrans);
@ -399,7 +402,10 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) {
mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name,
pTrans->streamId, pTrans->transId, p->checkpointId);
mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
if (code) {
mError("stream:%s 0x%"PRIx64" failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code));
}
sdbRelease(pMnode->pSdb, p);
}
}