fix(stream): fix memory leak if error occurs.

This commit is contained in:
Haojun Liao 2025-03-01 19:22:26 +08:00
parent 14ffa60e6e
commit d2d7279f0d
1 changed files with 3 additions and 0 deletions

View File

@ -1418,6 +1418,7 @@ int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
@ -1429,12 +1430,14 @@ int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_
tbData.pCreateTbReq = NULL;
}
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
if (p == NULL) {
tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno));
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return terrno;
}