From 980382d4334b6f2b11dc9dde59f7d1d44d3c980c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 11:18:08 +0800 Subject: [PATCH] fix(tsdb): fix memory error. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 22 ++++++++++-------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 + source/libs/executor/src/cachescanoperator.c | 24 ++++++++++---------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index dd5da28b6b..d5f3624851 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; - // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = TSKEY_MIN; @@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p } } - // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it + // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it p->hasResult = true; varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); } + for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); if (idx < funcTypeBlockArray->size) { @@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, if (IS_VAR_DATA_TYPE(pPkCol->type)) { p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); } + + p->pkColumn = *pPkCol; } if (numOfTables == 0) { @@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - for (int32_t j = 0; j < pr->numOfCols; ++j) { - int32_t bytes; - if (slotIds[j] == -1) { - bytes = 1; - } else { - bytes = pr->pSchema->columns[slotIds[j]].bytes; - } + int32_t pkBufLen = 0; + if (pr->rowKey.numOfPKs > 0) { + pkBufLen = pr->pkColumn.bytes; + } - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); + for (int32_t j = 0; j < pr->numOfCols; ++j) { + int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; + + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index af7d00e019..bece22adad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -382,6 +382,7 @@ typedef struct SCacheRowsReader { SArray* pFuncTypeList; __compar_fn_t pkComparFn; SRowKey rowKey; + SColumnInfo pkColumn; } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 56052434a4..23e873d335 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo { SExprSupp pseudoExprSup; int32_t retrieveType; int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; + SSDataBlock* pBufferedRes; SArray* pUidList; SArray* pCidList; int32_t indexOfBufferedRes; @@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); - pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); - setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); - blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); + pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); + blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output @@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { - blockDataCleanup(pInfo->pBufferredRes); + if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) { + blockDataCleanup(pInfo->pBufferedRes); taosArrayClear(pInfo->pUidList); - int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, + int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } // check for tag values - int32_t resultRows = pInfo->pBufferredRes->info.rows; + int32_t resultRows = pInfo->pBufferedRes->info.rows; // the results may be null, if last values are all null ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); @@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; - if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { + if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) { SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; - SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { @@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); - blockDataDestroy(pInfo->pBufferredRes); + blockDataDestroy(pInfo->pBufferedRes); taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList);