fix(query): fix memory leak.
This commit is contained in:
parent
2da5a6106e
commit
7c7b2f1cef
|
@ -287,12 +287,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
|
||||
uint8_t* px = p->colVal.value.pData;
|
||||
p->colVal = pColVal->colVal;
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
} else {
|
||||
if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
}
|
||||
|
||||
if (COL_VAL_IS_VALUE(&pColVal->colVal) && IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal.value.pData = px;
|
||||
memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
p->colVal.value.nData = pColVal->colVal.value.nData;
|
||||
p->colVal.type = pColVal->colVal.type;
|
||||
p->colVal.flag = pColVal->colVal.flag;
|
||||
p->colVal.cid = pColVal->colVal.cid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -305,7 +305,7 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
|
|||
}
|
||||
|
||||
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
|
||||
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
|
||||
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
|
||||
// allocate buffer in order to load data blocks from file
|
||||
// todo use simple hash instead, optimize the memory consumption
|
||||
SHashObj* pTableMap =
|
||||
|
@ -315,10 +315,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
|||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
|
||||
initBlockScanInfoBuf(pBuf, numOfTables);
|
||||
|
||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
|
||||
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
|
||||
pScanInfo->uid = idList[j].uid;
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||
int64_t skey = pTsdbReader->window.skey;
|
||||
|
@ -3785,9 +3785,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
|||
}
|
||||
|
||||
STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader;
|
||||
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
|
||||
pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
|
||||
if (pReader->status.pTableMap == NULL) {
|
||||
tsdbReaderClose(pReader);
|
||||
tsdbReaderClose(p);
|
||||
*ppReader = NULL;
|
||||
|
||||
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
|
|
@ -157,9 +157,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
|
||||
|
||||
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
||||
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
|
||||
colDataAppend(pDst, 0, p, isNull);
|
||||
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
|
||||
colDataAppendNULL(pDst, 0);
|
||||
} else {
|
||||
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
||||
colDataAppend(pDst, 0, p, false);
|
||||
}
|
||||
}
|
||||
|
||||
pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
||||
|
@ -226,6 +229,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
||||
return pInfo->pRes;
|
||||
} else {
|
||||
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6568,7 +6568,9 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
|||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
|
||||
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
|
||||
|
||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||
|
|
Loading…
Reference in New Issue