diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0b947bb285..161373703f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -16,6 +16,9 @@ #ifndef _TD_VNODE_TSDB_H_ #define _TD_VNODE_TSDB_H_ +//#include "../tsdb/tsdbFile2.h" +//#include "../tsdb/tsdbMerge.h" +//#include "../tsdb/tsdbSttFileRW.h" #include "tsimplehash.h" #include "vnodeInt.h" @@ -303,6 +306,9 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); + +int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); +void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(void *arg); @@ -697,6 +703,8 @@ typedef struct { typedef struct SSttBlockLoadInfo { SBlockData blockData[2]; + void* pBlockArray; + SArray *aSttBlk; int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t currentLoadBlockIndex; @@ -769,7 +777,6 @@ struct SDiskDataBuilder { typedef struct SLDataIter { SRBTreeNode node; SSttBlk *pSttBlk; - SDataFReader *pReader; int32_t iStt; int8_t backward; int32_t iSttBlk; @@ -780,13 +787,20 @@ typedef struct SLDataIter { SVersionRange verRange; SSttBlockLoadInfo *pBlockLoadInfo; bool ignoreEarlierTs; + struct SSttFileReader* pReader; } SLDataIter; #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); -void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); + +int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid, + STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet); + + + void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index ab3bbf78a2..af0923a8d9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -680,7 +680,7 @@ int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) { int32_t code = 0; STFileSet *fset, *fset1; - fsetArr[0] = taosMemoryMalloc(sizeof(*fsetArr[0])); + fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0])); if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; taosThreadRwlockRdlock(&fs->tsdb->rwLock); diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 075acd6394..d7b3c1fc8c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -15,8 +15,8 @@ #include "tsdbFile2.h" -#ifndef _TSDB_FILE_SET_H -#define _TSDB_FILE_SET_H +#ifndef _TSDB_FILE_SET2_H +#define _TSDB_FILE_SET2_H #ifdef __cplusplus extern "C" { @@ -82,4 +82,4 @@ struct STFileSet { } #endif -#endif /*_TSDB_FILE_SET_H*/ \ No newline at end of file +#endif /*_TSDB_FILE_SET2_H*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 79f4a17f65..8c79b47ce3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -14,6 +14,8 @@ */ #include "tsdb.h" +#include "tsdbFSet2.h" +#include "tsdbSttFileRW.h" // SLDataIter ================================================= SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, @@ -135,7 +137,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { goto _exit; } - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); + code = tsdbSttFileReadBlockData(pIter->pReader, pIter->pSttBlk, pBlock); +// code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); if (code != TSDB_CODE_SUCCESS) { goto _exit; } @@ -253,18 +256,24 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint } int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, + uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, + const char *idStr, bool strictTimeRange) { + return 0; +} + +int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange) { int32_t code = TSDB_CODE_SUCCESS; pIter->uid = uid; - pIter->pReader = pReader; pIter->iStt = iStt; pIter->backward = backward; pIter->verRange.minVer = pRange->minVer; pIter->verRange.maxVer = pRange->maxVer; pIter->timeWindow.skey = pTimeWindow->skey; pIter->timeWindow.ekey = pTimeWindow->ekey; + pIter->pReader = pReader; pIter->pBlockLoadInfo = pBlockLoadInfo; @@ -272,17 +281,18 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t int64_t st = taosGetTimestampUs(); pBlockLoadInfo->sttBlockLoaded = true; - code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk); + code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray); if (code) { return code; } // only apply to the child tables, ordinary tables will not incur this filter procedure. - size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); + TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray; + size_t size = pArray->size; if (size >= 1) { - SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); - SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); + SSttBlk *pStart = &pArray->data[0]; + SSttBlk *pEnd = &pArray->data[size - 1]; // all identical if (pStart->suid == pEnd->suid) { @@ -320,12 +330,12 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr); } - size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); + TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray; // find the start block - pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); + pIter->iSttBlk = binarySearchForStartBlock(pArray->data, pArray->size, uid, backward); if (pIter->iSttBlk != -1) { - pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk); + pIter->pSttBlk = &pArray->data[pIter->iSttBlk]; pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1; if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) || @@ -403,17 +413,15 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { } static void findNextValidRow(SLDataIter *pIter, const char *idStr) { - int32_t step = pIter->backward ? -1 : 1; - bool hasVal = false; + int32_t step = pIter->backward ? -1 : 1; int32_t i = pIter->iRow; - SBlockData *pBlockData = loadLastBlock(pIter, idStr); + SBlockData *pData = loadLastBlock(pIter, idStr); // mostly we only need to find the start position for a given table - if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && - pBlockData->aUid != NULL) { - i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward); + if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) { + i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward); if (i == -1) { tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr); pIter->iRow = -1; @@ -421,20 +429,20 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { } } - for (; i < pBlockData->nRow && i >= 0; i += step) { - if (pBlockData->aUid != NULL) { + for (; i < pData->nRow && i >= 0; i += step) { + if (pData->aUid != NULL) { if (!pIter->backward) { - if (pBlockData->aUid[i] > pIter->uid) { + if (pData->aUid[i] > pIter->uid) { break; } } else { - if (pBlockData->aUid[i] < pIter->uid) { + if (pData->aUid[i] < pIter->uid) { break; } } } - int64_t ts = pBlockData->aTSKEY[i]; + int64_t ts = pData->aTSKEY[i]; if (!pIter->backward) { // asc if (ts > pIter->timeWindow.ekey) { // no more data break; @@ -449,7 +457,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { } } - int64_t ver = pBlockData->aVersion[i]; + int64_t ver = pData->aVersion[i]; if (ver < pIter->verRange.minVer) { continue; } @@ -485,7 +493,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { while (1) { bool skipBlock = false; - findNextValidRow(pIter, idStr); if (pIter->pBlockLoadInfo->checkRemainingRow) { @@ -612,6 +619,62 @@ _end: return code; } +int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid, + STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet) { + int32_t code = TSDB_CODE_SUCCESS; + + pMTree->backward = backward; + pMTree->pIter = NULL; + pMTree->idStr = idStr; + + if (!pMTree->backward) { // asc + tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); + } else { // desc + tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn); + } + + pMTree->pLoadInfo = pBlockLoadInfo; + pMTree->destroyLoadInfo = destroyLoadInfo; + pMTree->ignoreEarlierTs = false; + + // todo handle other level of stt files, here only deal with the first level stt + SSttLvl* pSttLevel = ((STFileSet*)pCurrentFileSet)->lvlArr[0].data[0]; + ASSERT(pSttLevel->level == 0); + + for (int32_t i = 0; i < pSttLevel->fobjArr[0].size; ++i) { // open all last file + memset(&pLDataIter[i], 0, sizeof(SLDataIter)); + SSttFileReader* pReader = NULL; + SSttFileReaderConfig conf = {0}; + conf.tsdb = pTsdb; + conf.szPage = pTsdb->pVnode->config.szPage; + conf.file[0] = *pSttLevel->fobjArr[0].data[i]->f; + + code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pReader); + + code = tLDataIterOpen2(&pLDataIter[i], pReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, + &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + + bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr); + if (hasVal) { + tMergeTreeAddIter(pMTree, &pLDataIter[i]); + } else { + if (!pMTree->ignoreEarlierTs) { + pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs; + } + } + } + + return code; + + _end: + tMergeTreeClose(pMTree); + return code; +} + void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c69333e796..1dcf5a4b9b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -15,6 +15,9 @@ #include "osDef.h" #include "tsdb.h" +#include "tsdbDataFileRW.h" +#include "tsdbFS2.h" +#include "tsdbMerge.h" #include "tsimplehash.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -120,7 +123,8 @@ typedef struct SLastBlockReader { typedef struct SFilesetIter { int32_t numOfFiles; // number of total files int32_t index; // current accessed index in the list - SArray* pFileList; // data file list + TFileSetArray* pFilesetArray;// data file set list +// SArray* pFileList; // data file list int32_t order; SLastBlockReader* pLastBlockReader; // last file block reader } SFilesetIter; @@ -155,12 +159,12 @@ typedef struct STableUidList { typedef struct SReaderStatus { bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not - bool mapDataCleaned; // mapData has been cleaned up alreay or not +// bool mapDataCleaned; // mapData has been cleaned up alreay or not SSHashObj* pTableMap; // SHash STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks. STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT. SFileBlockDumpInfo fBlockDumpInfo; - SDFileSet* pCurrentFileset; // current opened file set + STFileSet* pCurrentFileset;// current opened file set SBlockData fileBlockData; SFilesetIter fileIter; SDataBlockIter blockIter; @@ -213,12 +217,13 @@ struct STsdbReader { SHashObj** pIgnoreTables; STSchema* pSchema; // the newest version schema SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema - SDataFReader* pFileReader; // the file reader + SDataFileReader* pFileReader; // the file reader SDelFReader* pDelFReader; // the del file reader SArray* pDelIdx; // del file block index; SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; + TFileSetArray* pfSetArray; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -530,12 +535,12 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) { } // init file iterator -static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) { - size_t numOfFileset = taosArrayGetSize(aDFileSet); +static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetArray, STsdbReader* pReader) { + size_t numOfFileset = pFileSetArray->size; pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; pIter->order = pReader->order; - pIter->pFileList = aDFileSet; + pIter->pFilesetArray = pFileSetArray; pIter->numOfFiles = numOfFileset; if (pIter->pLastBlockReader == NULL) { @@ -594,18 +599,41 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo while (1) { if (pReader->pFileReader != NULL) { - tsdbDataFReaderClose(&pReader->pFileReader); + tsdbDataFileReaderClose(&pReader->pFileReader); } - pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index); +// pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFilesetArray, pIter->index); + pReader->status.pCurrentFileset = pIter->pFilesetArray->data[pIter->index]; - code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset); - if (code != TSDB_CODE_SUCCESS) { - goto _err; + STFileObj** pFileObj = pReader->status.pCurrentFileset->farr; + if (pFileObj[0] != NULL) { + SDataFileReaderConfig conf = {.tsdb = pReader->pTsdb, .szPage = pReader->pTsdb->pVnode->config.szPage}; + + conf.files[0].file = *pFileObj[0]->f; + conf.files[0].exist = true; + + conf.files[1].file = *pFileObj[1]->f; + conf.files[1].exist = true; + + conf.files[2].file = *pFileObj[2]->f; + conf.files[2].exist = true; + + const char* filesName[4] = {pFileObj[0]->fname, pFileObj[1]->fname, pFileObj[2]->fname}; + + if (pFileObj[3] != NULL) { + conf.files[3].exist = true; + conf.files[3].file = *pFileObj[3]->f; + filesName[3] = pFileObj[3]->fname; + } + + code = tsdbDataFileReaderOpen(filesName, &conf, &pReader->pFileReader); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + pReader->cost.headFileLoad += 1; } - pReader->cost.headFileLoad += 1; - int32_t fid = pReader->status.pCurrentFileset->fid; tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); @@ -732,7 +760,7 @@ static int32_t tsdbReleaseReader(STsdbReader* pReader) { return code; } -void tsdbReleaseDataBlock(STsdbReader* pReader) { +void tsdbReleaseDataBlock2(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; if (!pStatus->composedDataBlock) { tsdbReleaseReader(pReader); @@ -834,15 +862,20 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void return code; } -static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { - int64_t st = taosGetTimestampUs(); +static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileReader, SArray* pIndexList) { + int64_t st = taosGetTimestampUs(); + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + + const TBrinBlkArray* pBlkArray = NULL; + int32_t code = tsdbDataFileReadBrinBlk(pFileReader, &pBlkArray); + LRUHandle* handle = NULL; +#if 0 int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle); if (code != TSDB_CODE_SUCCESS || handle == NULL) { goto _end; } - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle); size_t num = taosArrayGetSize(aBlockIdx); @@ -850,44 +883,45 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); return TSDB_CODE_SUCCESS; } +#endif // todo binary search to the start position int64_t et1 = taosGetTimestampUs(); - SBlockIdx* pBlockIdx = NULL; + SBrinBlk* pBrinBlk = NULL; STableUidList* pList = &pReader->status.uidList; int32_t i = 0, j = 0; - while (i < num && j < numOfTables) { - pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); - if (pBlockIdx->suid != pReader->suid) { + while (i < pBlkArray->size && j < numOfTables) { + pBrinBlk = &pBlkArray->data[i]; + if (pBrinBlk->minTbid.suid != pReader->suid) { i += 1; continue; } - if (pBlockIdx->uid < pList->tableUidList[j]) { + if (pBrinBlk->minTbid.uid < pList->tableUidList[j]) { i += 1; continue; } - if (pBlockIdx->uid > pList->tableUidList[j]) { + if (pBrinBlk->minTbid.uid > pList->tableUidList[j]) { j += 1; continue; } - if (pBlockIdx->uid == pList->tableUidList[j]) { + if (pBrinBlk->minTbid.uid == pList->tableUidList[j]) { // this block belongs to a table that is not queried. - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr); + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBrinBlk->minTbid.uid, pReader->idStr); if (pScanInfo == NULL) { - tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); +// tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); return terrno; } if (pScanInfo->pBlockList == NULL) { - pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex)); + pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord)); } - taosArrayPush(pIndexList, pBlockIdx); + taosArrayPush(pIndexList, pBrinBlk); i += 1; j += 1; @@ -896,13 +930,13 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, int64_t et2 = taosGetTimestampUs(); tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s", - numOfTables, (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, + numOfTables, (int32_t)pBlkArray->size, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, pBlkArray->size * sizeof(SBrinBlk) / 1024.0, pReader->idStr); pReader->cost.headFileLoadTime += (et1 - st) / 1000.0; _end: - tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); +// tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); return code; } @@ -913,9 +947,9 @@ static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) { } static void cleanupTableScanInfo(SReaderStatus* pStatus) { - if (pStatus->mapDataCleaned) { - return; - } +// if (pStatus->mapDataCleaned) { +// return; +// } SSHashObj* pTableMap = pStatus->pTableMap; STableBlockScanInfo** px = NULL; @@ -930,7 +964,7 @@ static void cleanupTableScanInfo(SReaderStatus* pStatus) { doCleanupTableScanInfo(*px); } - pStatus->mapDataCleaned = true; +// pStatus->mapDataCleaned = true; } static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) { @@ -941,19 +975,23 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN cleanupTableScanInfo(&pReader->status); // set the flag for the new file - pReader->status.mapDataCleaned = false; +// pReader->status.mapDataCleaned = false; for (int32_t i = 0; i < numOfTables; ++i) { - SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i); - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr); + SBrinBlk* pBlk = taosArrayGet(pIndexList, i); + + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlk->minTbid.uid, pReader->idStr); if (pScanInfo == NULL) { return terrno; } - tMapDataReset(&pScanInfo->mapData); - tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); - taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem); + SBrinBlock block = {0}; + tsdbDataFileReadBrinBlock(pReader->pFileReader, pBlk, &block); - sizeInDisk += pScanInfo->mapData.nData; +// tMapDataReset(&pScanInfo->mapData); +// tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); +// taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem); + + sizeInDisk += 0;//pScanInfo->mapData.nData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; STimeWindow w = pReader->window; @@ -967,27 +1005,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN continue; } - SDataBlk block = {0}; - for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { - tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); + for (int32_t j = 0; j < block.numRow->size; ++j) { + SBrinRecord record = {0}; + tBrinBlockGet(&block, j, &record); // 1. time range check - // if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { - if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) { + if (record.firstKey > w.ekey || record.lastKey < w.skey) { +// if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) { continue; } // 2. version range check - if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) { + if (record.firstKeyVer > pReader->verRange.maxVer || record.lastKeyVer < pReader->verRange.minVer) { continue; } - SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset}; - bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts}; +// SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = record.blockOffset}; +// bIndex.window = (STimeWindow){.skey = record.firstKey, .ekey = record.lastKey}; - void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex); + void* p1 = taosArrayPush(pScanInfo->pBlockList, &record); if (p1 == NULL) { - tMapDataClear(&pScanInfo->mapData); return TSDB_CODE_OUT_OF_MEMORY; } @@ -999,7 +1036,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } } - pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF; + pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size; int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; double el = (taosGetTimestampUs() - st) / 1000.0; @@ -1469,7 +1506,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SDataBlk* pBlock = getCurrentBlock(pBlockIter); - code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); +// code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, code:%s %s", @@ -1540,6 +1577,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v } static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { +#if 0 SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); if (pBlockInfo != NULL) { STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pBlockInfo->uid, idStr); @@ -1550,6 +1588,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); } +#endif #if 0 qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset); @@ -2564,10 +2603,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea TSDBKEY startKey = {0}; if (ASCENDING_TRAVERSE(pReader->order)) { - // startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer}; startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; } else { - // startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; } @@ -2675,9 +2712,10 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); - int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), - pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, - pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter); + int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), + pReader->pTsdb, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, + pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter, + pReader->status.pCurrentFileset); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -3058,7 +3096,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr pBlockNum->numOfLastFiles = 0; size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx)); + SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk)); while (1) { // only check here, since the iterate data in memory is very fast. @@ -3085,7 +3123,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr return code; } - if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) { + if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) { code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); @@ -3170,7 +3208,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file doCleanupTableScanInfo(pScanInfo); - pStatus->mapDataCleaned = true; +// pStatus->mapDataCleaned = true; bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { @@ -3182,7 +3220,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // reset the index in last block when handing a new file doCleanupTableScanInfo(pScanInfo); - pStatus->mapDataCleaned = true; +// pStatus->mapDataCleaned = true; bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { @@ -3367,7 +3405,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade STableBlockScanInfo* pScanInfo = *p; tMapDataReset(&pScanInfo->mapData); - tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); +// tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); SDataBlk block = {0}; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { @@ -3385,7 +3423,7 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SSttBlockLoadInfo* pBlockLoadInfo = NULL; - +#if 0 for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file pBlockLoadInfo = &pLastBlockReader->pInfo[i]; @@ -3427,6 +3465,7 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { } } } +#endif return code; } @@ -3445,7 +3484,7 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) { break; } - code = doSumFileBlockRows(pReader, pReader->pFileReader); +// code = doSumFileBlockRows(pReader, pReader->pFileReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3649,7 +3688,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { initBlockDumpInfo(pReader, pBlockIter); } else { // all data blocks in files are checked, let's check the data in last files. - ASSERT(pReader->status.pCurrentFileset->nSttF > 0); +// ASSERT(pReader->status.pCurrentFileset->nSttF > 0); // data blocks in current file are exhausted, let's try the next file now SBlockData* pBlockData = &pReader->status.fileBlockData; @@ -4146,7 +4185,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, +static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, bool* freeTSRow) { TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); @@ -4353,7 +4392,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } // TODO refactor: with createDataBlockScanInfo -int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { +int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t num) { int32_t size = tSimpleHashGetSize(pReader->status.pTableMap); STableBlockScanInfo** p = NULL; @@ -4404,27 +4443,27 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n return TDB_CODE_SUCCESS; } -void* tsdbGetIdx(SMeta* pMeta) { +void* tsdbGetIdx2(SMeta* pMeta) { if (pMeta == NULL) { return NULL; } return metaGetIdx(pMeta); } -void* tsdbGetIvtIdx(SMeta* pMeta) { +void* tsdbGetIvtIdx2(SMeta* pMeta) { if (pMeta == NULL) { return NULL; } return metaGetIvtIdx(pMeta); } -uint64_t tsdbGetReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } +uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->verRange.maxVer; } static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); + initFilesetIterator(&pStatus->fileIter, pReader->pfSetArray, pReader); resetDataBlockIterator(&pStatus->blockIter, pReader->order); int32_t code = TSDB_CODE_SUCCESS; @@ -4599,7 +4638,7 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { } } -void tsdbReaderClose(STsdbReader* pReader) { +void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; } @@ -4643,7 +4682,7 @@ void tsdbReaderClose(STsdbReader* pReader) { } if (pReader->pFileReader != NULL) { - tsdbDataFReaderClose(&pReader->pFileReader); + tsdbDataFileReaderClose(&pReader->pFileReader); } if (pReader->pDelFReader != NULL) { @@ -4699,7 +4738,7 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFreeClear(pReader); } -int32_t tsdbReaderSuspend(STsdbReader* pReader) { +int32_t tsdbReaderSuspend2(STsdbReader* pReader) { int32_t code = 0; // save reader's base state & reset top state to be reconstructed from base state @@ -4717,7 +4756,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { pBlockScanInfo = *pStatus->pTableIter; } - tsdbDataFReaderClose(&pReader->pFileReader); + tsdbDataFileReaderClose(&pReader->pFileReader); // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; @@ -4812,7 +4851,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { return code; } - tsdbReaderSuspend(pReader); + tsdbReaderSuspend2(pReader); tsdbReleaseReader(pReader); @@ -4825,7 +4864,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { } } -int32_t tsdbReaderResume(STsdbReader* pReader) { +int32_t tsdbReaderResume2(STsdbReader* pReader) { int32_t code = 0; STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter; @@ -4835,7 +4874,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) { int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (numOfTables > 0) { qTrace("tsdb/reader: %p, take snapshot", pReader); - code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); + code = tsdbTakeReadSnap2(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4900,7 +4939,7 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { return pBlock->info.rows > 0; } -static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { +static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; // cleanup the data that belongs to the previous data block @@ -4937,7 +4976,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { return code; } -int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { +int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; *hasNext = false; @@ -4952,7 +4991,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); if (pReader->flag == READER_STATUS_SUSPEND) { - code = tsdbReaderResume(pReader); + code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { tsdbReleaseReader(pReader); return code; @@ -4960,7 +4999,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { } if (pReader->innerReader[0] != NULL && pReader->step == 0) { - code = doTsdbNextDataBlock(pReader->innerReader[0], hasNext); + code = doTsdbNextDataBlock2(pReader->innerReader[0], hasNext); if (code) { tsdbReleaseReader(pReader); return code; @@ -4991,7 +5030,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { pReader->step = EXTERNAL_ROWS_MAIN; } - code = doTsdbNextDataBlock(pReader, hasNext); + code = doTsdbNextDataBlock2(pReader, hasNext); if (code != TSDB_CODE_SUCCESS) { tsdbReleaseReader(pReader); return code; @@ -5015,7 +5054,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { return code; } - code = doTsdbNextDataBlock(pReader->innerReader[1], hasNext); + code = doTsdbNextDataBlock2(pReader->innerReader[1], hasNext); if (code != TSDB_CODE_SUCCESS) { tsdbReleaseReader(pReader); return code; @@ -5079,7 +5118,7 @@ static bool doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ return hasNullSMA; } -int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool *hasNullSMA) { +int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool *hasNullSMA) { SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg; int32_t code = 0; @@ -5106,7 +5145,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (tDataBlkHasSma(pBlock)) { - code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg); +// code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg); if (code != TSDB_CODE_SUCCESS) { tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code), pReader->idStr); @@ -5218,7 +5257,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return pReader->resBlockInfo.pResBlock; } -SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { +SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { STsdbReader* pTReader = pReader; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { @@ -5241,14 +5280,14 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { return ret; } -int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { +int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t code = TSDB_CODE_SUCCESS; qTrace("tsdb/reader-reset: %p, take read mutex", pReader); tsdbAcquireReader(pReader); if (pReader->flag == READER_STATUS_SUSPEND) { - code = tsdbReaderResume(pReader); + code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { tsdbReleaseReader(pReader); return code; @@ -5274,11 +5313,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; - tsdbDataFReaderClose(&pReader->pFileReader); + tsdbDataFileReaderClose(&pReader->pFileReader); int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); - initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); + ASSERT(0); +// initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(pBlockIter, pReader->order); resetTableListIndex(&pReader->status); @@ -5323,7 +5363,7 @@ static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t num return bucketIndex; } -int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { +int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { int32_t code = TSDB_CODE_SUCCESS; pTableBlockInfo->totalSize = 0; pTableBlockInfo->totalRows = 0; @@ -5334,7 +5374,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa // find the start data block in file tsdbAcquireReader(pReader); if (pReader->flag == READER_STATUS_SUSPEND) { - code = tsdbReaderResume(pReader); + code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { tsdbReleaseReader(pReader); return code; @@ -5405,14 +5445,14 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa return code; } -int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { +int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; int64_t rows = 0; SReaderStatus* pStatus = &pReader->status; tsdbAcquireReader(pReader); if (pReader->flag == READER_STATUS_SUSPEND) { - code = tsdbReaderResume(pReader); + code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { tsdbReleaseReader(pReader); return code; @@ -5450,7 +5490,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { return rows; } -int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { +int32_t tsdbGetTableSchema2(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); @@ -5486,13 +5526,14 @@ int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_ return code; } -int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { - int32_t code = 0; - //todo add -#if 0 +int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { + int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->verRange; + // lock + taosThreadRwlockRdlock(&pTsdb->rwLock); + // alloc STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap)); if (pSnap == NULL) { @@ -5500,9 +5541,6 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsd goto _exit; } - // lock - taosThreadRwlockRdlock(&pTsdb->rwLock); - // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { pSnap->pMem = pTsdb->mem; @@ -5533,7 +5571,7 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsd } // fs - code = tsdbFSRef(pTsdb, &pSnap->fs); + code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->pfSetArray); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _exit; @@ -5555,12 +5593,11 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsd } else { *ppSnap = pSnap; } -#endif return code; } -void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) { +void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) { STsdb* pTsdb = pReader->pTsdb; if (pSnap) { @@ -5581,9 +5618,9 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti } // if failed, do nothing -void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { +void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { taosMemoryFreeClear(pReader->idStr); pReader->idStr = taosStrdup(idstr); } -void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } +void tsdbReaderSetCloseFlag2(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index d2db6368a2..67011fbc86 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -42,15 +42,15 @@ void initStorageAPI(SStorageAPI* pAPI) { void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*, - bool, SHashObj**))tsdbReaderOpen; - pReader->tsdReaderClose = tsdbReaderClose; + bool, SHashObj**))tsdbReaderOpen2; + pReader->tsdReaderClose = tsdbReaderClose2; - pReader->tsdNextDataBlock = tsdbNextDataBlock; + pReader->tsdNextDataBlock = tsdbNextDataBlock2; - pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock; - pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock; + pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock2; + pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock2; - pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA; + pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA2; pReader->tsdReaderNotifyClosing = tsdbReaderSetCloseFlag; pReader->tsdReaderResetStatus = tsdbReaderReset;