fix:memory leak in doFilter
This commit is contained in:
parent
324977a981
commit
7ab66e5b6b
|
@ -1268,12 +1268,12 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||||
|
|
||||||
void colDataDestroy(SColumnInfoData* pColData) {
|
void colDataDestroy(SColumnInfoData* pColData) {
|
||||||
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||||
taosMemoryFree(pColData->varmeta.offset);
|
taosMemoryFreeClear(pColData->varmeta.offset);
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(pColData->nullbitmap);
|
taosMemoryFreeClear(pColData->nullbitmap);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pColData->pData);
|
taosMemoryFreeClear(pColData->pData);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
||||||
|
|
|
@ -746,7 +746,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
|
||||||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree);
|
||||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
||||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
|
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
|
||||||
|
|
|
@ -1818,9 +1818,9 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
|
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree);
|
||||||
|
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) {
|
||||||
if (pFilterNode == NULL) {
|
if (pFilterNode == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1839,11 +1839,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
||||||
filterFreeInfo(filter);
|
filterFreeInfo(filter);
|
||||||
|
|
||||||
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep, needFree);
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
|
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree) {
|
||||||
if (keep) {
|
if (keep) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1883,9 +1883,18 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
ASSERT(pBlock->info.rows == numOfRows);
|
ASSERT(pBlock->info.rows == numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData tmp = *pSrc;
|
if(needFree){
|
||||||
*pSrc = *pDst;
|
SColumnInfoData tmp = *pSrc;
|
||||||
*pDst = tmp;
|
*pSrc = *pDst;
|
||||||
|
*pDst = tmp;
|
||||||
|
}else{
|
||||||
|
if(IS_VAR_DATA_TYPE(pDst->info.type)){ // this elements do not need free
|
||||||
|
pDst->varmeta.offset = NULL;
|
||||||
|
}else{
|
||||||
|
pDst->nullbitmap = NULL;
|
||||||
|
}
|
||||||
|
pDst->pData = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
blockDataDestroy(px); // fix memory leak
|
blockDataDestroy(px); // fix memory leak
|
||||||
} else {
|
} else {
|
||||||
|
@ -3643,7 +3652,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
doFilter(pProjectInfo->pFilterNode, pBlock);
|
doFilter(pProjectInfo->pFilterNode, pBlock, true);
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
|
@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pRes);
|
doFilter(pInfo->pCondition, pRes, true);
|
||||||
|
|
||||||
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
|
|
|
@ -265,7 +265,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
doFilter(pTableScanInfo->pFilterNode, pBlock);
|
doFilter(pTableScanInfo->pFilterNode, pBlock, false);
|
||||||
|
|
||||||
int64_t et = taosGetTimestampMs();
|
int64_t et = taosGetTimestampMs();
|
||||||
pTableScanInfo->readRecorder.filterTime += (et - st);
|
pTableScanInfo->readRecorder.filterTime += (et - st);
|
||||||
|
@ -944,7 +944,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
|
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
doFilter(pInfo->pCondition, pInfo->pRes, false);
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, 0);
|
blockDataUpdateTsWindow(pInfo->pRes, 0);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1716,7 +1716,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = count;
|
pRes->info.rows = count;
|
||||||
doFilter(pInfo->pFilterNode, pRes);
|
doFilter(pInfo->pFilterNode, pRes, true);
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue