diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 1c4c24b09d..e23cceb3e9 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -44,6 +44,7 @@ target_sources( "src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbWrite.c" "src/tsdb/tsdbReaderWriter.c" + "src/tsdb/tsdbUtil.c" "src/tsdb/tsdbSnapshot.c" # tq diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 45d36cebd4..f4e52f2659 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -37,8 +37,6 @@ typedef struct TSDBKEY TSDBKEY; typedef struct TABLEID TABLEID; typedef struct SDelOp SDelOp; -static int tsdbKeyCmprFn(const void *p1, const void *p2); - // tsdbMemTable ============================================================================================== typedef struct STbData STbData; typedef struct SMemTable SMemTable; @@ -59,70 +57,88 @@ bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); // tsdbFile.c ============================================================================================== -typedef int32_t TSDB_FILE_T; -typedef struct SDFInfo SDFInfo; -typedef struct SDFile SDFile; -typedef struct SDFileSet SDFileSet; - -// SDFile -void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); -void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile); -int tsdbOpenDFile(SDFile *pDFile, int flags); -void tsdbCloseDFile(SDFile *pDFile); -int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence); -int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte); -void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm); -int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset); -int tsdbRemoveDFile(SDFile *pDFile); -int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte); -int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest); -int tsdbEncodeSDFile(void **buf, SDFile *pDFile); -void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile); -int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType); -int tsdbUpdateDFileHeader(SDFile *pDFile); -int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo); -int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version); - -// SDFileSet -void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver); -void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet); -int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet); -void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet); -int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet); -void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet); -int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to); -int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader); -int tsdbUpdateDFileSetHeader(SDFileSet *pSet); -int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet); -void tsdbCloseDFileSet(SDFileSet *pSet); -int tsdbOpenDFileSet(SDFileSet *pSet, int flags); -void tsdbRemoveDFileSet(SDFileSet *pSet); -int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest); -void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey); +typedef struct STsdbTombstoneFile STsdbTombstoneFile; +typedef struct STsdbCacheFile STsdbCacheFile; +typedef struct STsdbIndexFile STsdbIndexFile; +typedef struct STsdbDataFile STsdbDataFile; +typedef struct STsdbLastFile STsdbLastFile; +typedef struct STsdbSmaFile STsdbSmaFile; +typedef struct STsdbSmalFile STsdbSmalFile; +typedef struct SDFileSet SDFileSet; // tsdbFS.c ============================================================================================== -typedef struct STsdbFS STsdbFS; -typedef struct SFSIter SFSIter; -typedef struct STsdbFSMeta STsdbFSMeta; +typedef struct STsdbFS STsdbFS; -STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg); -void *tsdbFreeFS(STsdbFS *pfs); -int tsdbOpenFS(STsdb *pRepo); -void tsdbCloseFS(STsdb *pRepo); -void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); -int tsdbEndFSTxn(STsdb *pRepo); -int tsdbEndFSTxnWithError(STsdbFS *pfs); -void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); -// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); -int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); +int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); +int32_t tsdbFSClose(STsdbFS *pFS); +int32_t tsdbFSStart(STsdbFS *pFS); +int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); -void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); -void tsdbFSIterSeek(SFSIter *pIter, int fid); -SDFileSet *tsdbFSIterNext(SFSIter *pIter); -int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta); -int tsdbRLockFS(STsdbFS *pFs); -int tsdbWLockFS(STsdbFS *pFs); -int tsdbUnLockFS(STsdbFS *pFs); +// tsdbReaderWritter.c ============================================================================================== + +// tsdbCommit.c ============================================================================================== + +// old +// typedef int32_t TSDB_FILE_T; +// typedef struct SDFInfo SDFInfo; +// typedef struct SDFile SDFile; +// typedef struct SDFileSet SDFileSet; + +// // SDFile +// void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); +// void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile); +// int tsdbOpenDFile(SDFile *pDFile, int flags); +// void tsdbCloseDFile(SDFile *pDFile); +// int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence); +// int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte); +// void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm); +// int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset); +// int tsdbRemoveDFile(SDFile *pDFile); +// int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte); +// int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest); +// int tsdbEncodeSDFile(void **buf, SDFile *pDFile); +// void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile); +// int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType); +// int tsdbUpdateDFileHeader(SDFile *pDFile); +// int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo); +// int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version); + +// // SDFileSet +// void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver); +// void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet); +// int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet); +// void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet); +// int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet); +// void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet); +// int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to); +// int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader); +// int tsdbUpdateDFileSetHeader(SDFileSet *pSet); +// int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet); +// void tsdbCloseDFileSet(SDFileSet *pSet); +// int tsdbOpenDFileSet(SDFileSet *pSet, int flags); +// void tsdbRemoveDFileSet(SDFileSet *pSet); +// int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest); +// void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey); + +// typedef struct SFSIter SFSIter; +// typedef struct STsdbFSMeta STsdbFSMeta; + +// STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg); +// void *tsdbFreeFS(STsdbFS *pfs); +// int tsdbOpenFS(STsdb *pRepo); +// void tsdbCloseFS(STsdb *pRepo); +// void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); +// int tsdbEndFSTxn(STsdb *pRepo); +// int tsdbEndFSTxnWithError(STsdbFS *pfs); +// int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); + +// void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); +// void tsdbFSIterSeek(SFSIter *pIter, int fid); +// SDFileSet *tsdbFSIterNext(SFSIter *pIter); +// int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta); +// int tsdbRLockFS(STsdbFS *pFs); +// int tsdbWLockFS(STsdbFS *pFs); +// int tsdbUnLockFS(STsdbFS *pFs); // tsdbReadImpl.c ============================================================================================== typedef struct SBlockIdx SBlockIdx; @@ -135,20 +151,21 @@ typedef struct SBlockData SBlockData; typedef struct SReadH SReadH; // SReadH -int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); -void tsdbDestroyReadH(SReadH *pReadh); -int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); -void tsdbCloseAndUnsetFSet(SReadH *pReadh); -int tsdbLoadBlockIdx(SReadH *pReadh); -int tsdbSetReadTable(SReadH *pReadh, STable *pTable); -int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); -int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, - bool mergeBitmap); -int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); -int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); -void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); -void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock); +// int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); +// void tsdbDestroyReadH(SReadH *pReadh); +// int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); +// void tsdbCloseAndUnsetFSet(SReadH *pReadh); +// int tsdbLoadBlockIdx(SReadH *pReadh); +// int tsdbSetReadTable(SReadH *pReadh, STable *pTable); +// int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); +// int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); +// int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int +// numOfColsIds, +// bool mergeBitmap); +// int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); +// int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); +// void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); +// void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock); typedef struct SDFileSetReader SDFileSetReader; typedef struct SDFileSetWriter SDFileSetWriter; @@ -177,6 +194,10 @@ int32_t tsdbTomstoneFileWriterClose(STombstoneFileWriter *pWriter); int32_t tsdbTomstoneFileReaderOpen(STombstoneFileReader *pReader, STsdb *pTsdb); int32_t tsdbTomstoneFileReaderClose(STombstoneFileReader *pReader); +// tsdbUtil.c ============================================================================================== +int32_t tTABLEIDCmprFn(const void *p1, const void *p2); +int32_t tsdbKeyCmprFn(const void *p1, const void *p2); + // structs typedef struct { int minFid; @@ -209,41 +230,41 @@ struct STable { }; // int tsdbPrepareCommit(STsdb *pTsdb); -typedef enum { - TSDB_FILE_HEAD = 0, // .head - TSDB_FILE_DATA, // .data - TSDB_FILE_LAST, // .last - TSDB_FILE_SMAD, // .smad(Block-wise SMA) - TSDB_FILE_SMAL, // .smal(Block-wise SMA) - TSDB_FILE_MAX, // - TSDB_FILE_META, // meta -} E_TSDB_FILE_T; +// typedef enum { +// TSDB_FILE_HEAD = 0, // .head +// TSDB_FILE_DATA, // .data +// TSDB_FILE_LAST, // .last +// TSDB_FILE_SMAD, // .smad(Block-wise SMA) +// TSDB_FILE_SMAL, // .smal(Block-wise SMA) +// TSDB_FILE_MAX, // +// TSDB_FILE_META, // meta +// } E_TSDB_FILE_T; -struct SDFInfo { - uint32_t magic; - uint32_t fver; - uint32_t len; - uint32_t totalBlocks; - uint32_t totalSubBlocks; - uint32_t offset; - uint64_t size; - uint64_t tombSize; -}; +// struct SDFInfo { +// uint32_t magic; +// uint32_t fver; +// uint32_t len; +// uint32_t totalBlocks; +// uint32_t totalSubBlocks; +// uint32_t offset; +// uint64_t size; +// uint64_t tombSize; +// }; -struct SDFile { - SDFInfo info; - STfsFile f; - TdFilePtr pFile; - uint8_t state; -}; +// struct SDFile { +// SDFInfo info; +// STfsFile f; +// TdFilePtr pFile; +// uint8_t state; +// }; -struct SDFileSet { - int fid; - int8_t state; // -128~127 - uint8_t ver; // 0~255, DFileSet version - uint16_t reserve; - SDFile files[TSDB_FILE_MAX]; -}; +// struct SDFileSet { +// int fid; +// int8_t state; // -128~127 +// uint8_t ver; // 0~255, DFileSet version +// uint16_t reserve; +// SDFile files[TSDB_FILE_MAX]; +// }; struct TSDBKEY { int64_t version; @@ -285,105 +306,109 @@ struct SMemTable { SArray *aTbData; // SArray }; -struct STsdbFSMeta { - uint32_t version; // Commit version from 0 to increase - int64_t totalPoints; // total points - int64_t totalStorage; // Uncompressed total storage -}; +// struct STsdbFSMeta { +// uint32_t version; // Commit version from 0 to increase +// int64_t totalPoints; // total points +// int64_t totalStorage; // Uncompressed total storage +// }; // ================== -typedef struct { - STsdbFSMeta meta; // FS meta - SDFile cacheFile; // cache file - SDFile tombstone; // tomestome file - SArray *df; // data file array - SArray *sf; // sma data file array v2f1900.index_name_1 -} SFSStatus; +// typedef struct { +// STsdbFSMeta meta; // FS meta +// SDFile cacheFile; // cache file +// SDFile tombstone; // tomestome file +// SArray *df; // data file array +// SArray *sf; // sma data file array v2f1900.index_name_1 +// } SFSStatus; -struct STsdbFS { - TdThreadRwlock lock; +// struct STsdbFS { +// TdThreadRwlock lock; - SFSStatus *cstatus; // current status - bool intxn; - SFSStatus *nstatus; // new status -}; +// SFSStatus *cstatus; // current status +// bool intxn; +// SFSStatus *nstatus; // new status +// }; -#define REPO_ID(r) TD_VID((r)->pVnode) -#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) -#define REPO_KEEP_CFG(r) (&(r)->keepCfg) -#define REPO_FS(r) ((r)->fs) -#define REPO_META(r) ((r)->pVnode->pMeta) -#define REPO_TFS(r) ((r)->pVnode->pTfs) -#define IS_REPO_LOCKED(r) ((r)->repoLocked) +// #define REPO_ID(r) TD_VID((r)->pVnode) +// #define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) +// #define REPO_KEEP_CFG(r) (&(r)->keepCfg) +// #define REPO_FS(r) ((r)->fs) +// #define REPO_META(r) ((r)->pVnode->pMeta) +// #define REPO_TFS(r) ((r)->pVnode->pTfs) +// #define IS_REPO_LOCKED(r) ((r)->repoLocked) int tsdbLockRepo(STsdb *pTsdb); int tsdbUnlockRepo(STsdb *pTsdb); -static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, - int32_t version) { - if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) { - return pTable->pSchema; - } +// static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, +// int32_t version) { +// if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) { +// return pTable->pSchema; +// } - if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) { - taosMemoryFreeClear(pTable->pCacheSchema); - pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version); - } - return pTable->pCacheSchema; -} +// if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) { +// taosMemoryFreeClear(pTable->pCacheSchema); +// pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version); +// } +// return pTable->pCacheSchema; +// } -// tsdbMemTable.h -struct SMergeInfo { - int rowsInserted; - int rowsUpdated; - int rowsDeleteSucceed; - int rowsDeleteFailed; - int nOperations; - TSKEY keyFirst; - TSKEY keyLast; -}; +// // tsdbMemTable.h +// struct SMergeInfo { +// int rowsInserted; +// int rowsUpdated; +// int rowsDeleteSucceed; +// int rowsDeleteFailed; +// int nOperations; +// TSKEY keyFirst; +// TSKEY keyLast; +// }; -static void *taosTMalloc(size_t size); -static void *taosTCalloc(size_t nmemb, size_t size); -static void *taosTRealloc(void *ptr, size_t size); -static void *taosTZfree(void *ptr); -static size_t taosTSizeof(void *ptr); -static void taosTMemset(void *ptr, int c); +// static void *taosTMalloc(size_t size); +// static void *taosTCalloc(size_t nmemb, size_t size); +// static void *taosTRealloc(void *ptr, size_t size); +// static void *taosTZfree(void *ptr); +// static size_t taosTSizeof(void *ptr); +// static void taosTMemset(void *ptr, int c); struct TSDBROW { int64_t version; STSRow *pTSRow; }; -static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) { - TSDBROW row; +// static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) { +// TSDBROW row; - if (pIter == NULL) return NULL; +// if (pIter == NULL) return NULL; - if (tsdbTbDataIterGet(pIter, &row)) { - return row.pTSRow; - } +// if (tsdbTbDataIterGet(pIter, &row)) { +// return row.pTSRow; +// } - return NULL; -} +// return NULL; +// } -static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) { - STSRow *row = tsdbNextIterRow(pIter); - if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; +// static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) { +// STSRow *row = tsdbNextIterRow(pIter); +// if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; - return TD_ROW_KEY(row); -} +// return TD_ROW_KEY(row); +// } // tsdbReadImpl struct SBlockIdx { - uint64_t suid; - uint64_t uid; - uint32_t len; - uint32_t offset; - uint32_t hasLast : 2; - uint32_t numOfBlocks : 30; - TSDBKEY maxKey; + int64_t suid; + int64_t uid; + int64_t maxVersion; + int64_t minVersion; + int64_t offset; + int64_t size; + // uint32_t len; + // uint32_t offset; + // uint32_t hasLast : 2; + // uint32_t numOfBlocks : 30; + // TSDBKEY maxKey; }; typedef enum { @@ -394,22 +419,25 @@ typedef enum { #define SBlockVerLatest TSDB_SBLK_VER_0 struct SBlock { - uint8_t last : 1; - uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version) - uint8_t blkVer : 6; - uint8_t numOfSubBlocks; - col_id_t numOfCols; // not including timestamp column - uint32_t len; // data block length - uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols - uint32_t algorithm : 4; - uint32_t reserve : 8; - col_id_t numOfBSma; - uint16_t numOfRows; - int64_t offset; - uint64_t aggrStat : 1; - uint64_t aggrOffset : 63; - TSDBKEY minKey; - TSDBKEY maxKey; + TSDBKEY minKey; + TSDBKEY maxKey; + int64_t maxVersion; + int64_t minVersion; + uint8_t flags; // last, algorithm + // uint8_t last : 1; + // uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version) + // uint8_t blkVer : 6; + // uint8_t numOfSubBlocks; + // col_id_t numOfCols; // not including timestamp column + // uint32_t len; // data block length + // uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols + // uint32_t algorithm : 4; + // uint32_t reserve : 8; + // col_id_t numOfBSma; + // uint16_t numOfRows; + // int64_t offset; + // uint64_t aggrStat : 1; + // uint64_t aggrOffset : 63; }; struct SBlockInfo { @@ -446,128 +474,128 @@ struct SBlockData { typedef void SAggrBlkData; // SBlockCol cols[]; -struct SReadH { - STsdb *pRepo; - SDFileSet rSet; // FSET to read - SArray *aBlkIdx; // SBlockIdx array - STable *pTable; // table to read - SBlockIdx *pBlkIdx; // current reading table SBlockIdx - int cidx; - SBlockInfo *pBlkInfo; - SBlockData *pBlkData; // Block info - SAggrBlkData *pAggrBlkData; // Aggregate Block info - SDataCols *pDCols[2]; - void *pBuf; // buffer - void *pCBuf; // compression buffer - void *pExBuf; // extra buffer -}; +// struct SReadH { +// STsdb *pRepo; +// SDFileSet rSet; // FSET to read +// SArray *aBlkIdx; // SBlockIdx array +// STable *pTable; // table to read +// SBlockIdx *pBlkIdx; // current reading table SBlockIdx +// int cidx; +// SBlockInfo *pBlkInfo; +// SBlockData *pBlkData; // Block info +// SAggrBlkData *pAggrBlkData; // Aggregate Block info +// SDataCols *pDCols[2]; +// void *pBuf; // buffer +// void *pCBuf; // compression buffer +// void *pExBuf; // extra buffer +// }; -#define TSDB_READ_REPO(rh) ((rh)->pRepo) -#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) -#define TSDB_READ_FSET(rh) (&((rh)->rSet)) -#define TSDB_READ_TABLE(rh) ((rh)->pTable) -#define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid) -#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) -#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) -#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) -#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD) -#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL) -#define TSDB_READ_BUF(rh) ((rh)->pBuf) -#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) -#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) +// #define TSDB_READ_REPO(rh) ((rh)->pRepo) +// #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) +// #define TSDB_READ_FSET(rh) (&((rh)->rSet)) +// #define TSDB_READ_TABLE(rh) ((rh)->pTable) +// #define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid) +// #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) +// #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) +// #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) +// #define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD) +// #define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL) +// #define TSDB_READ_BUF(rh) ((rh)->pBuf) +// #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) +// #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) -#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) +// #define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) -static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { - switch (blkVer) { - case TSDB_SBLK_VER_0: - default: - return TSDB_BLOCK_STATIS_SIZE(nCols, 0); - } -} +// static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { +// switch (blkVer) { +// case TSDB_SBLK_VER_0: +// default: +// return TSDB_BLOCK_STATIS_SIZE(nCols, 0); +// } +// } -#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM)) +// #define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM)) -static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { - switch (blkVer) { - case TSDB_SBLK_VER_0: - default: - return TSDB_BLOCK_AGGR_SIZE(nCols, 0); - } -} +// static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { +// switch (blkVer) { +// case TSDB_SBLK_VER_0: +// default: +// return TSDB_BLOCK_AGGR_SIZE(nCols, 0); +// } +// } -static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { - void *pBuf = *ppBuf; - size_t tsize = taosTSizeof(pBuf); +// static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { +// void *pBuf = *ppBuf; +// size_t tsize = taosTSizeof(pBuf); - if (tsize < size) { - if (tsize == 0) tsize = 1024; +// if (tsize < size) { +// if (tsize == 0) tsize = 1024; - while (tsize < size) { - tsize *= 2; - } +// while (tsize < size) { +// tsize *= 2; +// } - *ppBuf = taosTRealloc(pBuf, tsize); - if (*ppBuf == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } +// *ppBuf = taosTRealloc(pBuf, tsize); +// if (*ppBuf == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// return -1; +// } +// } - return 0; -} +// return 0; +// } -// tsdbMemory -static FORCE_INLINE void *taosTMalloc(size_t size) { - if (size <= 0) return NULL; +// // tsdbMemory +// static FORCE_INLINE void *taosTMalloc(size_t size) { +// if (size <= 0) return NULL; - void *ret = taosMemoryMalloc(size + sizeof(size_t)); - if (ret == NULL) return NULL; +// void *ret = taosMemoryMalloc(size + sizeof(size_t)); +// if (ret == NULL) return NULL; - *(size_t *)ret = size; +// *(size_t *)ret = size; - return (void *)((char *)ret + sizeof(size_t)); -} +// return (void *)((char *)ret + sizeof(size_t)); +// } -static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { - size_t tsize = nmemb * size; - void *ret = taosTMalloc(tsize); - if (ret == NULL) return NULL; +// static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { +// size_t tsize = nmemb * size; +// void *ret = taosTMalloc(tsize); +// if (ret == NULL) return NULL; - taosTMemset(ret, 0); - return ret; -} +// taosTMemset(ret, 0); +// return ret; +// } -static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } +// static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } -static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } +// static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } -static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) { - if (ptr == NULL) return taosTMalloc(size); +// static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) { +// if (ptr == NULL) return taosTMalloc(size); - if (size <= taosTSizeof(ptr)) return ptr; +// if (size <= taosTSizeof(ptr)) return ptr; - void *tptr = (void *)((char *)ptr - sizeof(size_t)); - size_t tsize = size + sizeof(size_t); - void *tptr1 = taosMemoryRealloc(tptr, tsize); - if (tptr1 == NULL) return NULL; - tptr = tptr1; +// void *tptr = (void *)((char *)ptr - sizeof(size_t)); +// size_t tsize = size + sizeof(size_t); +// void *tptr1 = taosMemoryRealloc(tptr, tsize); +// if (tptr1 == NULL) return NULL; +// tptr = tptr1; - *(size_t *)tptr = size; +// *(size_t *)tptr = size; - return (void *)((char *)tptr + sizeof(size_t)); -} +// return (void *)((char *)tptr + sizeof(size_t)); +// } -static FORCE_INLINE void *taosTZfree(void *ptr) { - if (ptr) { - taosMemoryFree((void *)((char *)ptr - sizeof(size_t))); - } - return NULL; -} +// static FORCE_INLINE void *taosTZfree(void *ptr) { +// if (ptr) { +// taosMemoryFree((void *)((char *)ptr - sizeof(size_t))); +// } +// return NULL; +// } // tsdbCommit -void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); +// void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) { if (key < 0) { @@ -590,107 +618,106 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { } // tsdbFile -#define TSDB_FILE_HEAD_SIZE 512 -#define TSDB_FILE_DELIMITER 0xF00AFA0F -#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF -#define TSDB_IVLD_FID INT_MIN -#define TSDB_FILE_STATE_OK 0 -#define TSDB_FILE_STATE_BAD 1 +// #define TSDB_FILE_HEAD_SIZE 512 +// #define TSDB_FILE_DELIMITER 0xF00AFA0F +// #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF +// #define TSDB_IVLD_FID INT_MIN +// #define TSDB_FILE_STATE_OK 0 +// #define TSDB_FILE_STATE_BAD 1 -#define TSDB_FILE_F(tf) (&((tf)->f)) -#define TSDB_FILE_PFILE(tf) ((tf)->pFile) -#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname) -#define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL) -#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) -#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL) -#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level) -#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id) -#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did) -#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname) -#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname) -#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf)) -#define TSDB_FILE_STATE(tf) ((tf)->state) -#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) -#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) -#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) +// #define TSDB_FILE_F(tf) (&((tf)->f)) +// #define TSDB_FILE_PFILE(tf) ((tf)->pFile) +// #define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname) +// #define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL) +// #define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) +// #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL) +// #define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level) +// #define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id) +// #define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did) +// #define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname) +// #define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname) +// #define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf)) +// #define TSDB_FILE_STATE(tf) ((tf)->state) +// #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) +// #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) +// #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) -typedef enum { - TSDB_FS_VER_0 = 0, - TSDB_FS_VER_MAX, -} ETsdbFsVer; +// typedef enum { +// TSDB_FS_VER_0 = 0, +// TSDB_FS_VER_MAX, +// } ETsdbFsVer; -#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile -#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file +// #define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile +// #define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file -static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile - switch (fType) { - case TSDB_FILE_HEAD: // .head - case TSDB_FILE_DATA: // .data - case TSDB_FILE_LAST: // .last - case TSDB_FILE_SMAD: // .smad(Block-wise SMA) - case TSDB_FILE_SMAL: // .smal(Block-wise SMA) - default: - return TSDB_LATEST_FVER; - } -} +// static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile +// switch (fType) { +// case TSDB_FILE_HEAD: // .head +// case TSDB_FILE_DATA: // .data +// case TSDB_FILE_LAST: // .last +// case TSDB_FILE_SMAD: // .smad(Block-wise SMA) +// case TSDB_FILE_SMAL: // .smal(Block-wise SMA) +// default: +// return TSDB_LATEST_FVER; +// } +// } // =============== SDFileSet -#define TSDB_LATEST_FSET_VER 0 -#define TSDB_FSET_FID(s) ((s)->fid) -#define TSDB_FSET_STATE(s) ((s)->state) -#define TSDB_FSET_VER(s) ((s)->ver) -#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) -#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) -#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) -#define TSDB_FSET_SET_CLOSED(s) \ - do { \ - for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ - TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ - } \ - } while (0); -#define TSDB_FSET_FSYNC(s) \ - do { \ - for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ - TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ - } \ - } while (0); +// #define TSDB_LATEST_FSET_VER 0 +// #define TSDB_FSET_FID(s) ((s)->fid) +// #define TSDB_FSET_STATE(s) ((s)->state) +// #define TSDB_FSET_VER(s) ((s)->ver) +// #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) +// #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) +// #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) +// #define TSDB_FSET_SET_CLOSED(s) \ +// do { \ +// for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ +// TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ +// } \ +// } while (0); +// #define TSDB_FSET_FSYNC(s) \ +// do { \ +// for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ +// TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ +// } \ +// } while (0); -static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { - return false; - } - } +// static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) { +// for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { +// if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { +// return false; +// } +// } - return true; -} +// return true; +// } -// tsdbFS // ================== TSDB global config extern bool tsdbForceKeepFile; // ================== CURRENT file header info -typedef struct { - uint32_t version; // Current file system version (relating to code) - uint32_t len; // Encode content length (including checksum) -} SFSHeader; +// typedef struct { +// uint32_t version; // Current file system version (relating to code) +// uint32_t len; // Encode content length (including checksum) +// } SFSHeader; -// ================== TSDB File System Meta -#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) -#define FS_NEW_STATUS(pfs) ((pfs)->nstatus) -#define FS_IN_TXN(pfs) (pfs)->intxn -#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version) -#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) +// // ================== TSDB File System Meta +// #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) +// #define FS_NEW_STATUS(pfs) ((pfs)->nstatus) +// #define FS_IN_TXN(pfs) (pfs)->intxn +// #define FS_VERSION(pfs) ((pfs)->cstatus->meta.version) +// #define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) -struct SFSIter { - int direction; - uint64_t version; // current FS version - STsdbFS *pfs; - int index; // used to position next fset when version the same - int fid; // used to seek when version is changed - SDFileSet *pSet; -}; +// struct SFSIter { +// int direction; +// uint64_t version; // current FS version +// STsdbFS *pfs; +// int index; // used to position next fset when version the same +// int fid; // used to seek when version is changed +// SDFileSet *pSet; +// }; #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC @@ -715,25 +742,6 @@ typedef struct { TSKEY eKey; } SDelInfo; -static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) { - TSDBKEY *pKey1 = (TSDBKEY *)p1; - TSDBKEY *pKey2 = (TSDBKEY *)p2; - - if (pKey1->ts < pKey2->ts) { - return -1; - } else if (pKey1->ts > pKey2->ts) { - return 1; - } - - if (pKey1->version < pKey2->version) { - return -1; - } else if (pKey1->version > pKey2->version) { - return 1; - } - - return 0; -} - struct STbDataIter { STbData *pTbData; int8_t backward; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 92b7d13c80..f90cf6ea7d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -15,58 +15,51 @@ #include "tsdb.h" -typedef struct SCommitIter SCommitIter; -typedef struct SCommitter SCommitter; +typedef struct SCommitter SCommitter; struct SCommitter { - SRtn rtn; // retention snapshot - SFSIter fsIter; // tsdb file iterator - int niters; // memory iterators - SCommitIter *iters; - bool isRFileSet; // read and commit FSET - int32_t fid; - SDFileSet *pSet; - SReadH readh; - SDFileSet wSet; - bool isDFileSame; - bool isLFileSame; - TSKEY minKey; - TSKEY maxKey; - SArray *aBlkIdx; // SBlockIdx array - STable *pTable; - SArray *aSupBlk; // Table super-block array - SArray *aSubBlk; // table sub-block array - SDataCols *pDataCols; + STsdb *pTsdb; + int32_t minutes; + int8_t precision; + TSKEY nextCommitKey; + // commit file data + int32_t commitFid; + TSKEY minKey; + TSKEY maxKey; + SDFileSetReader *pReader; + SDFileSetWriter *pWriter; + SArray *aOBlockIdx; + SArray *aBlockIdx; + // commit table data + STbData *pTbData; + SBlockIdx *pBlockIdx; }; -static int32_t tsdbCommitData(SCommitter *pCommith); -static int32_t tsdbCommitDel(SCommitter *pCommith); -static int32_t tsdbCommitCache(SCommitter *pCommith); -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle); -static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno); +static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); +static int32_t tsdbCommitData(SCommitter *pCommitter); +static int32_t tsdbCommitDel(SCommitter *pCommitter); +static int32_t tsdbCommitCache(SCommitter *pCommitter); +static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); int32_t tsdbBegin(STsdb *pTsdb) { - if (!pTsdb) return 0; + int32_t code = 0; - SMemTable *pMem; - - if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { - return -1; + code = tsdbMemTableCreate(pTsdb, &pTsdb->mem); + if (code) { + goto _err; } - return 0; + return code; + +_err: + return code; } int32_t tsdbCommit(STsdb *pTsdb) { int32_t code = 0; SCommitter commith = {0}; - SDFileSet *pSet = NULL; int fid; - ASSERT(pTsdb->imem == NULL && pTsdb->mem); - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - // start commit code = tsdbStartCommit(pTsdb, &commith); if (code) { @@ -102,11 +95,348 @@ _err: return code; } -#ifdef USE_NEW // ===================================== NEW ====================================== +static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { + int32_t code = 0; -#else -// ===================================== OLD ====================================== + ASSERT(pTsdb->mem && pTsdb->imem == NULL); + // lock(); + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; + // unlock(); + + pCommitter->pTsdb = pTsdb; + + return code; +} + +static int32_t tsdbCommitDataStart(SCommitter *pCommitter); +static int32_t tsdbCommitDataImpl(SCommitter *pCommitter); +static int32_t tsdbCommitDataEnd(SCommitter *pCommitter); + +static int32_t tsdbCommitData(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + + // no data, just return + if (pMemTable->nRow == 0) { + tsdbDebug("vgId:%d no data to commit", TD_VID(pTsdb->pVnode)); + goto _exit; + } + + // start + code = tsdbCommitDataStart(pCommitter); + if (code) { + goto _err; + } + + // commit + code = tsdbCommitDataImpl(pCommitter); + if (code) { + goto _err; + } + + // end + code = tsdbCommitDataEnd(pCommitter); + if (code) { + goto _err; + } + +_exit: + tsdbDebug("vgId:%d commit TSDB data, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow); + return code; + +_err: + tsdbError("vgId:%d failed to commit TSDB data since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitDel(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + + if (pMemTable->nDelOp == 0) { + goto _exit; + } + + // start + code = tsdbCommitDelStart(pCommitter); + if (code) { + goto _err; + } + + // impl + code = tsdbCommitDelImpl(pCommitter); + if (code) { + goto _err; + } + + // end + code = tsdbCommitDelEnd(pCommitter); + if (code) { + goto _err; + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbCommitCache(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + + pCommitter->nextCommitKey = pMemTable->minKey.ts; + + return code; +} + +static int32_t tsdbCommitFileData(SCommitter *pCommitter); + +static int32_t tsdbCommitDataImpl(SCommitter *pCommitter) { + int32_t code = 0; + + for (;;) { + if (pCommitter->nextCommitKey == TSKEY_MAX) break; + + pCommitter->commitFid = TSDB_KEY_FID(pCommitter->nextCommitKey, pCommitter->minutes, pCommitter->precision); + code = tsdbCommitFileData(pCommitter); + if (code) { + goto _err; + } + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbCommitDataEnd(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter); +static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); +static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter); + +static int32_t tsdbCommitFileData(SCommitter *pCommitter) { + int32_t code = 0; + + // commit file data start + code = tsdbCommitFileDataStart(pCommitter); + if (code) { + goto _err; + } + + // commit file data impl + code = tsdbCommitFileDataImpl(pCommitter); + if (code) { + goto _err; + } + + // commit file data end + code = tsdbCommitFileDataEnd(pCommitter); + if (code) { + goto _err; + } + + return code; + +_err: + return code; +} + +static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SDFileSet *pRSet = NULL; + SDFileSet *pWSet = NULL; + + taosArrayClear(pCommitter->aOBlockIdx); + taosArrayClear(pCommitter->aBlockIdx); + + // search pRSet (todo) + + // open file to read + if (pRSet) { + code = tsdbDFileSetReaderOpen(pCommitter->pReader, pTsdb, pRSet); + if (code) goto _err; + + // create/open the pWSet (todo) + } else { + // create the pWSet (todo) + } + + // open file for write + code = tsdbDFileSetWriterOpen(pCommitter->pWriter, pTsdb, pWSet); + if (code) goto _err; + + // read the SBlockIdx part for merge purpose + code = tsdbLoadSBlockIdx(pCommitter->pReader, pCommitter->aOBlockIdx); + if (code) goto _err; + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbCommitTableData(SCommitter *pCommitter); + +static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + int32_t iBlockIdx = 0; + int32_t nBlockIdx = taosArrayGetSize(pCommitter->aOBlockIdx); + int32_t iTbData = 0; + int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); + SBlockIdx *pBlockIdx; + STbData *pTbData; + + while (iTbData < nTbData || iBlockIdx < nBlockIdx) { + pTbData = NULL; + pBlockIdx = NULL; + + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } + if (iBlockIdx < nBlockIdx) { + pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->aOBlockIdx, iBlockIdx); + } + + if (pTbData && pBlockIdx) { + int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx); + if (c == 0) { + iTbData++; + iBlockIdx++; + } else if (c < 0) { + iTbData++; + pBlockIdx = NULL; + } else { + iBlockIdx++; + pTbData = NULL; + } + } else { + if (pTbData) { + iTbData++; + } else { + iBlockIdx++; + } + } + + pCommitter->pTbData = pTbData; + pCommitter->pBlockIdx = pBlockIdx; + code = tsdbCommitTableData(pCommitter); + if (code) { + goto _err; + } + } + + return code; + +_err: + return code; +} + +static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter); +static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter); +static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter); + +static int32_t tsdbCommitTableData(SCommitter *pCommitter) { + int32_t code = 0; + + // start + code = tsdbCommitTableDataStart(pCommitter); + if (code) { + goto _err; + } + + // impl + code = tsdbCommitTableDataImpl(pCommitter); + if (code) { + goto _err; + } + + // end + code = tsdbCommitTableDataEnd(pCommitter); + if (code) { + goto _err; + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) { + int32_t code = 0; + // TODO + return code; +} + +// // ===================================== OLD ====================================== +#if 0 struct SCommitIter { STable *pTable; STbDataIter *pIter; @@ -146,8 +476,9 @@ static int tsdbWriteBlockInfo(SCommitter *pCommih); static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx); static int tsdbMoveBlock(SCommitter *pCommith, int bidx); -static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); -static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, +static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int +nSubBlocks); static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY +keyLimit, bool isLastOneBlock); static void tsdbResetCommitTable(SCommitter *pCommith); static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError); @@ -273,28 +604,14 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { return 0; } -void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { - STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); - TSKEY minKey, midKey, maxKey, now; - - now = taosGetTimestamp(pCfg->precision); - minKey = now - pCfg->keep2 * tsTickPerMin[pCfg->precision]; - midKey = now - pCfg->keep1 * tsTickPerMin[pCfg->precision]; - maxKey = now - pCfg->keep0 * tsTickPerMin[pCfg->precision]; - - pRtn->minKey = minKey; - pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision)); - pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision)); - pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision)); - tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, - pRtn->minFid, pRtn->midFid, pRtn->maxFid); -} - static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle) { int32_t code = 0; tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb)); + ASSERT(pTsdb->imem == NULL && pTsdb->mem); + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; if (tsdbInitCommitH(pCHandle, pTsdb) < 0) { return -1; } @@ -443,7 +760,8 @@ static int32_t tsdbCommitToFileEnd(SCommitter *pCommith) { int32_t code = 0; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < + if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) + < 0) { tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), pCommith->fid, tstrerror(terrno)); @@ -499,7 +817,8 @@ static int32_t tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid) break; } - if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) { + if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) + { if (tsdbCommitToTable(pCommith, mIter) < 0) { tsdbCloseCommitFile(pCommith, true); // revert the file change @@ -701,7 +1020,8 @@ static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int f pCommith->isLFileSame = false; if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), + TSDB_FILE_FULL_NAME(pWLastf), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); @@ -722,7 +1042,8 @@ static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int f tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD); if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), + tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), + TSDB_FILE_FULL_NAME(pWSmadF), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); @@ -769,7 +1090,8 @@ static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int f tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL); if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), + tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), + TSDB_FILE_FULL_NAME(pWSmalF), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); @@ -1084,7 +1406,8 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { * @return int */ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, - SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { + SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) + { STsdbCfg *pCfg = REPO_CFG(pRepo); SBlockData *pBlockData = NULL; SAggrBlkData *pAggrBlkData = NULL; @@ -1132,7 +1455,8 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); #endif - (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, rowsToWrite, + (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, + rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), &(pAggrBlkCol->numOfNull)); @@ -1314,7 +1638,8 @@ static int tsdbWriteBlockInfo(SCommitter *pCommih) { SBlockIdx blkIdx; STable *pTable = TSDB_COMMIT_TABLE(pCommih); - if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), + if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void + **)(&(TSDB_COMMIT_BUF(pCommih))), &blkIdx) < 0) { return -1; } @@ -1430,7 +1755,8 @@ static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx) if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; } else { if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; + if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return + -1; } return 0; @@ -1483,7 +1809,8 @@ static int tsdbMoveBlock(SCommitter *pCommith, int bidx) { return 0; } -static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { +static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int +nSubBlocks) { if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -1508,7 +1835,8 @@ static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCol int biter = 0; while (true) { - tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, + tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, + pCommith->pDataCols, keyLimit, defaultRows, pCfg->update); if (pCommith->pDataCols->numOfRows == 0) break; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index f1941a3bad..9924f5f759 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -15,6 +15,41 @@ #include "tsdb.h" +struct STsdbFS { + STsdb *pTsdb; + TdThreadRwlock lock; + int64_t minVersion; + int64_t maxVersion; + STsdbTombstoneFile *pTombstoneF; + STsdbCacheFile *pCacheF; + SArray *pArray; +}; + +int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbFSClose(STsdbFS *pFS) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbFSStart(STsdbFS *pFS) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback) { + int32_t code = 0; + // TODO + return code; +} + +#if 0 extern const char *TSDB_LEVEL_DNAME[]; typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; @@ -300,8 +335,6 @@ void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) { pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd; } -void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } - int tsdbEndFSTxn(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); ASSERT(FS_IN_TXN(pfs)); @@ -334,8 +367,6 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) { return 0; } -// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); } - int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { @@ -1060,4 +1091,22 @@ int tsdbUnLockFS(STsdbFS *pFs) { return -1; } return 0; -} \ No newline at end of file +} + +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { + STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); + TSKEY minKey, midKey, maxKey, now; + + now = taosGetTimestamp(pCfg->precision); + minKey = now - pCfg->keep2 * tsTickPerMin[pCfg->precision]; + midKey = now - pCfg->keep1 * tsTickPerMin[pCfg->precision]; + maxKey = now - pCfg->keep0 * tsTickPerMin[pCfg->precision]; + + pRtn->minKey = minKey; + pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision)); + pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision)); + pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision)); + tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, + pRtn->minFid, pRtn->midFid, pRtn->maxFid); +} +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 11d206dc35..61d8c820c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -15,6 +15,45 @@ #include "tsdb.h" +static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""}; + +// .tombstone +struct STsdbTombstoneFile { + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; +}; + +struct STsdbIndexFile { + int64_t size; + int64_t offset; + int32_t nRef; +}; + +struct STsdbDataFile { + int64_t size; + int32_t nRef; +}; + +struct STsdbLastFile { + int64_t size; + int32_t nRef; +}; + +struct STsdbSmaFile { + int64_t size; + int32_t nRef; +}; + +struct SDFileSet { + STsdbIndexFile *pIndexF; + STsdbDataFile *pDataF; + STsdbLastFile *pLastF; + STsdbSmaFile *pSmaF; +}; + +#if 0 static const char *TSDB_FNAME_SUFFIX[] = { "head", // TSDB_FILE_HEAD "data", // TSDB_FILE_DATA @@ -579,4 +618,5 @@ int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) { void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) { *minKey = fid * days * tsTickPerMin[precision]; *maxKey = *minKey + days * tsTickPerMin[precision] - 1; -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 943263e1a3..798433daad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -17,7 +17,6 @@ static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg); - // implementation static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg) { @@ -64,13 +63,13 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee } else { memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg)); } - pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb)); + // pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb)); // create dir (TODO: use tfsMkdir) taosMkDir(pTsdb->path); // open tsdb - if (tsdbOpenFS(pTsdb) < 0) { + if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) { goto _err; } @@ -89,8 +88,8 @@ int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { // TODO: destroy mem/imem taosThreadMutexDestroy(&(*pTsdb)->mutex); - tsdbCloseFS(*pTsdb); - tsdbFreeFS((*pTsdb)->fs); + tsdbFSClose((*pTsdb)->fs); + // tsdbFreeFS((*pTsdb)->fs); taosMemoryFreeClear(*pTsdb); } return 0; @@ -99,7 +98,7 @@ int tsdbClose(STsdb **pTsdb) { int tsdbLockRepo(STsdb *pTsdb) { int code = taosThreadMutexLock(&pTsdb->mutex); if (code != 0) { - tsdbError("vgId:%d, failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); + tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno)); terrno = TAOS_SYSTEM_ERROR(code); return -1; } @@ -108,11 +107,11 @@ int tsdbLockRepo(STsdb *pTsdb) { } int tsdbUnlockRepo(STsdb *pTsdb) { - ASSERT(IS_REPO_LOCKED(pTsdb)); + // ASSERT(IS_REPO_LOCKED(pTsdb)); pTsdb->repoLocked = false; int code = taosThreadMutexUnlock(&pTsdb->mutex); if (code != 0) { - tsdbError("vgId:%d, failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); + tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno)); terrno = TAOS_SYSTEM_ERROR(code); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4a433a557b..79563048c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -14,6 +14,8 @@ */ #include "tsdb.h" + +#if 0 #include "vnode.h" #define EXTRA_BYTES 2 @@ -3725,3 +3727,5 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { taosMemoryFreeClear(pTsdbReadHandle); } + +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 1c2514d46f..a4a5d57511 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -15,6 +15,7 @@ #include "tsdb.h" +#if 0 #define TSDB_KEY_COL_OFFSET 0 static void tsdbResetReadTable(SReadH *pReadh); @@ -975,3 +976,4 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc return 0; } +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index d7f7835e2e..dd6321bcee 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -17,22 +17,20 @@ // SDFileSetWritter ==================================================== struct SDFileSetWritter { - STsdb *pTsdb; - SDFileSet wSet; - int32_t szBuf1; - uint8_t *pBuf1; - int32_t szBuf2; - uint8_t *pBuf2; + STsdb *pTsdb; + int32_t szBuf1; + uint8_t *pBuf1; + int32_t szBuf2; + uint8_t *pBuf2; }; // SDFileSetReader ==================================================== struct SDFileSetReader { - STsdb *pTsdb; - SDFileSet rSet; - int32_t szBuf1; - uint8_t *pBuf1; - int32_t szBuf2; - uint8_t *pBuf2; + STsdb *pTsdb; + int32_t szBuf1; + uint8_t *pBuf1; + int32_t szBuf2; + uint8_t *pBuf2; }; int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet) { @@ -40,12 +38,6 @@ int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet memset(pReader, 0, sizeof(*pReader)); pReader->pTsdb = pTsdb; - pReader->rSet = *pSet; - - code = tsdbOpenDFileSet(&pReader->rSet, TD_FILE_READ); - if (code) { - goto _err; - } return code; @@ -59,7 +51,6 @@ int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader) { taosMemoryFreeClear(pReader->pBuf1); taosMemoryFreeClear(pReader->pBuf2); - tsdbCloseDFileSet(&pReader->rSet); return code; } @@ -84,12 +75,10 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta // STombstoneFileWriter ==================================================== struct STombstoneFileWriter { - STsdb *pTsdb; - SDFile *pTombstoneF; + STsdb *pTsdb; }; // STombstoneFileReader ==================================================== struct STombstoneFileReader { - STsdb *pTsdb; - SDFile *pTombstoneF; + STsdb *pTsdb; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c new file mode 100644 index 0000000000..238cf431c3 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" + +int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { + TABLEID *pId1 = (TABLEID *)p1; + TABLEID *pId2 = (TABLEID *)p2; + + if (pId1->suid < pId2->suid) { + return -1; + } else if (pId1->suid > pId2->suid) { + return 1; + } + + if (pId1->uid < pId2->uid) { + return -1; + } else if (pId1->uid > pId2->uid) { + return 1; + } + + return 0; +} + +int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { + TSDBKEY *pKey1 = (TSDBKEY *)p1; + TSDBKEY *pKey2 = (TSDBKEY *)p2; + + if (pKey1->ts < pKey2->ts) { + return -1; + } else if (pKey1->ts > pKey2->ts) { + return 1; + } + + if (pKey1->version < pKey2->version) { + return -1; + } else if (pKey1->version > pKey2->version) { + return 1; + } + + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index e184763bc8..a221bc1795 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -28,7 +28,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * // scan and convert if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { - tsdbError("vgId:%d, failed to insert data since %s", REPO_ID(pTsdb), tstrerror(terrno)); + tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno)); } return -1; } @@ -77,7 +77,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro if (rowKey < minKey || rowKey > maxKey) { tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 " maxKey %" PRId64 " row key %" PRId64, - REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey); + TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey); terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; return -1; } @@ -92,7 +92,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { SSubmitBlk *pBlock = NULL; SSubmitBlkIter blkIter = {0}; STSRow *row = NULL; - STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb); + STsdbKeepCfg *pCfg = &pTsdb->keepCfg; TSKEY now = taosGetTimestamp(pCfg->precision); TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days;