fix memory leak
This commit is contained in:
parent
d653da7971
commit
004a84ac81
|
@ -829,8 +829,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
|
||||
tDecoderInit(pCoder, pReq, len);
|
||||
tDecodeDeleteRes(pCoder, pRes);
|
||||
/*ASSERT(pRes->skey != 0);*/
|
||||
/*ASSERT(pRes->ekey != 0);*/
|
||||
tDecoderClear(pCoder);
|
||||
|
||||
int32_t sz = taosArrayGetSize(pRes->uidList);
|
||||
|
@ -859,6 +857,8 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRes->uidList);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||
|
@ -890,6 +890,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
streamTaskInputFail(pTask);
|
||||
}
|
||||
}
|
||||
blockDataDestroy(pDelBlock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -1107,8 +1107,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
tDecoderInit(pCoder, pReq, len);
|
||||
tDecodeDeleteRes(pCoder, pRes);
|
||||
ASSERT(pRes->skey != 0);
|
||||
ASSERT(pRes->ekey != 0);
|
||||
ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
|
||||
|
||||
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
|
||||
code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
|
||||
|
|
|
@ -52,7 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
// TODO: if a block was set but not consumed,
|
||||
// prevent setting a different type of block
|
||||
pInfo->validBlockIndex = 0;
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||
taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);
|
||||
} else {
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
}
|
||||
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
// ASSERT(numOfBlocks > 1);
|
||||
|
|
|
@ -76,11 +76,11 @@ typedef struct STopBotResItem {
|
|||
} STopBotResItem;
|
||||
|
||||
typedef struct STopBotRes {
|
||||
int32_t maxSize;
|
||||
int16_t type;
|
||||
int32_t maxSize;
|
||||
int16_t type;
|
||||
|
||||
STuplePos nullTuplePos;
|
||||
bool nullTupleSaved;
|
||||
STuplePos nullTuplePos;
|
||||
bool nullTupleSaved;
|
||||
|
||||
STopBotResItem* pItems;
|
||||
} STopBotRes;
|
||||
|
@ -223,14 +223,14 @@ typedef struct SMavgInfo {
|
|||
} SMavgInfo;
|
||||
|
||||
typedef struct SSampleInfo {
|
||||
int32_t samples;
|
||||
int32_t totalPoints;
|
||||
int32_t numSampled;
|
||||
uint8_t colType;
|
||||
int16_t colBytes;
|
||||
int32_t samples;
|
||||
int32_t totalPoints;
|
||||
int32_t numSampled;
|
||||
uint8_t colType;
|
||||
int16_t colBytes;
|
||||
|
||||
STuplePos nullTuplePos;
|
||||
bool nullTupleSaved;
|
||||
STuplePos nullTuplePos;
|
||||
bool nullTupleSaved;
|
||||
|
||||
char* data;
|
||||
STuplePos* tuplePos;
|
||||
|
@ -1147,7 +1147,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
}
|
||||
|
||||
static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
|
||||
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
|
||||
|
||||
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
|
||||
|
@ -1357,8 +1357,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
|
||||
numOfElems += 1;
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_BIGINT ||
|
||||
type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
int64_t* pData = (int64_t*)pCol->pData;
|
||||
int64_t* val = (int64_t*)&pBuf->v;
|
||||
|
||||
|
@ -1581,7 +1580,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
}
|
||||
|
||||
_min_max_over:
|
||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) {
|
||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
|
||||
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
||||
pBuf->nullTupleSaved = true;
|
||||
}
|
||||
|
@ -1601,7 +1600,8 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
||||
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex);
|
||||
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
|
||||
int32_t rowIndex);
|
||||
|
||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
|
@ -1651,7 +1651,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
|
||||
if (pCtx->saveHandle.pBuf != NULL) {
|
||||
if (pTuplePos->pageId != -1) {
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
const char* p = loadTupleData(pCtx, pTuplePos);
|
||||
|
||||
bool* nullList = (bool*)p;
|
||||
|
@ -1660,7 +1660,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
// todo set the offset value to optimize the performance.
|
||||
for (int32_t j = 0; j < numOfCols; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
|
@ -1701,7 +1701,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
|
|||
char* pData = colDataGetData(pSrcCol, rowIndex);
|
||||
|
||||
// append to dest col
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId);
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
|
@ -1712,7 +1712,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
|
|||
colDataAppend(pDstCol, pos, pData, false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||
|
@ -2590,8 +2589,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
|
|||
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
||||
|
||||
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems,
|
||||
pHisto->numOfEntries, pHisto);
|
||||
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries,
|
||||
pHisto);
|
||||
} else {
|
||||
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
||||
qDebug("%s input histogram, elem:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems,
|
||||
|
@ -2601,8 +2600,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
|
|||
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
||||
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
|
||||
|
||||
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems,
|
||||
pHisto->numOfEntries, pHisto);
|
||||
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries,
|
||||
pHisto);
|
||||
tHistogramDestroy(&pRes);
|
||||
}
|
||||
}
|
||||
|
@ -2629,8 +2628,8 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pInfo->algo != APERCT_ALGO_TDIGEST) {
|
||||
qDebug("%s after merge, total:%d, numOfEntry:%d, %p", __FUNCTION__, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries,
|
||||
pInfo->pHisto);
|
||||
qDebug("%s after merge, total:%d, numOfEntry:%d, %p", __FUNCTION__, pInfo->pHisto->numOfElems,
|
||||
pInfo->pHisto->numOfEntries, pInfo->pHisto);
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, 1, 1);
|
||||
|
@ -2709,7 +2708,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
|||
}
|
||||
|
||||
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
||||
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes;
|
||||
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
|
||||
|
||||
// not initialized yet, data is required
|
||||
if (pEntry == NULL) {
|
||||
|
@ -2752,7 +2751,8 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
|
|||
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
||||
}
|
||||
|
||||
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
|
||||
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||
SFirstLastRes* pInfo) {
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
return;
|
||||
}
|
||||
|
@ -3176,7 +3176,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
|||
static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pDiffInfo->prev.i64 = *(bool*)pv? 1:0;
|
||||
pDiffInfo->prev.i64 = *(bool*)pv ? 1 : 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
pDiffInfo->prev.i64 = *(int8_t*)pv;
|
||||
|
@ -3537,7 +3537,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
* |(n columns, one bit for each column)| src column #1| src column #2|
|
||||
* +------------------------------------+--------------+--------------+
|
||||
*/
|
||||
void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, char* buf) {
|
||||
void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies,
|
||||
char* buf) {
|
||||
char* nullList = buf;
|
||||
char* pStart = (char*)(nullList + sizeof(bool) * pSubsidiaryies->num);
|
||||
|
||||
|
@ -3585,7 +3586,7 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
|||
}
|
||||
}
|
||||
|
||||
p = (STuplePos) {.pageId = pHandle->currentPage, .offset = pPage->num};
|
||||
p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num};
|
||||
memcpy(pPage->data + pPage->num, pBuf, length);
|
||||
|
||||
pPage->num += length;
|
||||
|
@ -3621,7 +3622,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
|||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
} else {
|
||||
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -3636,7 +3636,7 @@ static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSD
|
|||
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
|
||||
if (pHandle->pBuf != NULL) {
|
||||
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
||||
char* p = pPage->data + pPos->offset;
|
||||
char* p = pPage->data + pPos->offset;
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
return p;
|
||||
} else {
|
||||
|
@ -3980,8 +3980,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pCtx->end.key == INT64_MIN) {
|
||||
pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ?
|
||||
ptsList[start + pInput->numOfRows - 1] : pInfo->min;
|
||||
pInfo->min =
|
||||
(pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->min;
|
||||
} else {
|
||||
pInfo->min = pCtx->end.key;
|
||||
}
|
||||
|
@ -3993,8 +3993,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pCtx->end.key == INT64_MIN) {
|
||||
pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ?
|
||||
ptsList[start + pInput->numOfRows - 1] : pInfo->max;
|
||||
pInfo->max =
|
||||
(pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
|
||||
} else {
|
||||
pInfo->max = pCtx->end.key + 1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue