From 8215bc4de3f2af391aff991323c33b17e60253ff Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 27 Apr 2022 08:03:49 +0000 Subject: [PATCH] refact TSDB --- source/dnode/vnode/src/inc/tsdb.h | 20 +------------------- source/dnode/vnode/src/tsdb/tsdbCommit.c | 2 +- source/dnode/vnode/src/tsdb/tsdbOpen.c | 3 +-- source/dnode/vnode/src/tsdb/tsdbRead.c | 18 +++++++++--------- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 5 +++-- source/dnode/vnode/src/tsdb/tsdbWrite.c | 7 ++++--- 6 files changed, 19 insertions(+), 36 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4fc94da06a..9ac8434949 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -73,7 +73,6 @@ struct STsdb { SVnode *pVnode; bool repoLocked; TdThreadMutex mutex; - STsdbCfg config; STsdbMemTable *mem; STsdbMemTable *imem; SRtn rtn; @@ -185,7 +184,7 @@ struct STsdbFS { }; #define REPO_ID(r) TD_VID((r)->pVnode) -#define REPO_CFG(r) (&(r)->config) +#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) #define REPO_FS(r) ((r)->fs) #define REPO_META(r) ((r)->pVnode->pMeta) #define REPO_TFS(r) ((r)->pVnode->pTfs) @@ -534,23 +533,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { } } -// tsdbDBDef -// typedef struct SDBFile SDBFile; -// typedef DB_ENV* TDBEnv; - -// struct SDBFile { -// int32_t fid; -// DB* pDB; -// char* path; -// }; - -// int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF); -// void tsdbCloseDBF(SDBFile* pDBF); -// int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path); -// void tsdbCloseBDBEnv(DB_ENV* pEnv); -// int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize); -// void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize); - // tsdbFile #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 8bcb848516..c316dad32a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -55,7 +55,7 @@ typedef struct { #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) -#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRows) +#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static void tsdbStartCommit(STsdb *pRepo); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index e5b2518415..c18cfb5e86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -35,8 +35,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { pTsdb->pVnode = pVnode; pTsdb->repoLocked = false; tdbMutexInit(&pTsdb->mutex, NULL); - pTsdb->config = pVnode->config.tsdbCfg; - pTsdb->fs = tsdbNewFS(&pTsdb->config); + pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb)); // create dir (TODO: use tfsMkdir) taosMkDir(pTsdb->path); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 302ee89e51..c537a19f0b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -311,7 +311,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) { // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // the expired data to client, even it is queried already. static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { - STsdbCfg* pCfg = &pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdb); int64_t now = taosGetTimestamp(pCfg->precision); return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick @@ -404,7 +404,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pC pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); } - pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows); + pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->pVnode->config.tsdbCfg.maxRows); if (pReadHandle->pDataCols == NULL) { tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -889,7 +889,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { } static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { - STsdbCfg* pCfg = &pHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb); size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); pHandle->cur.fid = INT32_MIN; @@ -1169,7 +1169,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { SQueryFilePos* cur = &pTsdbReadHandle->cur; - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); TSKEY key; int32_t code = TSDB_CODE_SUCCESS; @@ -1754,7 +1754,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) { SQueryFilePos* cur = &pTsdbReadHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); initTableMemIterator(pTsdbReadHandle, pCheckInfo); @@ -2198,7 +2198,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi int32_t numOfBlocks = 0; int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); STimeWindow win = TSWINDOW_INITIALIZER; while (true) { @@ -2304,7 +2304,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* // find the start data block in file pTsdbReadHandle->locateStart = true; - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); @@ -2405,7 +2405,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis // find the start data block in file if (!pTsdbReadHandle->locateStart) { pTsdbReadHandle->locateStart = true; - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); @@ -2496,7 +2496,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STsdbReadHandle* pTsdbReadHandle) { int numOfRows = 0; int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); - STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; + STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); win->skey = TSKEY_INITIAL_VAL; int64_t st = taosGetTimestampUs(); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index c39780372b..6ab731ae48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -248,7 +248,8 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ASSERT(pBlock->numOfSubBlocks > 0); - int8_t update = pReadh->pRepo->config.update; + STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo); + int8_t update = pCfg->update; SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { @@ -279,7 +280,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, bool mergeBitmap) { ASSERT(pBlock->numOfSubBlocks > 0); - int8_t update = pReadh->pRepo->config.update; + int8_t update = pReadh->pRepo->pVnode->config.tsdbCfg.update; SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 82fa6b9ae5..9fb6aad472 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -60,9 +60,10 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { SSubmitBlk *pBlock = NULL; SSubmitBlkIter blkIter = {0}; STSRow *row = NULL; - TSKEY now = taosGetTimestamp(pTsdb->config.precision); - TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep2; - TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.days; + STsdbCfg *pCfg = REPO_CFG(pTsdb); + TSKEY now = taosGetTimestamp(pCfg->precision); + TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2; + TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days; terrno = TSDB_CODE_SUCCESS; pMsg->length = htonl(pMsg->length);