fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2024-01-22 11:49:16 +08:00
parent 2cc584ff44
commit db474626e6
1 changed files with 6 additions and 2 deletions

View File

@ -240,7 +240,9 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
} }
pRow = sdbAllocRow(sizeof(SStreamObj)); pRow = sdbAllocRow(sizeof(SStreamObj));
if (pRow == NULL) goto STREAM_DECODE_OVER; if (pRow == NULL) {
goto STREAM_DECODE_OVER;
}
pStream = sdbGetRowObj(pRow); pStream = sdbGetRowObj(pRow);
if (pStream == NULL) { if (pStream == NULL) {
@ -2820,10 +2822,10 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId); pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream); code = createStreamResetStatusTrans(pMnode, pStream);
mndReleaseStream(pMnode, pStream);
} }
} }
mndReleaseStream(pMnode, pStream);
return code; return code;
} }
@ -3025,6 +3027,7 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return pStream; return pStream;
} }
sdbRelease(pSdb, pStream);
} }
return NULL; return NULL;
@ -3097,5 +3100,6 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
return 0; return 0;
} }