fix(stream): fix memory leak.
This commit is contained in:
parent
45880f70d4
commit
5ab739ae46
|
@ -24,12 +24,12 @@ typedef struct STableSinkInfo {
|
||||||
tstr name;
|
tstr name;
|
||||||
} STableSinkInfo;
|
} STableSinkInfo;
|
||||||
|
|
||||||
|
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
||||||
|
SSubmitTbData* pTableData);
|
||||||
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
|
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData);
|
SSubmitTbData* pTableData);
|
||||||
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
||||||
int64_t suid);
|
int64_t suid);
|
||||||
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
|
||||||
SSubmitTbData* pTableData);
|
|
||||||
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
|
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
|
@ -303,6 +303,7 @@ static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData*
|
||||||
|
|
||||||
if (pNewRow->ts == pOldRow->ts) {
|
if (pNewRow->ts == pOldRow->ts) {
|
||||||
k += 1;
|
k += 1;
|
||||||
|
tRowDestroy(pOldRow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosArrayPush(pFinal, &pOldRow);
|
taosArrayPush(pFinal, &pOldRow);
|
||||||
|
@ -803,6 +804,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
|
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
|
||||||
if (pTableData->pCreateTbReq == NULL) {
|
if (pTableData->pCreateTbReq == NULL) {
|
||||||
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
|
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
|
||||||
|
taosMemoryFree(pTableSinkInfo);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -827,7 +829,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
|
int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData) {
|
SSubmitTbData* pTableData) {
|
||||||
int32_t numOfRows = pDataBlock->info.rows;
|
int32_t numOfRows = pDataBlock->info.rows;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue