diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 87a6a90a7e..81e3af88a5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -253,6 +253,7 @@ typedef struct SQueryTableDataCond { STimeWindow twindows; int64_t startVersion; int64_t endVersion; + bool trimData; // response the actual data, not only the rows in the attribute of info.row of ssdatablock } SQueryTableDataCond; int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 045f2bad70..e8e90541d1 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -154,8 +154,7 @@ typedef struct { /*-------------------------------------------------new api format---------------------------------------------------*/ typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, - SHashObj** pIgnoreTables); + SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables); void (*tsdReaderClose)(); void (*tsdSetReaderTaskId)(void *pReader, const char *pId); int32_t (*tsdSetQueryTableList)(); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 74e4b66098..32ee7526c0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -154,8 +154,7 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST 0x8 int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, - SHashObj **pIgnoreTables); + SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables); int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr); void tsdbReaderClose2(STsdbReader *pReader); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6d76529101..3876048010 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -675,8 +675,6 @@ struct SDelFWriter { }; #include "tarray2.h" -// #include "tsdbFS2.h" -// struct STFileSet; typedef struct STFileSet STFileSet; typedef TARRAY2(STFileSet *) TFileSetArray; @@ -772,20 +770,25 @@ typedef struct SBlockDataInfo { int32_t sttBlockIndex; } SBlockDataInfo; -typedef struct SSttBlockLoadInfo { - SBlockDataInfo blockData[2]; // buffered block data - int32_t statisBlockIndex; // buffered statistics block index - void *statisBlock; // buffered statistics block data - void *pSttStatisBlkArray; - SArray *aSttBlk; - int32_t currentLoadBlockIndex; - STSchema *pSchema; - int16_t *colIds; - int32_t numOfCols; - bool checkRemainingRow; // todo: no assign value? - bool isLast; - bool sttBlockLoaded; +// todo: move away +typedef struct { + SArray *pUid; + SArray *pFirstKey; + SArray *pLastKey; + SArray *pCount; +} SSttTableRowsInfo; +typedef struct SSttBlockLoadInfo { + SBlockDataInfo blockData[2]; // buffered block data + SArray *aSttBlk; + int32_t currentLoadBlockIndex; + STSchema *pSchema; + int16_t *colIds; + int32_t numOfCols; + bool checkRemainingRow; // todo: no assign value? + bool isLast; + bool sttBlockLoaded; + SSttTableRowsInfo info; SSttBlockLoadCostInfo cost; } SSttBlockLoadInfo; @@ -874,27 +877,31 @@ typedef struct { _load_tomb_fn loadTombFn; void *pReader; void *idstr; + bool rspRows; // response the rows in stt-file, if possible } SMergeTreeConf; -int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf); +typedef struct SSttDataInfoForTable { + SArray* pTimeWindowList; + int64_t numOfRows; +} SSttDataInfoForTable; -void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); -bool tMergeTreeNext(SMergeTree *pMTree); -void tMergeTreePinSttBlock(SMergeTree *pMTree); -void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); -bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); -void tMergeTreeClose(SMergeTree *pMTree); +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pTableInfo); +void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); +bool tMergeTreeNext(SMergeTree *pMTree); +void tMergeTreePinSttBlock(SMergeTree *pMTree); +void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); +bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); +void tMergeTreeClose(SMergeTree *pMTree); SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); -void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost); void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); // tsdbCache ============================================================================================== typedef enum { - READ_MODE_COUNT_ONLY = 0x1, - READ_MODE_ALL, -} EReadMode; + READER_EXEC_DATA = 0x1, + READER_EXEC_ROWS = 0x2, +} EExecMode; typedef struct { TSKEY ts; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5076599753..5fc0e333b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1870,7 +1870,7 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb .idstr = pr->idstr, }; - code = tMergeTreeOpen2(&iter->mergeTree, &conf); + code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL); if (code != TSDB_CODE_SUCCESS) { return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 8017f1f4d0..ee92edc2a9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -15,6 +15,7 @@ #include "tsdb.h" #include "tsdbFSet2.h" +#include "tsdbUtil2.h" #include "tsdbMerge.h" #include "tsdbReadUtil.h" #include "tsdbSttFileRW.h" @@ -52,15 +53,6 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, return pLoadInfo; } -void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) { - for (int32_t i = 0; i < 1; ++i) { - pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime; - pLoadCost->loadBlocks += pLoadInfo[i].cost.loadBlocks; - pLoadCost->loadStatisBlocks += pLoadInfo[i].cost.loadStatisBlocks; - pLoadCost->statisElapsedTime += pLoadInfo[i].cost.statisElapsedTime; - } -} - void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo == NULL) { return NULL; @@ -78,9 +70,11 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { pInfo->sttBlockIndex = -1; pInfo->pin = false; - if (pLoadInfo->statisBlock != NULL) { - tStatisBlockDestroy(pLoadInfo->statisBlock); - taosMemoryFreeClear(pLoadInfo->statisBlock); + if (pLoadInfo->info.pCount != NULL) { + taosArrayDestroy(pLoadInfo->info.pUid); + taosArrayDestroy(pLoadInfo->info.pFirstKey); + taosArrayDestroy(pLoadInfo->info.pLastKey); + taosArrayDestroy(pLoadInfo->info.pCount); } taosArrayDestroy(pLoadInfo->aSttBlk); @@ -323,95 +317,74 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray return TSDB_CODE_SUCCESS; } -static int32_t suidComparFn(const void *target, const void *p2) { - const uint64_t *targetUid = target; - const uint64_t *uid2 = p2; - if (*uid2 == (*targetUid)) { +static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, + TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) { + int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); + if (numOfBlocks <= 0) { return 0; - } else { - return (*targetUid) < (*uid2) ? -1 : 1; - } -} - -static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid, - SSttFileReader *pReader) { - const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray; - if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { - return true; } - int32_t i = 0; - for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) { - SStatisBlk *p = &pStatisBlkArray->data[i]; - if (p->minTbid.suid <= suid && p->maxTbid.suid >= suid) { - break; - } + int32_t startIndex = 0; + while((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) { + ++startIndex; } - if (i >= TARRAY2_SIZE(pStatisBlkArray)) { - return false; + if (startIndex >= numOfBlocks || pStatisBlkArray->data[startIndex].minTbid.suid > suid) { + return 0; } - while (i < TARRAY2_SIZE(pStatisBlkArray)) { - SStatisBlk *p = &pStatisBlkArray->data[i]; - if (p->minTbid.suid > suid) { - return false; + int32_t endIndex = startIndex; + while(endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) { + ++endIndex; + } + + int32_t num = endIndex - startIndex; + pBlockLoadInfo->cost.loadStatisBlocks += num; + + STbStatisBlock block; + tStatisBlockInit(&block); + + int64_t st = taosGetTimestampUs(); + + for(int32_t k = startIndex; k < endIndex; ++k) { + tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block); + + int32_t i = 0; + while(block.suid->data[i] != suid) { + ++i; } -// if (pBlockLoadInfo->statisBlock == NULL) { -// pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); -// -// int64_t st = taosGetTimestampMs(); -// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock); -// pBlockLoadInfo->statisBlockIndex = i; -// -// double el = (taosGetTimestampMs() - st) / 1000.0; -// pBlockLoadInfo->cost.loadStatisBlocks += 1; -// pBlockLoadInfo->cost.statisElapsedTime += el; -// } else if (pBlockLoadInfo->statisBlockIndex != i) { -// tStatisBlockDestroy(pBlockLoadInfo->statisBlock); -// -// int64_t st = taosGetTimestampMs(); -// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock); -// pBlockLoadInfo->statisBlockIndex = i; -// -// double el = (taosGetTimestampMs() - st) / 1000.0; -// pBlockLoadInfo->cost.loadStatisBlocks += 1; -// pBlockLoadInfo->cost.statisElapsedTime += el; -// } - - STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock; - int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ); - if (index == -1) { - return false; + int32_t rows = TARRAY2_SIZE(block.suid); + if (pBlockLoadInfo->info.pUid == NULL) { + pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t)); + pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(int64_t)); + pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(int64_t)); + pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t)); } - int32_t j = index; - if (pBlock->uid->data[j] == uid) { - return true; - } else if (pBlock->uid->data[j] > uid) { - while (j >= 0 && pBlock->suid->data[j] == suid) { - if (pBlock->uid->data[j] == uid) { - return true; - } else { - j -= 1; - } - } + if (pStatisBlkArray->data[k].maxTbid.suid == suid) { + taosArrayAddBatch(pBlockLoadInfo->info.pUid, &block.uid->data[i], rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i], rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i], rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pCount, &block.count->data[i], rows - i); } else { - j = index + 1; - while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) { - if (pBlock->uid->data[j] == uid) { - return true; - } else { - j += 1; - } + while(i < rows && block.suid->data[i] == suid) { + taosArrayPush(pBlockLoadInfo->info.pUid, &block.uid->data[i]); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i]); + taosArrayPush(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i]); + taosArrayPush(pBlockLoadInfo->info.pCount, &block.count->data[i]); + i += 1; } } - - i += 1; } - return false; + tStatisBlockDestroy(&block); + + double el = (taosGetTimestampUs() - st) / 1000.0; + pBlockLoadInfo->cost.statisElapsedTime += el; + + tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el); + return TSDB_CODE_SUCCESS; } static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid, @@ -428,19 +401,28 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * return code; } + // load the stt block info for each stt file block code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid); if (code != TSDB_CODE_SUCCESS) { tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr); return code; } - // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file - code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray); + // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file + TStatisBlkArray *pStatisBlkArray = NULL; + code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray); if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr); return code; } + // load statistics block for all tables in current stt file + code = loadSttStatisticsBlockData(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, suid, idStr); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr); + return code; + } + code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo); double el = (taosGetTimestampUs() - st) / 1000.0; @@ -448,19 +430,44 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * return code; } +static int32_t uidComparFn(const void* p1, const void* p2) { + const uint64_t *pFirst = p1; + const uint64_t *pVal = p2; + + if (*pFirst == *pVal) { + return 0; + } else { + return *pFirst < *pVal? -1:1; + } +} + +static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, STimeWindow *pTimeWindow, + int64_t *numOfRows) { + if (pTimeWindow == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) { + return; + } + + int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ); + if (index >= 0) { + pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index); + pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index); + + *numOfRows += *(int64_t*) taosArrayGet(pLoadInfo->info.pCount, index); + } +} + int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward, - uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, - SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange, - _load_tomb_fn loadTombFn, void *pReader1) { + SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, + int64_t *numOfRows, const char *idStr) { int32_t code = TSDB_CODE_SUCCESS; - pIter->uid = uid; + pIter->uid = pConf->uid; pIter->cid = cid; pIter->backward = backward; - pIter->verRange.minVer = pRange->minVer; - pIter->verRange.maxVer = pRange->maxVer; - pIter->timeWindow.skey = pTimeWindow->skey; - pIter->timeWindow.ekey = pTimeWindow->ekey; + pIter->verRange.minVer = pConf->verRange.minVer; + pIter->verRange.maxVer = pConf->verRange.maxVer; + pIter->timeWindow.skey = pConf->timewindow.skey; + pIter->timeWindow.ekey = pConf->timewindow.ekey; pIter->pReader = pSttFileReader; pIter->pBlockLoadInfo = pBlockLoadInfo; @@ -473,34 +480,29 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32 } if (!pBlockLoadInfo->sttBlockLoaded) { - code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr); + code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, pConf->suid, pConf->loadTombFn, pConf->pReader, idStr); if (code != TSDB_CODE_SUCCESS) { return code; } } -// bool exists = existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader); -// if (!exists) { -// pIter->iSttBlk = -1; -// pIter->pSttBlk = NULL; -// return TSDB_CODE_SUCCESS; -// } + setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pTimeWindow, numOfRows); // find the start block, actually we could load the position to avoid repeatly searching for the start position when // the skey is updated. size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); - pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); + pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, pConf->uid, backward); if (pIter->iSttBlk != -1) { pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk); pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1; - if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) || - (!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) { + if ((!backward) && ((pConf->strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) || + (!pConf->strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) { pIter->pSttBlk = NULL; } - if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) || - (!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) { + if (backward && ((pConf->strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) || + (!pConf->strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) { pIter->pSttBlk = NULL; pIter->ignoreEarlierTs = true; } @@ -708,8 +710,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); } -SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } - // SMergeTree ================================================= static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) { SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node)); @@ -737,7 +737,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR return -1 * tLDataIterCmprFn(p1, p2); } -int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pSttDataInfo) { int32_t code = TSDB_CODE_SUCCESS; pMTree->pIter = NULL; @@ -758,17 +758,16 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { goto _end; } - // add the list/iter placeholder - adjustLDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset); + adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset); for (int32_t j = 0; j < numOfLevels; ++j) { SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; - SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); + SArray * pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file SLDataIter *pIter = taosArrayGetP(pList, i); - SSttFileReader *pSttFileReader = pIter->pReader; + SSttFileReader * pSttFileReader = pIter->pReader; SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo; // open stt file reader if not opened yet @@ -790,10 +789,11 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { memset(pIter, 0, sizeof(SLDataIter)); + STimeWindow w = {0}; + int64_t numOfRows = 0; + int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; - code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow, - &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn, - pConf->pReader); + code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, pMTree->idStr); if (code != TSDB_CODE_SUCCESS) { goto _end; } @@ -801,6 +801,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); if (hasVal) { tMergeTreeAddIter(pMTree, pIter); + + // let's record the time window for current table of uid in the stt files + if (pSttDataInfo != NULL) { + taosArrayPush(pSttDataInfo->pTimeWindowList, &w); + pSttDataInfo->numOfRows += numOfRows; + } } else { if (!pMTree->ignoreEarlierTs) { pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0753c2454d..e09ac504d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -25,6 +25,16 @@ #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) +typedef struct { + bool overlapWithNeighborBlock; + bool hasDupTs; + bool overlapWithDelInfo; + bool overlapWithSttBlock; + bool overlapWithKeyInBuf; + bool partiallyRequired; + bool moreThanCapcity; +} SDataBlockToLoadInfo; + static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); @@ -1221,78 +1231,6 @@ static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersio (pBlock->record.maxVer >= pVerRange->minVer) && (pBlock->record.minVer <= pVerRange->maxVer); } -static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, - int32_t startIndex) { - size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); - - for (int32_t i = startIndex; i < num; i += 1) { - TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); - if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) { - if (p->version >= pRecord->minVer) { - return true; - } - } else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts - if (p->version >= pRecord->minVer) { - if (i < num - 1) { - TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); - if (pnext->ts >= pRecord->firstKey) { - return true; - } - } else { // it must be the last point - ASSERT(p->version == 0); - } - } - } else { // (p->ts > pBlock->maxKey.ts) { - return false; - } - } - - return false; -} - -static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) { - if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) { - return false; - } - - // ts is not overlap - TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); - TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); - if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) { - return false; - } - - // version is not overlap - if (ASCENDING_TRAVERSE(order)) { - return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex); - } else { - int32_t index = pBlockScanInfo->fileDelIndex; - while (1) { - TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); - if (p->ts > pRecord->firstKey && index > 0) { - index -= 1; - } else { // find the first point that is smaller than the minKey.ts of dataBlock. - if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) { - index -= 1; - } - break; - } - } - - return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, index); - } -} - -typedef struct { - bool overlapWithNeighborBlock; - bool hasDupTs; - bool overlapWithDelInfo; - bool overlapWithLastBlock; - bool overlapWithKeyInBuf; - bool partiallyRequired; - bool moreThanCapcity; -} SDataBlockToLoadInfo; - static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) { SBrinRecord rec = {0}; @@ -1313,7 +1251,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) { int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey; - pInfo->overlapWithLastBlock = + pInfo->overlapWithSttBlock = !(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt); } @@ -1335,15 +1273,15 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock bool loadDataBlock = (info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf || - info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithLastBlock); + info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithSttBlock); // log the reason why load the datablock for profile if (loadDataBlock) { tsdbDebug("%p uid:%" PRIu64 " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, " - "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", + "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithSttBlock:%d, %s", pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired, - info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock, + info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithSttBlock, pReader->idStr); } @@ -1355,7 +1293,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc SDataBlockToLoadInfo info = {0}; getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader); bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || - info.overlapWithDelInfo || info.overlapWithLastBlock); + info.overlapWithDelInfo || info.overlapWithSttBlock); return isCleanFileBlock; } @@ -2110,27 +2048,34 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum return true; } -static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - // the last block reader has been initialized for this table. - if (pLBlockReader->uid == pScanInfo->uid) { - return hasDataInSttBlock(pLBlockReader); +static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { + bool hasData = true; + + // the stt block reader has been initialized for this table. + if (pSttBlockReader->uid == pScanInfo->uid) { + return hasDataInSttBlock(pSttBlockReader); } - if (pLBlockReader->uid != 0) { - tMergeTreeClose(&pLBlockReader->mergeTree); + if (pSttBlockReader->uid != 0) { + tMergeTreeClose(&pSttBlockReader->mergeTree); } - pLBlockReader->uid = pScanInfo->uid; + pSttBlockReader->uid = pScanInfo->uid; - STimeWindow w = pLBlockReader->window; - if (ASCENDING_TRAVERSE(pLBlockReader->order)) { + // second time init stt block reader + if (pScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) { + return true; + } + + STimeWindow w = pSttBlockReader->window; + if (ASCENDING_TRAVERSE(pSttBlockReader->order)) { w.skey = pScanInfo->sttKeyInfo.nextProcKey; } else { w.ekey = pScanInfo->sttKeyInfo.nextProcKey; } int64_t st = taosGetTimestampUs(); - tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, + tsdbDebug("init stt block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); SMergeTreeConf conf = { @@ -2138,20 +2083,22 @@ static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanIn .suid = pReader->info.suid, .pTsdb = pReader->pTsdb, .timewindow = w, - .verRange = pLBlockReader->verRange, + .verRange = pSttBlockReader->verRange, .strictTimeRange = false, .pSchema = pReader->info.pSchema, .pCurrentFileset = pReader->status.pCurrentFileset, - .backward = (pLBlockReader->order == TSDB_ORDER_DESC), + .backward = (pSttBlockReader->order == TSDB_ORDER_DESC), .pSttFileBlockIterArray = pReader->status.pLDataIterArray, .pCols = pReader->suppInfo.colId, .numOfCols = pReader->suppInfo.numOfCols, .loadTombFn = loadSttTombDataForAll, .pReader = pReader, .idstr = pReader->idStr, + .rspRows = (pReader->info.execMode == READER_EXEC_ROWS), }; - int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, &conf); + SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))}; + int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -2159,13 +2106,44 @@ static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanIn initMemDataIterator(pScanInfo, pReader); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); - code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); + if (conf.rspRows) { + pScanInfo->cleanSttBlocks = + isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order); + + if (pScanInfo->cleanSttBlocks) { + pScanInfo->numOfRowsInStt = info.numOfRows; + pScanInfo->sttWindow.skey = INT64_MAX; + pScanInfo->sttWindow.ekey = INT64_MIN; + + // calculate the time window for data in stt files + for(int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) { + STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i); + if (pScanInfo->sttWindow.skey > pWindow->skey) { + pScanInfo->sttWindow.skey = pWindow->skey; + } + + if (pScanInfo->sttWindow.ekey < pWindow->ekey) { + pScanInfo->sttWindow.ekey = pWindow->ekey; + } + } + + pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList)? STT_FILE_HAS_DATA:STT_FILE_NO_DATA; + pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey; + hasData = true; + } else { + hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); + } + } else { + hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); + } + + taosArrayDestroy(info.pTimeWindowList); int64_t el = taosGetTimestampUs() - st; pReader->cost.initSttBlockReader += (el / 1000.0); - tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); - return code; + tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); + return hasData; } static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; } @@ -2356,18 +2334,15 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf static int32_t buildComposedDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int64_t st = taosGetTimestampUs(); + int32_t step = asc ? 1 : -1; + double el = 0; - SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; - - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - int64_t st = taosGetTimestampUs(); - int32_t step = asc ? 1 : -1; - double el = 0; - SBrinRecord* pRecord = &pBlockInfo->record; - + SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; + SBrinRecord* pRecord = &pBlockInfo->record; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; STableBlockScanInfo* pBlockScanInfo = NULL; @@ -2629,10 +2604,10 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt } static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { - SReaderStatus* pStatus = &pReader->status; + SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; - STableUidList* pUidList = &pStatus->uidList; - int32_t code = TSDB_CODE_SUCCESS; + STableUidList* pUidList = &pStatus->uidList; + int32_t code = TSDB_CODE_SUCCESS; if (tSimpleHashGetSize(pStatus->pTableMap) == 0) { return TSDB_CODE_SUCCESS; @@ -2668,8 +2643,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { continue; } - bool hasDataInLastFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); - if (!hasDataInLastFile) { + bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (!hasDataInSttFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2678,12 +2653,31 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { continue; } + // if only require the total rows, no need to load data from stt file if it is clean stt blocks + if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) { + SDataBlockInfo* pInfo = &pResBlock->info; + pInfo->rows = pScanInfo->numOfRowsInStt; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 1; + pInfo->window = pScanInfo->sttWindow; + setComposedBlockFlag(pReader, true); + pScanInfo->sttKeyInfo.nextProcKey = + ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; + pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + pScanInfo->lastProcKey = + ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; + pSttBlockReader->mergeTree.pIter = NULL; + + tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", + pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pResBlock->info.rows, pReader->idStr); + return TSDB_CODE_SUCCESS; + } + int64_t st = taosGetTimestampUs(); while (1) { - bool hasBlockLData = hasDataInSttBlock(pSttBlockReader); - - // no data in last block and block, no need to proceed. - if (hasBlockLData == false) { + // no data in stt block and block, no need to proceed. + if (!hasDataInSttBlock(pSttBlockReader)) { break; } @@ -2728,14 +2722,13 @@ static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockSc } static int32_t doBuildDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; STableBlockScanInfo* pScanInfo = NULL; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); - SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; + SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int32_t code = TSDB_CODE_SUCCESS; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); @@ -2793,13 +2786,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - tsdbDebug("load data in last block firstly %s", pReader->idStr); + tsdbDebug("load data in stt block firstly %s", pReader->idStr); int64_t st = taosGetTimestampUs(); // let's load data from stt files initSttBlockReader(pSttBlockReader, pScanInfo, pReader); - // no data in last block, no need to proceed. + // no data in stt block, no need to proceed. while (hasDataInSttBlock(pSttBlockReader)) { ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); @@ -2837,147 +2830,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; } -static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { - int64_t st = taosGetTimestampUs(); - LRUHandle* handle = NULL; - int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle); - if (code != TSDB_CODE_SUCCESS || handle == NULL) { - goto _end; - } - -#if 0 - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - - SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle); - size_t num = taosArrayGetSize(aBlockIdx); - if (num == 0) { - tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); - return TSDB_CODE_SUCCESS; - } - - SBlockIdx* pBlockIdx = NULL; - for (int32_t i = 0; i < num; ++i) { - pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); - if (pBlockIdx->suid != pReader->info.suid) { - continue; - } - - STableBlockScanInfo** p = tSimpleHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid)); - if (p == NULL) { - continue; - } - - STableBlockScanInfo* pScanInfo = *p; - SDataBlk block = {0}; - // for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { - // tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); - // pReader->rowsNum += block.nRow; - // } - } -#endif - -_end: - tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); - return code; -} - -static int32_t doSumSttBlockRows(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; - SSttBlockLoadInfo* pBlockLoadInfo = NULL; -#if 0 - for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file - pBlockLoadInfo = &pSttBlockReader->pInfo[i]; - - code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); - if (code) { - return code; - } - - size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); - if (size >= 1) { - SSttBlk* pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); - SSttBlk* pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); - - // all identical - if (pStart->suid == pEnd->suid) { - if (pStart->suid != pReader->info.suid) { - // no qualified stt block existed - taosArrayClear(pBlockLoadInfo->aSttBlk); - continue; - } - for (int32_t j = 0; j < size; ++j) { - SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j); - pReader->rowsNum += p->nRow; - } - } else { - for (int32_t j = 0; j < size; ++j) { - SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j); - uint64_t s = p->suid; - if (s < pReader->info.suid) { - continue; - } - - if (s == pReader->info.suid) { - pReader->rowsNum += p->nRow; - } else if (s > pReader->info.suid) { - break; - } - } - } - } - } -#endif - - return code; -} - -static int32_t readRowsCountFromFiles(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - - while (1) { - bool hasNext = false; - code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext); - if (code) { - return code; - } - - if (!hasNext) { // no data files on disk - break; - } - - // code = doSumFileBlockRows(pReader, pReader->pFileReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doSumSttBlockRows(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - pReader->status.loadFromFile = false; - - return code; -} - -static int32_t readRowsCountFromMem(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t memNum = 0, imemNum = 0; - if (pReader->pReadSnap->pMem != NULL) { - tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum); - } - - if (pReader->pReadSnap->pIMem != NULL) { - tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum); - } - - pReader->rowsNum += memNum + imemNum; - - return code; -} - static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -3107,7 +2959,7 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) { return TSDB_READ_RETURN; } - // all data blocks are checked in this last block file, now let's try the next file + // all data blocks are checked in this stt file, now let's try the next file set ASSERT(pReader->status.pTableIter == NULL); code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3946,7 +3798,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; - } else if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { +// } else if (READER_EXEC_DATA == pReader->info.readMode) { // DO NOTHING } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3987,8 +3839,7 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, - SHashObj** pIgnoreTables) { + SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables) { STimeWindow window = pCond->twindows; SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); @@ -4094,13 +3945,10 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } pReader->flag = READER_STATUS_SUSPEND; - - if (countOnly) { - pReader->info.readMode = READ_MODE_COUNT_ONLY; - } +// pReader->info.execMode = pCond->trimData ? READER_EXEC_ROWS : READER_EXEC_DATA; + pReader->info.execMode = READER_EXEC_ROWS; pReader->pIgnoreTables = pIgnoreTables; - tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 " in this query %s", pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer, @@ -4332,32 +4180,6 @@ _err: return code; } -static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - - if (pReader->status.loadFromFile == false) { - return false; - } - - code = readRowsCountFromFiles(pReader); - if (code != TSDB_CODE_SUCCESS) { - return false; - } - - code = readRowsCountFromMem(pReader); - if (code != TSDB_CODE_SUCCESS) { - return false; - } - - pBlock->info.rows = pReader->rowsNum; - pBlock->info.id.uid = 0; - pBlock->info.dataLoad = 0; - pReader->rowsNum = 0; - - return pBlock->info.rows > 0; -} - static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; @@ -4372,10 +4194,6 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { return code; } - if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { - return tsdbReadRowsCountOnly(pReader); - } - if (pStatus->loadFromFile) { code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { @@ -4685,7 +4503,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { } SReaderStatus* pStatus = &pTReader->status; - if (pStatus->composedDataBlock) { + if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) { return pTReader->resBlockInfo.pResBlock; } @@ -4722,9 +4540,9 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader->info.order = pCond->order; pReader->type = TIMEWINDOW_RANGE_CONTAINED; + pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pStatus->loadFromFile = true; pStatus->pTableIter = NULL; - pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); // allocate buffer in order to load data blocks from file memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index a058c0173d..2db49b8815 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -22,15 +22,7 @@ #include "tsdbUtil2.h" #include "tsimplehash.h" -int32_t uidComparFunc(const void* p1, const void* p2) { - uint64_t pu1 = *(uint64_t*)p1; - uint64_t pu2 = *(uint64_t*)p2; - if (pu1 == pu2) { - return 0; - } else { - return (pu1 < pu2) ? -1 : 1; - } -} +static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { int32_t num = numOfTables / pBuf->numPerBucket; @@ -61,6 +53,16 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { return TSDB_CODE_SUCCESS; } +int32_t uidComparFunc(const void* p1, const void* p2) { + uint64_t pu1 = *(uint64_t*)p1; + uint64_t pu2 = *(uint64_t*)p2; + if (pu1 == pu2) { + return 0; + } else { + return (pu1 < pu2) ? -1 : 1; + } +} + int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { if (numOfTables <= pBuf->numOfTables) { return TSDB_CODE_SUCCESS; @@ -243,6 +245,10 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) { taosArrayClear(pScanInfo->pBlockList); taosArrayClear(pScanInfo->pBlockIdxList); taosArrayClear(pScanInfo->pFileDelData); // del data from each file set + pScanInfo->cleanSttBlocks = false; + pScanInfo->numOfRowsInStt = 0; + pScanInfo->sttWindow.skey = INT64_MAX; + pScanInfo->sttWindow.ekey = INT64_MIN; pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; } @@ -488,7 +494,7 @@ typedef enum { BLK_CHECK_QUIT = 0x2, } ETombBlkCheckEnum; -static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo, +static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j); static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, ETombBlkCheckEnum* pRet) { @@ -662,17 +668,17 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT } } -int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, - const uint64_t* pUidList, int32_t numOfTables) { +int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, + TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, + int32_t numOfTables) { int32_t num = 0; - const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray; if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { return 0; } int32_t i = 0; - while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].minTbid.suid < suid)) { + while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].maxTbid.suid < suid)) { ++i; } @@ -681,64 +687,65 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo } SStatisBlk *p = &pStatisBlkArray->data[i]; - if (pBlockLoadInfo->statisBlock == NULL) { - pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); - tStatisBlockInit(pBlockLoadInfo->statisBlock); - } + STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); + tStatisBlockInit(pStatisBlock); int64_t st = taosGetTimestampMs(); - tsdbSttFileReadStatisBlock(pSttFileReader, p, pBlockLoadInfo->statisBlock); - pBlockLoadInfo->statisBlockIndex = i; + tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); double el = (taosGetTimestampMs() - st) / 1000.0; pBlockLoadInfo->cost.loadStatisBlocks += 1; pBlockLoadInfo->cost.statisElapsedTime += el; - STbStatisBlock *pBlock = pBlockLoadInfo->statisBlock; - int32_t index = 0; - while (index < TARRAY2_SIZE(pBlock->suid) && pBlock->suid->data[index] < suid) { + while (index < TARRAY2_SIZE(pStatisBlock->suid) && pStatisBlock->suid->data[index] < suid) { ++index; } - if (index >= TARRAY2_SIZE(pBlock->suid)) { + if (index >= TARRAY2_SIZE(pStatisBlock->suid)) { + tStatisBlockDestroy(pStatisBlock); + taosMemoryFreeClear(pStatisBlock); return num; } int32_t j = index; int32_t uidIndex = 0; - while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex <= numOfTables) { + while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { p = &pStatisBlkArray->data[i]; if (p->minTbid.suid > suid) { + tStatisBlockDestroy(pStatisBlock); + taosMemoryFreeClear(pStatisBlock); return num; } uint64_t uid = pUidList[uidIndex]; - if (pBlock->uid->data[j] == uid) { - num += pBlock->count->data[j]; + if (pStatisBlock->uid->data[j] == uid) { + num += pStatisBlock->count->data[j]; uidIndex += 1; j += 1; - loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j); - } else if (pBlock->uid->data[j] < uid) { + loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j); + } else if (pStatisBlock->uid->data[j] < uid) { j += 1; - loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j); + loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j); } else { uidIndex += 1; } } + tStatisBlockDestroy(pStatisBlock); + taosMemoryFreeClear(pStatisBlock); return num; } // load next stt statistics block -static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo, +static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) { if ((*j) >= numOfRows) { (*i) += 1; (*j) = 0; if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { - tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pBlockLoadInfo->statisBlock); + tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); } } } @@ -762,7 +769,7 @@ void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { } } -int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) { +int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) { int32_t numOfLevels = pFileSet->lvlArr->size; // add the list/iter placeholder @@ -791,7 +798,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra } // add the list/iter placeholder - adjustLDataIters(pSttFileBlockIterArray, pFileSet); + adjustSttDataIters(pSttFileBlockIterArray, pFileSet); for (int32_t j = 0; j < numOfLevels; ++j) { SSttLvl* pSttLevel = pFileSet->lvlArr->data[j]; @@ -819,7 +826,8 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra } // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file - int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pIter->pBlockLoadInfo->pSttStatisBlkArray); + TStatisBlkArray *pStatisBlkArray = NULL; + int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray); if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr); continue; @@ -829,9 +837,199 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra STsdbReader* pReader = pConf->pReader; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); uint64_t* pUidList = pReader->status.uidList.tableUidList; - numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pConf->suid, pUidList, numOfTables); + numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList, + numOfTables); } } return numOfRows; +} + +// overlap with deletion skyline +static bool overlapWithTimeWindow(STimeWindow* p1, STimeWindow* pQueryWindow, STableBlockScanInfo* pBlockScanInfo, + int32_t order) { + // overlap with query window + if (!(p1->skey >= pQueryWindow->skey && p1->ekey <= pQueryWindow->ekey)) { + return true; + } + + // overlap with mem data + SIterInfo* pMemIter = &pBlockScanInfo->iter; + SIterInfo* pIMemIter = &pBlockScanInfo->iiter; + + if ((pMemIter->hasVal) && p1->ekey >= pMemIter->iter->pTbData->minKey && p1->skey <= pMemIter->iter->pTbData->maxKey) { + return true; + } + + // overlap with imem data + if ((pIMemIter->hasVal) && p1->ekey >= pIMemIter->iter->pTbData->minKey && p1->skey <= pIMemIter->iter->pTbData->maxKey) { + return true; + } + + // overlap with deletion skyline + SBrinRecord record = {.firstKey = p1->skey, .lastKey = p1->ekey}; + if (overlapWithDelSkylineWithoutVer(pBlockScanInfo, &record, order)) { + return true; + } + + return false; +} + +static int32_t sortUidComparFn(const void* p1, const void* p2) { + const STimeWindow* px1 = p1; + const STimeWindow* px2 = p2; + if (px1->skey == px2->skey) { + return 0; + } else { + return px1->skey < px2->skey? -1:1; + } +} + +bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo *pScanInfo, int32_t order) { + // check if it overlap with del skyline + taosArraySort(pTimewindowList, sortUidComparFn); + + int32_t num = taosArrayGetSize(pTimewindowList); + if (num == 0) { + return false; + } + + STimeWindow* p = taosArrayGet(pTimewindowList, 0); + if (overlapWithTimeWindow(p, pQueryWindow, pScanInfo, order)) { + return false; + } + + for (int32_t i = 0; i < num - 1; ++i) { + STimeWindow* p1 = taosArrayGet(pTimewindowList, i); + STimeWindow* p2 = taosArrayGet(pTimewindowList, i + 1); + + if (p1->ekey >= p2->skey) { + return false; + } + + bool overlap = overlapWithTimeWindow(p2, pQueryWindow, pScanInfo, order); + if (overlap) { + return false; + } + } + + return true; +} + +static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, + int32_t startIndex) { + size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); + + for (int32_t i = startIndex; i < num; i += 1) { + TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); + if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) { + if (p->version >= pRecord->minVer) { + return true; + } + } else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts + if (p->version >= pRecord->minVer) { + if (i < num - 1) { + TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); + if (pnext->ts >= pRecord->firstKey) { + return true; + } + } else { // it must be the last point + ASSERT(p->version == 0); + } + } + } else { // (p->ts > pBlock->maxKey.ts) { + return false; + } + } + + return false; +} + +static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, + int32_t startIndex) { + size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); + + for (int32_t i = startIndex; i < num; i += 1) { + TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); + if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) { + return true; + } else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts + if (i < num - 1) { + TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); + if (pnext->ts >= pRecord->firstKey) { + return true; + } + } + } else { // (p->ts > pBlock->maxKey.ts) { + return false; + } + } + + return false; +} + +bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) { + if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) { + return false; + } + + // ts is not overlap + TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); + TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); + if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) { + return false; + } + + // version is not overlap + if (ASCENDING_TRAVERSE(order)) { + return doCheckDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex); + } else { + int32_t index = pBlockScanInfo->fileDelIndex; + while (1) { + TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); + if (p->ts > pRecord->firstKey && index > 0) { + index -= 1; + } else { // find the first point that is smaller than the minKey.ts of dataBlock. + if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) { + index -= 1; + } + break; + } + } + + return doCheckDatablockOverlap(pBlockScanInfo, pRecord, index); + } +} + +bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) { + if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) { + return false; + } + + // ts is not overlap + TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); + TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); + if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) { + return false; + } + + // version is not overlap + if (ASCENDING_TRAVERSE(order)) { + return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex); + } else { + int32_t index = pBlockScanInfo->fileDelIndex; + while (1) { + TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); + if (p->ts > pRecord->firstKey && index > 0) { + index -= 1; + } else { // find the first point that is smaller than the minKey.ts of dataBlock. + if (p->ts == pRecord->firstKey && index > 0) { + index -= 1; + } + break; + } + } + + return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, index); + } } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index e9c7449082..401eadee0f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -39,8 +39,7 @@ typedef enum { typedef struct STsdbReaderInfo { uint64_t suid; STSchema* pSchema; - EReadMode readMode; - uint64_t rowsNum; + EExecMode execMode; STimeWindow window; SVersionRange verRange; int16_t order; @@ -74,6 +73,11 @@ typedef struct SSttKeyInfo { int64_t nextProcKey; } SSttKeyInfo; +// clean stt file blocks: +// 1. not overlap with stt blocks in other stt files of the same fileset +// 2. not overlap with delete skyline +// 3. not overlap with in-memory data (mem/imem) +// 4. not overlap with data file blocks typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastProcKey; @@ -88,6 +92,9 @@ typedef struct STableBlockScanInfo { int32_t fileDelIndex; // file block delete index int32_t sttBlockDelIndex; // delete index for last block bool iterInit; // whether to initialize the in-memory skip list iterator or not + bool cleanSttBlocks; // stt block is clean in current fileset + int64_t numOfRowsInStt; + STimeWindow sttWindow; } STableBlockScanInfo; typedef struct SResultBlockInfo { @@ -145,6 +152,7 @@ typedef struct SBlockLoadSuppInfo { bool smaValid; // the sma on all queried columns are activated } SBlockLoadSuppInfo; +// each blocks in stt file not overlaps with in-memory/data-file/tomb-files, and not overlap with any other blocks in stt-file typedef struct SSttBlockReader { STimeWindow window; SVersionRange verRange; @@ -262,12 +270,17 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr); void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); -int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, - const uint64_t* pUidList, int32_t numOfTables); +int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, + TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, + int32_t numOfTables); + void destroyLDataIter(SLDataIter* pIter); -int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); +int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf, const char* pstr); +bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order); +bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); + typedef struct { SArray* pTombData; } STableLoadInfo; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a6673917bf..48e82700c3 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -42,7 +42,7 @@ void initStorageAPI(SStorageAPI* pAPI) { void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*, - bool, SHashObj**))tsdbReaderOpen2; + SHashObj**))tsdbReaderOpen2; pReader->tsdReaderClose = tsdbReaderClose2; pReader->tsdNextDataBlock = tsdbNextDataBlock2; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1f82a9477b..1fa911e646 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1232,7 +1232,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (pScanBaseInfo->dataReader == NULL) { int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen( pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock, - (void**)&pScanBaseInfo->dataReader, id, false, NULL); + (void**)&pScanBaseInfo->dataReader, id, NULL); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); terrno = code; @@ -1291,7 +1291,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int32_t size = tableListGetSize(pTableListInfo); pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, - (void**)&pInfo->dataReader, NULL, false, NULL); + (void**)&pInfo->dataReader, NULL, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 448c585869..813e086c55 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -889,7 +889,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, - (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables); + (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -1179,7 +1179,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; int32_t code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, - (void**)&pReader, GET_TASKID(pTaskInfo), false, NULL); + (void**)&pReader, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -3373,7 +3373,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { param->pOperator = pOperator; STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, - (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); + (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->mSkipTables); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = param; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 6bdbefc5c0..ac4b8e88c7 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2304,7 +2304,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi void* pList = tableListGetInfo(pTableListInfo, 0); code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, - (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); + (void**)&pInfo->pHandle, pTaskInfo->id.str, NULL); cleanupQueryTableDataCond(&cond); if (code != 0) { goto _error;