From 1e9ce7c543eb9941edee3edf94a0e3b17014c20d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 11 Jun 2022 03:59:06 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 488 +----------------- source/dnode/vnode/src/tsdb/tsdbCommit.c | 8 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 144 +++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 40 +- 4 files changed, 192 insertions(+), 488 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5cc17088f1..d5e715068f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -75,8 +75,10 @@ int32_t tsdbFSStart(STsdbFS *pFS); int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); // tsdbReaderWriter.c ============================================================================================== -typedef struct SDelData SDelData; -typedef struct SDelIdx SDelIdx; +typedef struct SDelDataItem SDelDataItem; +typedef struct SDelData SDelData; +typedef struct SDelIdxItem SDelIdxItem; +typedef struct SDelIdx SDelIdx; // SDataFWriter typedef struct SDataFWriter SDataFWriter; @@ -87,15 +89,15 @@ typedef struct SDataFReader SDataFReader; // SDelFWriter typedef struct SDelFWriter SDelFWriter; -int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile); +int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter *pWriter); -int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf); +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, SDelIdxItem *pItem); int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf); // SDelFReader typedef struct SDelFReader SDelFReader; -int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile); +int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf); int32_t tsdbDelFReaderClose(SDelFReader *pReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf); int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf); @@ -108,68 +110,6 @@ typedef struct SCacheFReader SCacheFReader; // tsdbCommit.c ============================================================================================== -// old -// typedef int32_t TSDB_FILE_T; -// typedef struct SDFInfo SDFInfo; -// typedef struct SDFile SDFile; -// typedef struct SDFileSet SDFileSet; - -// // SDFile -// void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); -// void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile); -// int tsdbOpenDFile(SDFile *pDFile, int flags); -// void tsdbCloseDFile(SDFile *pDFile); -// int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence); -// int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte); -// void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm); -// int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset); -// int tsdbRemoveDFile(SDFile *pDFile); -// int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte); -// int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest); -// int tsdbEncodeSDFile(void **buf, SDFile *pDFile); -// void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile); -// int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType); -// int tsdbUpdateDFileHeader(SDFile *pDFile); -// int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo); -// int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version); - -// // SDFileSet -// void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver); -// void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet); -// int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet); -// void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet); -// int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet); -// void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet); -// int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to); -// int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader); -// int tsdbUpdateDFileSetHeader(SDFileSet *pSet); -// int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet); -// void tsdbCloseDFileSet(SDFileSet *pSet); -// int tsdbOpenDFileSet(SDFileSet *pSet, int flags); -// void tsdbRemoveDFileSet(SDFileSet *pSet); -// int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest); -// void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey); - -// typedef struct SFSIter SFSIter; -// typedef struct STsdbFSMeta STsdbFSMeta; - -// STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg); -// void *tsdbFreeFS(STsdbFS *pfs); -// int tsdbOpenFS(STsdb *pRepo); -// void tsdbCloseFS(STsdb *pRepo); -// void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); -// int tsdbEndFSTxn(STsdb *pRepo); -// int tsdbEndFSTxnWithError(STsdbFS *pfs); -// int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); - -// void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); -// void tsdbFSIterSeek(SFSIter *pIter, int fid); -// SDFileSet *tsdbFSIterNext(SFSIter *pIter); -// int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta); -// int tsdbRLockFS(STsdbFS *pFs); -// int tsdbWLockFS(STsdbFS *pFs); -// int tsdbUnLockFS(STsdbFS *pFs); - // tsdbReadImpl.c ============================================================================================== typedef struct SBlockIdx SBlockIdx; typedef struct SBlockInfo SBlockInfo; @@ -180,23 +120,6 @@ typedef struct SAggrBlkCol SAggrBlkCol; typedef struct SBlockData SBlockData; typedef struct SReadH SReadH; -// SReadH -// int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); -// void tsdbDestroyReadH(SReadH *pReadh); -// int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); -// void tsdbCloseAndUnsetFSet(SReadH *pReadh); -// int tsdbLoadBlockIdx(SReadH *pReadh); -// int tsdbSetReadTable(SReadH *pReadh, STable *pTable); -// int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); -// int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); -// int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int -// numOfColsIds, -// bool mergeBitmap); -// int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); -// int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); -// void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); -// void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock); - typedef struct SDFileSetReader SDFileSetReader; typedef struct SDFileSetWriter SDFileSetWriter; @@ -222,15 +145,15 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size); void tsdbFree(uint8_t *pBuf); -// STMap -typedef struct STMap STMap; - -int32_t tPutTMap(uint8_t *p, STMap *pMap); -int32_t tGetTMap(uint8_t *p, STMap *pMap); - int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tsdbKeyCmprFn(const void *p1, const void *p2); +int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx); +int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx); + +int32_t tPutDelData(uint8_t *p, SDelData *pDelData); +int32_t tGetDelData(uint8_t *p, SDelData *pDelData); + // structs typedef struct { int minFid; @@ -260,43 +183,6 @@ struct STable { STSchema *pCacheSchema; // cached cache }; -// int tsdbPrepareCommit(STsdb *pTsdb); -// typedef enum { -// TSDB_FILE_HEAD = 0, // .head -// TSDB_FILE_DATA, // .data -// TSDB_FILE_LAST, // .last -// TSDB_FILE_SMAD, // .smad(Block-wise SMA) -// TSDB_FILE_SMAL, // .smal(Block-wise SMA) -// TSDB_FILE_MAX, // -// TSDB_FILE_META, // meta -// } E_TSDB_FILE_T; - -// struct SDFInfo { -// uint32_t magic; -// uint32_t fver; -// uint32_t len; -// uint32_t totalBlocks; -// uint32_t totalSubBlocks; -// uint32_t offset; -// uint64_t size; -// uint64_t tombSize; -// }; - -// struct SDFile { -// SDFInfo info; -// STfsFile f; -// TdFilePtr pFile; -// uint8_t state; -// }; - -// struct SDFileSet { -// int fid; -// int8_t state; // -128~127 -// uint8_t ver; // 0~255, DFileSet version -// uint16_t reserve; -// SDFile files[TSDB_FILE_MAX]; -// }; - struct TSDBKEY { int64_t version; TSKEY ts; @@ -337,97 +223,14 @@ struct SMemTable { SArray *aTbData; // SArray }; -// struct STsdbFSMeta { -// uint32_t version; // Commit version from 0 to increase -// int64_t totalPoints; // total points -// int64_t totalStorage; // Uncompressed total storage -// }; - -// ================== -// typedef struct { -// STsdbFSMeta meta; // FS meta -// SDFile cacheFile; // cache file -// SDFile tombstone; // tomestome file -// SArray *df; // data file array -// SArray *sf; // sma data file array v2f1900.index_name_1 -// } SFSStatus; - -// struct STsdbFS { -// TdThreadRwlock lock; - -// SFSStatus *cstatus; // current status -// bool intxn; -// SFSStatus *nstatus; // new status -// }; - -// #define REPO_ID(r) TD_VID((r)->pVnode) -// #define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) -// #define REPO_KEEP_CFG(r) (&(r)->keepCfg) -// #define REPO_FS(r) ((r)->fs) -// #define REPO_META(r) ((r)->pVnode->pMeta) -// #define REPO_TFS(r) ((r)->pVnode->pTfs) -// #define IS_REPO_LOCKED(r) ((r)->repoLocked) - int tsdbLockRepo(STsdb *pTsdb); int tsdbUnlockRepo(STsdb *pTsdb); -// static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, -// int32_t version) { -// if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) { -// return pTable->pSchema; -// } - -// if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) { -// taosMemoryFreeClear(pTable->pCacheSchema); -// pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version); -// } -// return pTable->pCacheSchema; -// } - -// // tsdbMemTable.h -// struct SMergeInfo { -// int rowsInserted; -// int rowsUpdated; -// int rowsDeleteSucceed; -// int rowsDeleteFailed; -// int nOperations; -// TSKEY keyFirst; -// TSKEY keyLast; -// }; - -// static void *taosTMalloc(size_t size); -// static void *taosTCalloc(size_t nmemb, size_t size); -// static void *taosTRealloc(void *ptr, size_t size); -// static void *taosTZfree(void *ptr); -// static size_t taosTSizeof(void *ptr); -// static void taosTMemset(void *ptr, int c); - struct TSDBROW { int64_t version; STSRow *pTSRow; }; -// static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) { -// TSDBROW row; - -// if (pIter == NULL) return NULL; - -// if (tsdbTbDataIterGet(pIter, &row)) { -// return row.pTSRow; -// } - -// return NULL; -// } - -// static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) { -// STSRow *row = tsdbNextIterRow(pIter); -// if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; - -// return TD_ROW_KEY(row); -// } - -// tsdbReadImpl - struct SBlockIdx { int64_t suid; int64_t uid; @@ -435,11 +238,6 @@ struct SBlockIdx { int64_t minVersion; int64_t offset; int64_t size; - // uint32_t len; - // uint32_t offset; - // uint32_t hasLast : 2; - // uint32_t numOfBlocks : 30; - // TSDBKEY maxKey; }; typedef enum { @@ -455,20 +253,6 @@ struct SBlock { int64_t maxVersion; int64_t minVersion; uint8_t flags; // last, algorithm - // uint8_t last : 1; - // uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version) - // uint8_t blkVer : 6; - // uint8_t numOfSubBlocks; - // col_id_t numOfCols; // not including timestamp column - // uint32_t len; // data block length - // uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols - // uint32_t algorithm : 4; - // uint32_t reserve : 8; - // col_id_t numOfBSma; - // uint16_t numOfRows; - // int64_t offset; - // uint64_t aggrStat : 1; - // uint64_t aggrOffset : 63; }; struct SBlockInfo { @@ -505,129 +289,6 @@ struct SBlockData { typedef void SAggrBlkData; // SBlockCol cols[]; -// struct SReadH { -// STsdb *pRepo; -// SDFileSet rSet; // FSET to read -// SArray *aBlkIdx; // SBlockIdx array -// STable *pTable; // table to read -// SBlockIdx *pBlkIdx; // current reading table SBlockIdx -// int cidx; -// SBlockInfo *pBlkInfo; -// SBlockData *pBlkData; // Block info -// SAggrBlkData *pAggrBlkData; // Aggregate Block info -// SDataCols *pDCols[2]; -// void *pBuf; // buffer -// void *pCBuf; // compression buffer -// void *pExBuf; // extra buffer -// }; - -// #define TSDB_READ_REPO(rh) ((rh)->pRepo) -// #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) -// #define TSDB_READ_FSET(rh) (&((rh)->rSet)) -// #define TSDB_READ_TABLE(rh) ((rh)->pTable) -// #define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid) -// #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) -// #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) -// #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) -// #define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD) -// #define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL) -// #define TSDB_READ_BUF(rh) ((rh)->pBuf) -// #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) -// #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) - -// #define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) - -// static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { -// switch (blkVer) { -// case TSDB_SBLK_VER_0: -// default: -// return TSDB_BLOCK_STATIS_SIZE(nCols, 0); -// } -// } - -// #define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM)) - -// static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { -// switch (blkVer) { -// case TSDB_SBLK_VER_0: -// default: -// return TSDB_BLOCK_AGGR_SIZE(nCols, 0); -// } -// } - -// static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { -// void *pBuf = *ppBuf; -// size_t tsize = taosTSizeof(pBuf); - -// if (tsize < size) { -// if (tsize == 0) tsize = 1024; - -// while (tsize < size) { -// tsize *= 2; -// } - -// *ppBuf = taosTRealloc(pBuf, tsize); -// if (*ppBuf == NULL) { -// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; -// return -1; -// } -// } - -// return 0; -// } - -// // tsdbMemory -// static FORCE_INLINE void *taosTMalloc(size_t size) { -// if (size <= 0) return NULL; - -// void *ret = taosMemoryMalloc(size + sizeof(size_t)); -// if (ret == NULL) return NULL; - -// *(size_t *)ret = size; - -// return (void *)((char *)ret + sizeof(size_t)); -// } - -// static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { -// size_t tsize = nmemb * size; -// void *ret = taosTMalloc(tsize); -// if (ret == NULL) return NULL; - -// taosTMemset(ret, 0); -// return ret; -// } - -// static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } - -// static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } - -// static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) { -// if (ptr == NULL) return taosTMalloc(size); - -// if (size <= taosTSizeof(ptr)) return ptr; - -// void *tptr = (void *)((char *)ptr - sizeof(size_t)); -// size_t tsize = size + sizeof(size_t); -// void *tptr1 = taosMemoryRealloc(tptr, tsize); -// if (tptr1 == NULL) return NULL; -// tptr = tptr1; - -// *(size_t *)tptr = size; - -// return (void *)((char *)tptr + sizeof(size_t)); -// } - -// static FORCE_INLINE void *taosTZfree(void *ptr) { -// if (ptr) { -// taosMemoryFree((void *)((char *)ptr - sizeof(size_t))); -// } -// return NULL; -// } - -// tsdbCommit - -// void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); - static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) { if (key < 0) { return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1); @@ -648,108 +309,9 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { } } -// tsdbFile -// #define TSDB_FILE_HEAD_SIZE 512 -// #define TSDB_FILE_DELIMITER 0xF00AFA0F -// #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF -// #define TSDB_IVLD_FID INT_MIN -// #define TSDB_FILE_STATE_OK 0 -// #define TSDB_FILE_STATE_BAD 1 - -// #define TSDB_FILE_F(tf) (&((tf)->f)) -// #define TSDB_FILE_PFILE(tf) ((tf)->pFile) -// #define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname) -// #define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL) -// #define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) -// #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL) -// #define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level) -// #define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id) -// #define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did) -// #define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname) -// #define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname) -// #define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf)) -// #define TSDB_FILE_STATE(tf) ((tf)->state) -// #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) -// #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) -// #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) - -// typedef enum { -// TSDB_FS_VER_0 = 0, -// TSDB_FS_VER_MAX, -// } ETsdbFsVer; - -// #define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile -// #define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file - -// static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile -// switch (fType) { -// case TSDB_FILE_HEAD: // .head -// case TSDB_FILE_DATA: // .data -// case TSDB_FILE_LAST: // .last -// case TSDB_FILE_SMAD: // .smad(Block-wise SMA) -// case TSDB_FILE_SMAL: // .smal(Block-wise SMA) -// default: -// return TSDB_LATEST_FVER; -// } -// } - -// =============== SDFileSet - -// #define TSDB_LATEST_FSET_VER 0 -// #define TSDB_FSET_FID(s) ((s)->fid) -// #define TSDB_FSET_STATE(s) ((s)->state) -// #define TSDB_FSET_VER(s) ((s)->ver) -// #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) -// #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) -// #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) -// #define TSDB_FSET_SET_CLOSED(s) \ -// do { \ -// for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ -// TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ -// } \ -// } while (0); -// #define TSDB_FSET_FSYNC(s) \ -// do { \ -// for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ -// TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ -// } \ -// } while (0); - -// static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) { -// for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { -// if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { -// return false; -// } -// } - -// return true; -// } - // ================== TSDB global config extern bool tsdbForceKeepFile; -// ================== CURRENT file header info -// typedef struct { -// uint32_t version; // Current file system version (relating to code) -// uint32_t len; // Encode content length (including checksum) -// } SFSHeader; - -// // ================== TSDB File System Meta -// #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) -// #define FS_NEW_STATUS(pfs) ((pfs)->nstatus) -// #define FS_IN_TXN(pfs) (pfs)->intxn -// #define FS_VERSION(pfs) ((pfs)->cstatus->meta.version) -// #define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) - -// struct SFSIter { -// int direction; -// uint64_t version; // current FS version -// STsdbFS *pfs; -// int index; // used to position next fset when version the same -// int fid; // used to seek when version is changed -// SDFileSet *pSet; -// }; - #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC @@ -771,19 +333,24 @@ struct SDelOp { SDelOp *pNext; }; -typedef struct { - int64_t version; - TSKEY sKey; - TSKEY eKey; -} SDelDataItem; - -struct SDelData { +struct SDelDataItem { int64_t version; TSKEY sKey; TSKEY eKey; }; -struct SDelIdx { +struct SDelData { + uint32_t delimiter; + tb_uid_t suid; + tb_uid_t uid; + uint8_t flags; + uint32_t nOffset; + uint8_t *pOffset; + uint32_t nData; + uint8_t *pData; +}; + +struct SDelIdxItem { tb_uid_t suid; tb_uid_t uid; TSKEY minKey; @@ -794,7 +361,8 @@ struct SDelIdx { int64_t size; }; -struct STMap { +struct SDelIdx { + uint32_t delimiter; uint8_t flags; uint32_t nOffset; uint8_t *pOffset; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index c2e8de6a80..7705af3e2f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -172,7 +172,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { SDelFile *pDelFileW = NULL; // TODO if (pDelFileR) { - code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR); + code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); if (code) { goto _err; } @@ -183,7 +183,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { } } - code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW); + code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb); if (code) { goto _err; } @@ -299,11 +299,11 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { } _exit: - tsdbDebug("vgId:%d commit del data, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); + tsdbDebug("vgId:%d commit del data done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); return code; _err: - tsdbError("vgId:%d failed to commit del data since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d commit del data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 2bfd54a773..7a44fd799f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -15,6 +15,9 @@ #include "tsdb.h" +#define TSDB_FHDR_SIZE 512 +#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) + // SDFileSetWritter ==================================================== struct SDFileSetWritter { STsdb *pTsdb; @@ -75,13 +78,39 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta // SDelFWriter ==================================================== struct SDelFWriter { + STsdb *pTsdb; SDelFile *pFile; TdFilePtr pWriteH; }; -int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile) { - int32_t code = 0; - // TODO +int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { + int32_t code = 0; + char *fname = NULL; // TODO + SDelFWriter *pDelFWriter; + + pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter)); + if (pDelFWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + pDelFWriter->pTsdb = pTsdb; + pDelFWriter->pFile = pFile; + pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); + if (pDelFWriter->pWriteH == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosLSeekFile(pDelFWriter->pWriteH, TSDB_FHDR_SIZE, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -91,15 +120,44 @@ int32_t tsdbDelFWriterClose(SDelFWriter *pWriter) { return code; } -int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf) { +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, SDelIdxItem *pItem) { int32_t code = 0; + int64_t size; + // TODO return code; } int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf) { int32_t code = 0; - // TODO + int64_t size; + + size = tPutDelIdx(NULL, pDelIdx) + sizeof(TSCKSUM); + + // alloc + code = tsdbRealloc(ppBuf, size); + if (code) { + goto _err; + } + + // encode + tPutDelIdx(*ppBuf, pDelIdx); + + // checksum + taosCalcChecksumAppend(0, *ppBuf, size); + + // write + if (taosWriteFile(pWriter->pWriteH, *ppBuf, size) < size) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pWriter->pFile->offset = pWriter->pFile->size; + pWriter->pFile->size += size; + + return code; + +_err: return code; } @@ -110,15 +168,65 @@ struct SDelFReader { TdFilePtr pReadH; }; -int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile) { - int32_t code = 0; - // TODO +int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf) { + int32_t code = 0; + char *fname = NULL; // todo + SDelFReader *pDelFReader; + + // alloc + pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader)); + if (pDelFReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + // open impl + pDelFReader->pTsdb = pTsdb; + pDelFReader->pFile = pFile; + pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ); + if (pDelFReader == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(pDelFReader); + goto _err; + } + + // load and check hdr if buffer is given + if (ppBuf) { + code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE); + if (code) { + goto _err; + } + + if (taosReadFile(pDelFReader->pReadH, *ppBuf, TSDB_FHDR_SIZE) < TSDB_FHDR_SIZE) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + if (!taosCheckChecksumWhole(*ppBuf, TSDB_FHDR_SIZE)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // TODO: check the content + } + +_exit: + *ppReader = pDelFReader; + return code; + +_err: + *ppReader = NULL; return code; } int32_t tsdbDelFReaderClose(SDelFReader *pReader) { int32_t code = 0; - // TODO + + if (pReader) { + taosCloseFile(&pReader->pReadH); + taosMemoryFree(pReader); + } + return code; } @@ -134,23 +242,23 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf) int64_t size = pReader->pFile->size - offset; // seek - if (taosLSeekFile(pReader->pReadH, pReader->pFile->offset, SEEK_SET) < 0) { + if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + // alloc + code = tsdbRealloc(ppBuf, size); + if (code) { + goto _err; + } + // read if (taosReadFile(pReader->pReadH, *ppBuf, size) < size) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // realloc buf - code = tsdbRealloc(ppBuf, size); - if (code) { - goto _err; - } - // check if (!taosCheckChecksumWhole(*ppBuf, size)) { code = TSDB_CODE_FILE_CORRUPTED; @@ -158,6 +266,10 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf) } // decode + int32_t n = tGetDelIdx(*ppBuf, pDelIdx); + ASSERT(n == size - sizeof(TSCKSUM)); + ASSERT(pDelIdx->delimiter == TSDB_FILE_DLMT); + ASSERT(pDelIdx->nOffset > 0 && pDelIdx->nData > 0); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 96a386146b..0fa4e17b6e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -88,22 +88,46 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { return 0; } -int32_t tPutTMap(uint8_t *p, STMap *pMap) { +int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) { int32_t n = 0; - n += tPutU8(p ? p + n : p, pMap->flags); - n += tPutBinary(p ? p + n : p, pMap->pOffset, pMap->nOffset); - n += tPutBinary(p ? p + n : p, pMap->pData, pMap->nData); + n += tPutU32(p ? p + n : p, pDelIdx->delimiter); + n += tPutU8(p ? p + n : p, pDelIdx->flags); + n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nOffset); + n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData); return n; } -int32_t tGetTMap(uint8_t *p, STMap *pMap) { +int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) { int32_t n = 0; - n += tGetU8(p, &pMap->flags); - n += tGetBinary(p, &pMap->pOffset, &pMap->nOffset); - n += tGetBinary(p, &pMap->pData, &pMap->nData); + n += tGetU32(p, &pDelIdx->delimiter); + n += tGetU8(p, &pDelIdx->flags); + n += tGetBinary(p, &pDelIdx->pOffset, &pDelIdx->nOffset); + n += tGetBinary(p, &pDelIdx->pData, &pDelIdx->nData); + + return n; +} + +int32_t tPutDelData(uint8_t *p, SDelData *pDelData) { + int32_t n = 0; + + n += tPutU32(p ? p + n : p, pDelData->delimiter); + n += tPutU8(p ? p + n : p, pDelData->flags); + n += tPutBinary(p ? p + n : p, pDelData->pOffset, pDelData->nOffset); + n += tPutBinary(p ? p + n : p, pDelData->pData, pDelData->nData); + + return n; +} + +int32_t tGetDelData(uint8_t *p, SDelData *pDelData) { + int32_t n = 0; + + n += tGetU32(p, &pDelData->delimiter); + n += tGetU8(p, &pDelData->flags); + n += tGetBinary(p, &pDelData->pOffset, &pDelData->nOffset); + n += tGetBinary(p, &pDelData->pData, &pDelData->nData); return n; } \ No newline at end of file