From a4213aedf0ff1cee8390fdce89727b23bab7ab28 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Dec 2022 17:23:31 +0800 Subject: [PATCH 1/3] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 1 - source/libs/executor/src/executorimpl.c | 10 ++++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b302641c94..55cbac2821 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -354,7 +354,6 @@ typedef struct STableMergeScanInfo { SLimitInfo limitInfo; int64_t numOfRows; SScanInfo scanInfo; - int32_t scanTimes; SSDataBlock* pResBlock; SSampleExecInfo sample; // sample execution info SSortExecInfo sortExecInfo; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1c80eff685..668a93740d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -99,6 +99,8 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC int32_t status); static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); +static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo); void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -139,9 +141,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, return fpSet; } -static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo); - SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -245,7 +244,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR } // a new buffer page for each table. Needs to opt this design -static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) { +static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { if (pWindowRes->pageId != -1) { return 0; } @@ -916,8 +915,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = - addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize); + int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { return; } From 224f5a72c255d233716379a33e74b7544b023894 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Dec 2022 17:45:12 +0800 Subject: [PATCH 2/3] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/tsdb.h | 20 ++++++++-------- source/dnode/vnode/src/tsdb/tsdbCache.c | 24 -------------------- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 23 +++++++++++-------- source/libs/executor/src/cachescanoperator.c | 4 ++-- 5 files changed, 26 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 540f0c3127..dad8e1191d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -185,7 +185,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader); + uint64_t suid, void **pReader, const char* idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7fb962e3a7..5a63af41af 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -715,21 +715,21 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { - SVnode *pVnode; - STSchema *pSchema; - uint64_t uid; - uint64_t suid; - char **transferBuf; // todo remove it soon - int32_t numOfCols; - int32_t type; - int32_t tableIndex; // currently returned result tables - + SVnode *pVnode; + STSchema *pSchema; + uint64_t uid; + uint64_t suid; + char **transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables STableKeyInfo *pTableList; // table id list int32_t numOfTables; SSttBlockLoadInfo *pLoadInfo; STsdbReadSnap *pReadSnap; SDataFReader *pDataFReader; SDataFReader *pDataFReaderLast; + const char *idstr; } SCacheRowsReader; typedef struct { @@ -752,8 +752,6 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); - // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5b09ce5eb6..6a82517067 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1406,30 +1406,6 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader * return code; } -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) { - int32_t code = 0; - int16_t nCol = taosArrayGetSize(pLastArray); - SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol); - SColVal *tColVal = &tTsVal->colVal; - taosArrayPush(pColArray, tColVal); - } - - code = tdSTSRowNew(pColArray, pTSchema, ppRow); - if (code) goto _err; - - taosArrayDestroy(pColArray); - - return code; - -_err: - taosArrayDestroy(pColArray); - - return code; -} - int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; char key[32] = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b87e5d5503..f05f5d5c88 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -20,9 +20,8 @@ #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) -static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, - void** pRes) { - ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); +static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, + void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { @@ -65,9 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } pBlock->info.rows += allNullRow ? 0 : 1; - } else { - ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)); - + } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -94,11 +91,16 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } pBlock->info.rows += 1; + } else { + tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); + return TSDB_CODE_INVALID_PARA; } + + return TSDB_CODE_SUCCESS; } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void** pReader) { + uint64_t suid, void** pReader, const char* idstr) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, return TSDB_CODE_OUT_OF_MEMORY; } + p->idstr = taosMemoryStrDup(idstr); + *pReader = p; return TSDB_CODE_SUCCESS; } @@ -160,6 +164,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { destroyLastBlockLoadInfo(p->pLoadInfo); + taosMemoryFree((void*) p->idstr); taosMemoryFree(pReader); return NULL; } @@ -308,7 +313,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } if (hasRes) { - saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes); + saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { @@ -323,7 +328,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 continue; } - saveOneRow(pRow, pResBlock, pr, slotIds, pRes); + saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr); // TODO reset the pRes taosArrayPush(pTableUidList, &pKeyInfo->uid); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 672bb09b14..294424746a 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -90,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableList); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -216,7 +216,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); taosArrayClear(pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); From 1edf1e1799efe0a42a91cbc0b85f6a383f406e94 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Dec 2022 23:02:40 +0800 Subject: [PATCH 3/3] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 120 +++++++++++-------------- 1 file changed, 54 insertions(+), 66 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6b71def573..050d03cf73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -396,7 +396,6 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) { } static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { - ASSERT(pWindow != NULL); return pWindow->skey > pWindow->ekey; } @@ -1447,7 +1446,6 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock index += step; } - ASSERT(0); return -1; } @@ -2421,6 +2419,46 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } } +static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, + STsdbReader* pReader, bool* loadNeighbor) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + int32_t nextIndex = -1; + SBlockIndex nxtBIndex = {0}; + + *loadNeighbor = false; + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); + + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex); + if (!hasNeighbor) { // do nothing + return code; + } + + if (overlapWithNeighborBlock(pBlock, &nxtBIndex, pReader->order)) { // load next block + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; + + // 1. find the next neighbor block in the scan block list + SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex}; + int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb); + + // 2. remove it from the scan block list + setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); + + // 3. load the neighbor block, and set it to be the currently accessed file data block + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // 4. check the data values + initBlockDumpInfo(pReader, pBlockIter); + *loadNeighbor = true; + } + + return code; +} + static int32_t buildComposedDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; @@ -2479,38 +2517,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { - - int32_t nextIndex = -1; - SBlockIndex bIndex = {0}; pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info - bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex); - if (!hasNeighbor) { // do nothing - setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); - break; - } - - if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) { // load next block - SReaderStatus* pStatus = &pReader->status; - SDataBlockIter* pBlockIter = &pStatus->blockIter; - - // 1. find the next neighbor block in the scan block list - SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex}; - int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb); - - // 2. remove it from the scan block list - setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); - - // 3. load the neighbor block, and set it to be the currently accessed file data block - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid); - if (code != TSDB_CODE_SUCCESS) { - setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); - break; - } - - // 4. check the data values - initBlockDumpInfo(pReader, pBlockIter); - } else { + // continue check for the next file block if the last ts in the current block + // is overlapped with the next neighbor block + bool loadNeighbor = false; + code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor); + if ((!loadNeighbor) || (code != 0)) { setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; } @@ -2777,7 +2790,10 @@ static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); - ASSERT(pStatus->pTableIter != NULL); + if (pStatus->pTableIter == NULL) { + return false; + } + return true; } @@ -3117,10 +3133,10 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ } bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) { - ASSERT(pKey != NULL); if (pDelList == NULL) { return false; } + size_t num = taosArrayGetSize(pDelList); bool asc = ASCENDING_TRAVERSE(order); int32_t step = asc ? 1 : -1; @@ -3318,35 +3334,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - int32_t nextIndex = -1; - SBlockIndex bIndex = {0}; - - bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex); - if (!hasNeighbor) { // do nothing - return 0; - } - - bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order); - if (overlap) { // load next block - SReaderStatus* pStatus = &pReader->status; - SDataBlockIter* pBlockIter = &pStatus->blockIter; - - // 1. find the next neighbor block in the scan block list - SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex}; - int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb); - - // 2. remove it from the scan block list - setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); - - // 3. load the neighbor block, and set it to be the currently accessed file data block - int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // 4. check the data values - initBlockDumpInfo(pReader, pBlockIter); + bool loadNeighbor = true; + int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor); + if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) { @@ -3354,7 +3345,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn } } - return TSDB_CODE_SUCCESS; + return code; } int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, @@ -3709,13 +3700,11 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } } while (1); - ASSERT(pBlock->info.rows <= capacity); return TSDB_CODE_SUCCESS; } // TODO refactor: with createDataBlockScanInfo int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { - ASSERT(pReader != NULL); int32_t size = taosHashGetSize(pReader->status.pTableMap); STableBlockScanInfo** p = NULL; @@ -4079,7 +4068,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { - ASSERT(pReader != NULL); *rows = pReader->pResBlock->info.rows; *uid = pReader->pResBlock->info.id.uid; *pWindow = pReader->pResBlock->info.window;