diff --git a/include/libs/function/function.h b/include/libs/function/function.h index d5da306fd2..c8db01625e 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -139,9 +139,8 @@ typedef struct SqlFunctionCtx { struct SExprInfo *pExpr; struct SDiskbasedBuf *pBuf; struct SSDataBlock *pSrcBlock; - struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity + struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity int32_t curBufPage; - bool increase; bool isStream; char udfName[TSDB_FUNC_NAME_LEN]; diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index ef266068cb..57a489c0dd 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -67,10 +67,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); /** * * @param pBuf - * @param groupId * @return */ -SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId); +SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf); /** * get the specified buffer page by id @@ -101,13 +100,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi); */ size_t getTotalBufSize(const SDiskbasedBuf* pBuf); -/** - * get the number of groups in the result buffer - * @param pBuf - * @return - */ -size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf); - /** * destroy result buffer * @param pBuf diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a475e5409a..7c58734b35 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -128,8 +128,10 @@ typedef struct STsdbReader STsdbReader; #define TIMEWINDOW_RANGE_CONTAINED 1 #define TIMEWINDOW_RANGE_EXTERNAL 2 -#define LASTROW_RETRIEVE_TYPE_ALL 0x1 -#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 +#define CACHESCAN_RETRIEVE_TYPE_ALL 0x1 +#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2 +#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 +#define CACHESCAN_RETRIEVE_LAST 0x8 int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, @@ -146,9 +148,9 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); -int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); -int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); -int32_t tsdbLastrowReaderClose(void *pReader); +int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); +int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); +int32_t tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 66843d9a28..ea9a7ec7d9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -18,7 +18,7 @@ #include "tcommon.h" #include "tsdb.h" -typedef struct SLastrowReader { +typedef struct SCacheRowsReader { SVnode* pVnode; STSchema* pSchema; uint64_t uid; @@ -27,9 +27,9 @@ typedef struct SLastrowReader { int32_t type; int32_t tableIndex; // currently returned result tables SArray* pTableList; // table id list -} SLastrowReader; +} SCacheRowsReader; -static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { +static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; @@ -61,8 +61,10 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade pBlock->info.rows += 1; } -int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { - SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); +int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { + *pReader = NULL; + + SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -81,9 +83,17 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->pTableList = pTableIdList; p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); + if (p->transferBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) { p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes); + if (p->transferBuf[i] == NULL) { + tsdbCacherowsReaderClose(p); + return TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -91,8 +101,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, return TSDB_CODE_SUCCESS; } -int32_t tsdbLastrowReaderClose(void* pReader) { - SLastrowReader* p = pReader; +int32_t tsdbCacherowsReaderClose(void* pReader) { + SCacheRowsReader* p = pReader; if (p->pSchema != NULL) { for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { @@ -107,28 +117,56 @@ int32_t tsdbLastrowReaderClose(void* pReader) { return TSDB_CODE_SUCCESS; } -int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { +static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, LRUHandle** h) { + int32_t code = TSDB_CODE_SUCCESS; + if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { + code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (*h != NULL) { + *pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); + } + } else { + code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (*h != NULL) { + SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h); + tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema); + } + } + + return code; +} + +int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } - SLastrowReader* pr = pReader; + SCacheRowsReader* pr = pReader; + int32_t code = TSDB_CODE_SUCCESS; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; LRUHandle* h = NULL; STSRow* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); // retrieve the only one last row of all tables in the uid list. - if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { + if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { int64_t lastKey = INT64_MIN; bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - // int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - if (code != TSDB_CODE_SUCCESS) { + code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); + if (code != TSDB_CODE_SUCCESS) { return code; } @@ -136,9 +174,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t continue; } - pRow = (STSRow*)taosLRUCacheValue(lruCache, h); - // SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h); - // tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema); if (pRow->ts > lastKey) { // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // appended or not. @@ -155,25 +190,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t tsdbCacheRelease(lruCache, h); } - } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { + } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - - int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - // int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - if (code != TSDB_CODE_SUCCESS) { + code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); + if (code != TSDB_CODE_SUCCESS) { return code; } - // no data in the table of Uid if (h == NULL) { continue; } - pRow = (STSRow*)taosLRUCacheValue(lruCache, h); - // SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h); - // tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema); - saveOneRow(pRow, pResBlock, pr, slotIds); taosArrayPush(pTableUidList, &pKeyInfo->uid); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0a0ef11774..be8e809a72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,9 +16,9 @@ #include "osDef.h" #include "tsdb.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ALL_ROWS_CHECKED_INDEX (INT16_MIN) -#define DEFAULT_ROW_INDEX_VAL (-1) +#define INITIAL_ROW_INDEX_VAL (-1) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -234,7 +234,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; + STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { info.lastKey = pTsdbReader->window.skey; @@ -266,7 +266,9 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) { p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); } - p->delSkyline = taosArrayDestroy(p->delSkyline); + p->fileDelIndex = -1; + p->delSkyline = taosArrayDestroy(p->delSkyline); + p->lastBlockDelIndex = INITIAL_ROW_INDEX_VAL; } } @@ -414,7 +416,7 @@ _err: return false; } -static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) { +static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; @@ -423,7 +425,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashOb } else { taosArrayClear(pIter->blockList); } - pIter->pTableMap = pTableMap; } static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } @@ -579,7 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { } // reset the index in last block when handing a new file - px->indexInBlockL = DEFAULT_ROW_INDEX_VAL; + px->indexInBlockL = INITIAL_ROW_INDEX_VAL; tMapDataClear(&px->mapData); taosArrayClear(px->pBlockList); } @@ -887,6 +888,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte pBlockIter->numOfBlocks = numOfBlocks; taosArrayClear(pBlockIter->blockList); + pBlockIter->pTableMap = pReader->status.pTableMap; // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); @@ -2403,7 +2405,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); int32_t index = pScanInfo->indexInBlockL; - if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { + if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); if (!hasData) { // current table does not have rows in last block, try next table bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); @@ -2470,7 +2472,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // note: the lastblock may be null here initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); - if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { + if (pScanInfo->indexInBlockL == INITIAL_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); } } @@ -2582,7 +2584,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(pBlockIter, pReader->order); } SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader; @@ -2654,7 +2656,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { initBlockDumpInfo(pReader, pBlockIter); } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(pBlockIter, pReader->order); goto _begin; } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3276,7 +3278,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { ASSERT(pReader != NULL); taosHashClear(pReader->status.pTableMap); - STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; + STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL}; taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); return TDB_CODE_SUCCESS; } @@ -3372,7 +3374,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl SDataBlockIter* pBlockIter = &pReader->status.blockIter; initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { @@ -3393,7 +3395,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader); - resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap); + resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order); // no data in files, let's try buffer in memory if (pPrevReader->status.fileIter.numOfFiles == 0) { @@ -3696,7 +3698,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDataFReaderClose(&pReader->pFileReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockScanInfo(pReader->status.pTableMap); int32_t code = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fef4a3d590..ba48ec7519 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -909,7 +909,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b31fa279e5..94d9d0cadb 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -25,24 +25,27 @@ #include "thash.h" #include "ttypes.h" -static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); +static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } pInfo->readHandle = *readHandle; - pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); + pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID); - int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); + code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -55,13 +58,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead // partition by tbname if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { - pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL; - tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|CACHESCAN_RETRIEVE_LAST_ROW; + code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, + taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); } else { // by tags - pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE; + pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|CACHESCAN_RETRIEVE_LAST_ROW; } if (pScanNode->scan.pScanPseudoCols != NULL) { @@ -80,19 +87,19 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); pOperator->cost.openCost = 0; return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); + pTaskInfo->code = code; + destroyLastrowScanOperator(pInfo); taosMemoryFree(pOperator); return NULL; } -SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { +SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -109,14 +116,14 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); // check if it is a group by tbname - if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) { + if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { blockDataCleanup(pInfo->pBufferredRes); taosArrayClear(pInfo->pUidList); - int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); + int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // check for tag values @@ -172,11 +179,11 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { while (pInfo->currentGroupIndex < totalGroups) { SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); - tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, + tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); - int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -200,7 +207,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { } } - tsdbLastrowReaderClose(pInfo->pLastrowReader); + tsdbCacherowsReaderClose(pInfo->pLastrowReader); return pInfo->pRes; } } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index e0e0f1198c..7f7e7c0e1c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1217,7 +1217,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; pCtx->numOfParams = pExpr->base.numOfParams; - pCtx->increase = false; pCtx->isStream = false; pCtx->param = pFunct->pParam; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 027a9a67b8..ab65a057e0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -184,7 +184,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId); + SIDList list = getDataBufPagesIdList(pResultBuf); if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); @@ -299,7 +299,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf, tid); + SIDList list = getDataBufPagesIdList(pResultBuf); if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, tid, &pageId); @@ -1565,16 +1565,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - if (pCtx[j].increase) { - int64_t ts = *(int64_t*)in; - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes); - ts++; - } - } else { - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); } } } @@ -4137,7 +4129,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); + pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7d185dceda..b97970aeef 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1830,7 +1830,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt void increaseTs(SqlFunctionCtx* pCtx) { if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) { - pCtx[0].increase = true; +// pCtx[0].increase = true; } } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 48af951773..fc411e850a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -97,7 +97,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page return pSortHandle; } -static int32_t sortComparClearup(SMsortComparParam* cmpParam) { +static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE blockDataDestroy(pSource->src.pBlock); @@ -134,15 +134,14 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) { return TSDB_CODE_SUCCESS; } -static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { +static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId, SArray* pPageIdList) { SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId)); pSource->src.pBlock = pBlock; - + pSource->pageIdList = pPageIdList; taosArrayPush(pAllSources, &pSource); (*sourceId) += 1; @@ -171,6 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { } } + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while(start < pDataBlock->info.rows) { int32_t stop = 0; blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); @@ -186,6 +186,8 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { return terrno; } + taosArrayPush(pPageIdList, &pageId); + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); assert(size <= getBufPageSize(pHandle->pBuf)); @@ -201,7 +203,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataCleanup(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); - return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); + return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList); } static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) { @@ -502,6 +504,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return code; } + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { @@ -514,6 +517,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return terrno; } + taosArrayPush(pPageIdList, &pageId); + int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t); assert(size <= getBufPageSize(pHandle->pBuf)); @@ -525,12 +530,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { blockDataCleanup(pDataBlock); } - sortComparClearup(&pHandle->cmpParam); + sortComparCleanup(&pHandle->cmpParam); tMergeTreeDestroy(pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); - code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); + code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); if (code != 0) { return code; } diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index dfb52f7694..554f9e567f 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -51,20 +51,20 @@ struct tMemBucket; typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value); typedef struct tMemBucket { - int16_t numOfSlots; - int16_t type; - int16_t bytes; - int32_t total; - int32_t elemPerPage; // number of elements for each object - int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result - int32_t bufPageSize; // disk page size - MinMaxEntry range; // value range - int32_t times; // count that has been checked for deciding the correct data value buckets. - __compar_fn_t comparFn; - - tMemBucketSlot * pSlots; - SDiskbasedBuf *pBuffer; - __perc_hash_func_t hashFunc; + int16_t numOfSlots; + int16_t type; + int16_t bytes; + int32_t total; + int32_t elemPerPage; // number of elements for each object + int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result + int32_t bufPageSize; // disk page size + MinMaxEntry range; // value range + int32_t times; // count that has been checked for deciding the correct data value buckets. + __compar_fn_t comparFn; + tMemBucketSlot* pSlots; + SDiskbasedBuf* pBuffer; + __perc_hash_func_t hashFunc; + SHashObj* groupPagesMap; // disk page map for different groups; } tMemBucket; tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 013c58cc45..32d0472a50 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3643,7 +3643,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); - int16_t type = pCtx->input.pData[0]->info.type; + int16_t type = pCtx->pExpr->base.resSchema.type; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 517253dc01..dbe0b6bb3a 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -33,13 +33,13 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) SFilePage *buffer = (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage)); int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); - SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + SArray* pIdList = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); int32_t offset = 0; - for(int32_t i = 0; i < list->size; ++i) { - struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i); + for(int32_t i = 0; i < taosArrayGetSize(pIdList); ++i) { + int32_t* pageId = taosArrayGet(pIdList, i); - SFilePage* pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + SFilePage* pg = getBufPage(pMemBucket->pBuffer, *pageId); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); @@ -97,11 +97,11 @@ double findOnlyResult(tMemBucket *pMemBucket) { } int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); - SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + SArray* list = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); assert(list->size == 1); - struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0); - SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + int32_t* pageId = taosArrayGet(list, 0); + SFilePage* pPage = getBufPage(pMemBucket->pBuffer, *pageId); assert(pPage->num == 1); double v = 0; @@ -233,7 +233,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, pBucket->times = 1; pBucket->maxCapacity = 200000; - + pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); taosMemoryFree(pBucket); @@ -280,8 +280,16 @@ void tMemBucketDestroy(tMemBucket *pBucket) { return; } + void* p = taosHashIterate(pBucket->groupPagesMap, NULL); + while(p) { + SArray** p1 = p; + p = taosHashIterate(pBucket->groupPagesMap, p); + taosArrayDestroy(*p1); + } + destroyDiskbasedBuf(pBucket->pBuffer); taosMemoryFreeClear(pBucket->pSlots); + taosHashCleanup(pBucket->groupPagesMap); taosMemoryFreeClear(pBucket); } @@ -357,8 +365,16 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = NULL; } + SArray* pPageIdList = (SArray*)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId)); + if (pPageIdList == NULL) { + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES); + pPageIdList = pList; + } + pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId); pSlot->info.pageId = pageId; + taosArrayPush(pPageIdList, &pageId); } memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes); @@ -476,7 +492,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) resetSlotInfo(pMemBucket); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); - SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); assert(list->size > 0); for (int32_t f = 0; f < list->size; ++f) { diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index ac2128dd70..4d5532b9a6 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -33,7 +33,7 @@ struct SDiskbasedBuf { int32_t pageSize; // current used page size int32_t inMemPages; // numOfPages that are allocated in memory SList* freePgList; // free page list - SHashObj* groupSet; // id hash table, todo remove it + SArray* pIdList; // page id list SHashObj* all; SList* lruList; void* emptyDummyIdList; // dummy id list @@ -241,26 +241,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return 0; } -static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) { - assert(taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)) == NULL); - - SArray* pa = taosArrayInit(1, POINTER_BYTES); - int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); - assert(ret == 0); - - return pa; -} - -static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) { - SIDList list = NULL; - - char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); - if (p == NULL) { // it is a new group id - list = addNewGroup(pBuf, groupId); - } else { - list = (SIDList)(*p); - } - +static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); @@ -273,7 +254,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag ppi->pn = NULL; ppi->dirty = false; - return *(SPageInfo**)taosArrayPush(list, &ppi); + return *(SPageInfo**)taosArrayPush(pBuf->pIdList, &ppi); } static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { @@ -293,16 +274,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { } } - // int32_t pos = listNEles(pBuf->lruList); - // SListIter iter1 = {0}; - // tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD); - // SListNode* pn1 = NULL; - // while((pn1 = tdListNext(&iter1)) != NULL) { - // SPageInfo* pageInfo = *(SPageInfo**) pn1->data; - // printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1); - // pos -= 1; - // } - return pn; } @@ -382,7 +353,8 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem // init id hash table _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - pPBuf->groupSet = taosHashInit(10, fn, true, false); + pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); + pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES pPBuf->all = taosHashInit(10, fn, true, false); @@ -425,7 +397,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { *pageId = (++pBuf->allocateId); // register page id info - pi = registerPage(pBuf, groupId, *pageId); + pi = registerPage(pBuf, *pageId); // add to hash map taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); @@ -526,19 +498,11 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { pBuf->statis.releasePages += 1; } -size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); } - size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } -SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { - assert(pBuf != NULL); - - char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); - if (p == NULL) { // it is a new group id - return pBuf->emptyDummyIdList; - } else { - return (SArray*)(*p); - } +SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf) { + ASSERT(pBuf != NULL); + return pBuf->pIdList; } void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { @@ -578,26 +542,21 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { taosRemoveFile(pBuf->path); taosMemoryFreeClear(pBuf->path); - SArray** p = taosHashIterate(pBuf->groupSet, NULL); - while (p) { - size_t n = taosArrayGetSize(*p); - for (int32_t i = 0; i < n; ++i) { - SPageInfo* pi = taosArrayGetP(*p, i); - taosMemoryFreeClear(pi->pData); - taosMemoryFreeClear(pi); - } - - taosArrayDestroy(*p); - p = taosHashIterate(pBuf->groupSet, p); + size_t n = taosArrayGetSize(pBuf->pIdList); + for (int32_t i = 0; i < n; ++i) { + SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i); + taosMemoryFreeClear(pi->pData); + taosMemoryFreeClear(pi); } + taosArrayDestroy(pBuf->pIdList); + tdListFree(pBuf->lruList); tdListFree(pBuf->freePgList); taosArrayDestroy(pBuf->emptyDummyIdList); taosArrayDestroy(pBuf->pFree); - taosHashCleanup(pBuf->groupSet); taosHashCleanup(pBuf->all); taosMemoryFreeClear(pBuf->id); @@ -661,32 +620,32 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); - printf( - "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", - ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages, - ps->loadBytes / (1024.0 * ps->loadPages)); + if (ps->loadPages > 0) { + printf( + "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", + ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, + ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); + } else { + printf("no page loaded\n"); + } } void clearDiskbasedBuf(SDiskbasedBuf* pBuf) { - SArray** p = taosHashIterate(pBuf->groupSet, NULL); - while (p) { - size_t n = taosArrayGetSize(*p); - for (int32_t i = 0; i < n; ++i) { - SPageInfo* pi = taosArrayGetP(*p, i); - taosMemoryFreeClear(pi->pData); - taosMemoryFreeClear(pi); - } - taosArrayDestroy(*p); - p = taosHashIterate(pBuf->groupSet, p); + size_t n = taosArrayGetSize(pBuf->pIdList); + for (int32_t i = 0; i < n; ++i) { + SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i); + taosMemoryFreeClear(pi->pData); + taosMemoryFreeClear(pi); } + taosArrayClear(pBuf->pIdList); + tdListEmpty(pBuf->lruList); tdListEmpty(pBuf->freePgList); taosArrayClear(pBuf->emptyDummyIdList); taosArrayClear(pBuf->pFree); - taosHashClear(pBuf->groupSet); taosHashClear(pBuf->all); pBuf->numOfPages = 0; // all pages are in buffer in the first place