Merge pull request #22555 from taosdata/fix/TD-25948

mem leak
This commit is contained in:
Haojun Liao 2023-08-25 09:28:18 +08:00 committed by GitHub
commit f805a49df2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 14 additions and 6 deletions

View File

@ -1340,8 +1340,9 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
void* pData = colDataGetData(pSrc, rowIdx);
bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
void* pData = NULL;
if (!isNull) pData = colDataGetData(pSrc, rowIdx);
colDataSetVal(pDst, 0, pData, isNull);
}

View File

@ -1564,6 +1564,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
if (pStream->status != STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;
}

View File

@ -839,6 +839,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
(pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev);
pFillSup->prev.key = pFillSup->cur.key;
pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
} else if (hasPrevWindow(pFillSup)) {
@ -1231,8 +1232,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
SWinKey nextKey = {.groupId = groupId, .ts = ts};
while (pInfo->srcDelRowIndex < pBlock->info.rows) {
void* nextVal = NULL;
int32_t nextLen = 0;
TSKEY delTs = tsStarts[pInfo->srcDelRowIndex];
uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];
int32_t code = TSDB_CODE_SUCCESS;
@ -1247,7 +1246,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
if (delTs == nextKey.ts) {
code = pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
if (code == TSDB_CODE_SUCCESS) {
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen);
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
}
// ts will be deleted later
if (delTs != ts) {

View File

@ -972,7 +972,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
char* pSrcData = colDataGetData(pSrcCol, rowIndex);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex);
colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull);
}
pDest->info.rows++;

View File

@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
printDataBlock(p, "project");
}
return (p->info.rows > 0) ? p : NULL;
}

View File

@ -1247,7 +1247,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
char* pSrcData = colDataGetData(pSrcCol, i);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, i);
colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
}
pResult->info.rows++;

View File

@ -2922,6 +2922,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
taosArrayDestroy(pInfo->historyWins);
taosMemoryFreeClear(param);