more work
This commit is contained in:
parent
a21f83c984
commit
1e9ce7c543
|
@ -75,8 +75,10 @@ int32_t tsdbFSStart(STsdbFS *pFS);
|
|||
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
|
||||
|
||||
// tsdbReaderWriter.c ==============================================================================================
|
||||
typedef struct SDelData SDelData;
|
||||
typedef struct SDelIdx SDelIdx;
|
||||
typedef struct SDelDataItem SDelDataItem;
|
||||
typedef struct SDelData SDelData;
|
||||
typedef struct SDelIdxItem SDelIdxItem;
|
||||
typedef struct SDelIdx SDelIdx;
|
||||
|
||||
// SDataFWriter
|
||||
typedef struct SDataFWriter SDataFWriter;
|
||||
|
@ -87,15 +89,15 @@ typedef struct SDataFReader SDataFReader;
|
|||
// SDelFWriter
|
||||
typedef struct SDelFWriter SDelFWriter;
|
||||
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile);
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
||||
int32_t tsdbDelFWriterClose(SDelFWriter *pWriter);
|
||||
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf);
|
||||
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, SDelIdxItem *pItem);
|
||||
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf);
|
||||
|
||||
// SDelFReader
|
||||
typedef struct SDelFReader SDelFReader;
|
||||
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile);
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf);
|
||||
int32_t tsdbDelFReaderClose(SDelFReader *pReader);
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf);
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf);
|
||||
|
@ -108,68 +110,6 @@ typedef struct SCacheFReader SCacheFReader;
|
|||
|
||||
// tsdbCommit.c ==============================================================================================
|
||||
|
||||
// old
|
||||
// typedef int32_t TSDB_FILE_T;
|
||||
// typedef struct SDFInfo SDFInfo;
|
||||
// typedef struct SDFile SDFile;
|
||||
// typedef struct SDFileSet SDFileSet;
|
||||
|
||||
// // SDFile
|
||||
// void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
||||
// void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
|
||||
// int tsdbOpenDFile(SDFile *pDFile, int flags);
|
||||
// void tsdbCloseDFile(SDFile *pDFile);
|
||||
// int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence);
|
||||
// int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte);
|
||||
// void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm);
|
||||
// int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset);
|
||||
// int tsdbRemoveDFile(SDFile *pDFile);
|
||||
// int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte);
|
||||
// int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest);
|
||||
// int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
|
||||
// void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
|
||||
// int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
|
||||
// int tsdbUpdateDFileHeader(SDFile *pDFile);
|
||||
// int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
|
||||
// int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
|
||||
|
||||
// // SDFileSet
|
||||
// void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver);
|
||||
// void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet);
|
||||
// int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet);
|
||||
// void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet);
|
||||
// int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet);
|
||||
// void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet);
|
||||
// int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to);
|
||||
// int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader);
|
||||
// int tsdbUpdateDFileSetHeader(SDFileSet *pSet);
|
||||
// int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet);
|
||||
// void tsdbCloseDFileSet(SDFileSet *pSet);
|
||||
// int tsdbOpenDFileSet(SDFileSet *pSet, int flags);
|
||||
// void tsdbRemoveDFileSet(SDFileSet *pSet);
|
||||
// int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest);
|
||||
// void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey);
|
||||
|
||||
// typedef struct SFSIter SFSIter;
|
||||
// typedef struct STsdbFSMeta STsdbFSMeta;
|
||||
|
||||
// STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
|
||||
// void *tsdbFreeFS(STsdbFS *pfs);
|
||||
// int tsdbOpenFS(STsdb *pRepo);
|
||||
// void tsdbCloseFS(STsdb *pRepo);
|
||||
// void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
|
||||
// int tsdbEndFSTxn(STsdb *pRepo);
|
||||
// int tsdbEndFSTxnWithError(STsdbFS *pfs);
|
||||
// int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
||||
|
||||
// void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
|
||||
// void tsdbFSIterSeek(SFSIter *pIter, int fid);
|
||||
// SDFileSet *tsdbFSIterNext(SFSIter *pIter);
|
||||
// int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
|
||||
// int tsdbRLockFS(STsdbFS *pFs);
|
||||
// int tsdbWLockFS(STsdbFS *pFs);
|
||||
// int tsdbUnLockFS(STsdbFS *pFs);
|
||||
|
||||
// tsdbReadImpl.c ==============================================================================================
|
||||
typedef struct SBlockIdx SBlockIdx;
|
||||
typedef struct SBlockInfo SBlockInfo;
|
||||
|
@ -180,23 +120,6 @@ typedef struct SAggrBlkCol SAggrBlkCol;
|
|||
typedef struct SBlockData SBlockData;
|
||||
typedef struct SReadH SReadH;
|
||||
|
||||
// SReadH
|
||||
// int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
|
||||
// void tsdbDestroyReadH(SReadH *pReadh);
|
||||
// int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
|
||||
// void tsdbCloseAndUnsetFSet(SReadH *pReadh);
|
||||
// int tsdbLoadBlockIdx(SReadH *pReadh);
|
||||
// int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
||||
// int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
||||
// int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
|
||||
// int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int
|
||||
// numOfColsIds,
|
||||
// bool mergeBitmap);
|
||||
// int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
||||
// int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
||||
// void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
||||
// void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock);
|
||||
|
||||
typedef struct SDFileSetReader SDFileSetReader;
|
||||
typedef struct SDFileSetWriter SDFileSetWriter;
|
||||
|
||||
|
@ -222,15 +145,15 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta
|
|||
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size);
|
||||
void tsdbFree(uint8_t *pBuf);
|
||||
|
||||
// STMap
|
||||
typedef struct STMap STMap;
|
||||
|
||||
int32_t tPutTMap(uint8_t *p, STMap *pMap);
|
||||
int32_t tGetTMap(uint8_t *p, STMap *pMap);
|
||||
|
||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
|
||||
|
||||
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx);
|
||||
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx);
|
||||
|
||||
int32_t tPutDelData(uint8_t *p, SDelData *pDelData);
|
||||
int32_t tGetDelData(uint8_t *p, SDelData *pDelData);
|
||||
|
||||
// structs
|
||||
typedef struct {
|
||||
int minFid;
|
||||
|
@ -260,43 +183,6 @@ struct STable {
|
|||
STSchema *pCacheSchema; // cached cache
|
||||
};
|
||||
|
||||
// int tsdbPrepareCommit(STsdb *pTsdb);
|
||||
// typedef enum {
|
||||
// TSDB_FILE_HEAD = 0, // .head
|
||||
// TSDB_FILE_DATA, // .data
|
||||
// TSDB_FILE_LAST, // .last
|
||||
// TSDB_FILE_SMAD, // .smad(Block-wise SMA)
|
||||
// TSDB_FILE_SMAL, // .smal(Block-wise SMA)
|
||||
// TSDB_FILE_MAX, //
|
||||
// TSDB_FILE_META, // meta
|
||||
// } E_TSDB_FILE_T;
|
||||
|
||||
// struct SDFInfo {
|
||||
// uint32_t magic;
|
||||
// uint32_t fver;
|
||||
// uint32_t len;
|
||||
// uint32_t totalBlocks;
|
||||
// uint32_t totalSubBlocks;
|
||||
// uint32_t offset;
|
||||
// uint64_t size;
|
||||
// uint64_t tombSize;
|
||||
// };
|
||||
|
||||
// struct SDFile {
|
||||
// SDFInfo info;
|
||||
// STfsFile f;
|
||||
// TdFilePtr pFile;
|
||||
// uint8_t state;
|
||||
// };
|
||||
|
||||
// struct SDFileSet {
|
||||
// int fid;
|
||||
// int8_t state; // -128~127
|
||||
// uint8_t ver; // 0~255, DFileSet version
|
||||
// uint16_t reserve;
|
||||
// SDFile files[TSDB_FILE_MAX];
|
||||
// };
|
||||
|
||||
struct TSDBKEY {
|
||||
int64_t version;
|
||||
TSKEY ts;
|
||||
|
@ -337,97 +223,14 @@ struct SMemTable {
|
|||
SArray *aTbData; // SArray<STbData*>
|
||||
};
|
||||
|
||||
// struct STsdbFSMeta {
|
||||
// uint32_t version; // Commit version from 0 to increase
|
||||
// int64_t totalPoints; // total points
|
||||
// int64_t totalStorage; // Uncompressed total storage
|
||||
// };
|
||||
|
||||
// ==================
|
||||
// typedef struct {
|
||||
// STsdbFSMeta meta; // FS meta
|
||||
// SDFile cacheFile; // cache file
|
||||
// SDFile tombstone; // tomestome file
|
||||
// SArray *df; // data file array
|
||||
// SArray *sf; // sma data file array v2f1900.index_name_1
|
||||
// } SFSStatus;
|
||||
|
||||
// struct STsdbFS {
|
||||
// TdThreadRwlock lock;
|
||||
|
||||
// SFSStatus *cstatus; // current status
|
||||
// bool intxn;
|
||||
// SFSStatus *nstatus; // new status
|
||||
// };
|
||||
|
||||
// #define REPO_ID(r) TD_VID((r)->pVnode)
|
||||
// #define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
|
||||
// #define REPO_KEEP_CFG(r) (&(r)->keepCfg)
|
||||
// #define REPO_FS(r) ((r)->fs)
|
||||
// #define REPO_META(r) ((r)->pVnode->pMeta)
|
||||
// #define REPO_TFS(r) ((r)->pVnode->pTfs)
|
||||
// #define IS_REPO_LOCKED(r) ((r)->repoLocked)
|
||||
|
||||
int tsdbLockRepo(STsdb *pTsdb);
|
||||
int tsdbUnlockRepo(STsdb *pTsdb);
|
||||
|
||||
// static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
|
||||
// int32_t version) {
|
||||
// if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) {
|
||||
// return pTable->pSchema;
|
||||
// }
|
||||
|
||||
// if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) {
|
||||
// taosMemoryFreeClear(pTable->pCacheSchema);
|
||||
// pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
|
||||
// }
|
||||
// return pTable->pCacheSchema;
|
||||
// }
|
||||
|
||||
// // tsdbMemTable.h
|
||||
// struct SMergeInfo {
|
||||
// int rowsInserted;
|
||||
// int rowsUpdated;
|
||||
// int rowsDeleteSucceed;
|
||||
// int rowsDeleteFailed;
|
||||
// int nOperations;
|
||||
// TSKEY keyFirst;
|
||||
// TSKEY keyLast;
|
||||
// };
|
||||
|
||||
// static void *taosTMalloc(size_t size);
|
||||
// static void *taosTCalloc(size_t nmemb, size_t size);
|
||||
// static void *taosTRealloc(void *ptr, size_t size);
|
||||
// static void *taosTZfree(void *ptr);
|
||||
// static size_t taosTSizeof(void *ptr);
|
||||
// static void taosTMemset(void *ptr, int c);
|
||||
|
||||
struct TSDBROW {
|
||||
int64_t version;
|
||||
STSRow *pTSRow;
|
||||
};
|
||||
|
||||
// static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) {
|
||||
// TSDBROW row;
|
||||
|
||||
// if (pIter == NULL) return NULL;
|
||||
|
||||
// if (tsdbTbDataIterGet(pIter, &row)) {
|
||||
// return row.pTSRow;
|
||||
// }
|
||||
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
// static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) {
|
||||
// STSRow *row = tsdbNextIterRow(pIter);
|
||||
// if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
|
||||
|
||||
// return TD_ROW_KEY(row);
|
||||
// }
|
||||
|
||||
// tsdbReadImpl
|
||||
|
||||
struct SBlockIdx {
|
||||
int64_t suid;
|
||||
int64_t uid;
|
||||
|
@ -435,11 +238,6 @@ struct SBlockIdx {
|
|||
int64_t minVersion;
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
// uint32_t len;
|
||||
// uint32_t offset;
|
||||
// uint32_t hasLast : 2;
|
||||
// uint32_t numOfBlocks : 30;
|
||||
// TSDBKEY maxKey;
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
|
@ -455,20 +253,6 @@ struct SBlock {
|
|||
int64_t maxVersion;
|
||||
int64_t minVersion;
|
||||
uint8_t flags; // last, algorithm
|
||||
// uint8_t last : 1;
|
||||
// uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version)
|
||||
// uint8_t blkVer : 6;
|
||||
// uint8_t numOfSubBlocks;
|
||||
// col_id_t numOfCols; // not including timestamp column
|
||||
// uint32_t len; // data block length
|
||||
// uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
||||
// uint32_t algorithm : 4;
|
||||
// uint32_t reserve : 8;
|
||||
// col_id_t numOfBSma;
|
||||
// uint16_t numOfRows;
|
||||
// int64_t offset;
|
||||
// uint64_t aggrStat : 1;
|
||||
// uint64_t aggrOffset : 63;
|
||||
};
|
||||
|
||||
struct SBlockInfo {
|
||||
|
@ -505,129 +289,6 @@ struct SBlockData {
|
|||
|
||||
typedef void SAggrBlkData; // SBlockCol cols[];
|
||||
|
||||
// struct SReadH {
|
||||
// STsdb *pRepo;
|
||||
// SDFileSet rSet; // FSET to read
|
||||
// SArray *aBlkIdx; // SBlockIdx array
|
||||
// STable *pTable; // table to read
|
||||
// SBlockIdx *pBlkIdx; // current reading table SBlockIdx
|
||||
// int cidx;
|
||||
// SBlockInfo *pBlkInfo;
|
||||
// SBlockData *pBlkData; // Block info
|
||||
// SAggrBlkData *pAggrBlkData; // Aggregate Block info
|
||||
// SDataCols *pDCols[2];
|
||||
// void *pBuf; // buffer
|
||||
// void *pCBuf; // compression buffer
|
||||
// void *pExBuf; // extra buffer
|
||||
// };
|
||||
|
||||
// #define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
||||
// #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
|
||||
// #define TSDB_READ_FSET(rh) (&((rh)->rSet))
|
||||
// #define TSDB_READ_TABLE(rh) ((rh)->pTable)
|
||||
// #define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid)
|
||||
// #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
|
||||
// #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
|
||||
// #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
|
||||
// #define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
|
||||
// #define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
|
||||
// #define TSDB_READ_BUF(rh) ((rh)->pBuf)
|
||||
// #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
|
||||
// #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
|
||||
|
||||
// #define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
|
||||
|
||||
// static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
|
||||
// switch (blkVer) {
|
||||
// case TSDB_SBLK_VER_0:
|
||||
// default:
|
||||
// return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
|
||||
// }
|
||||
// }
|
||||
|
||||
// #define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM))
|
||||
|
||||
// static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
|
||||
// switch (blkVer) {
|
||||
// case TSDB_SBLK_VER_0:
|
||||
// default:
|
||||
// return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
|
||||
// }
|
||||
// }
|
||||
|
||||
// static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
||||
// void *pBuf = *ppBuf;
|
||||
// size_t tsize = taosTSizeof(pBuf);
|
||||
|
||||
// if (tsize < size) {
|
||||
// if (tsize == 0) tsize = 1024;
|
||||
|
||||
// while (tsize < size) {
|
||||
// tsize *= 2;
|
||||
// }
|
||||
|
||||
// *ppBuf = taosTRealloc(pBuf, tsize);
|
||||
// if (*ppBuf == NULL) {
|
||||
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
// // tsdbMemory
|
||||
// static FORCE_INLINE void *taosTMalloc(size_t size) {
|
||||
// if (size <= 0) return NULL;
|
||||
|
||||
// void *ret = taosMemoryMalloc(size + sizeof(size_t));
|
||||
// if (ret == NULL) return NULL;
|
||||
|
||||
// *(size_t *)ret = size;
|
||||
|
||||
// return (void *)((char *)ret + sizeof(size_t));
|
||||
// }
|
||||
|
||||
// static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
|
||||
// size_t tsize = nmemb * size;
|
||||
// void *ret = taosTMalloc(tsize);
|
||||
// if (ret == NULL) return NULL;
|
||||
|
||||
// taosTMemset(ret, 0);
|
||||
// return ret;
|
||||
// }
|
||||
|
||||
// static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
|
||||
|
||||
// static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
|
||||
|
||||
// static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) {
|
||||
// if (ptr == NULL) return taosTMalloc(size);
|
||||
|
||||
// if (size <= taosTSizeof(ptr)) return ptr;
|
||||
|
||||
// void *tptr = (void *)((char *)ptr - sizeof(size_t));
|
||||
// size_t tsize = size + sizeof(size_t);
|
||||
// void *tptr1 = taosMemoryRealloc(tptr, tsize);
|
||||
// if (tptr1 == NULL) return NULL;
|
||||
// tptr = tptr1;
|
||||
|
||||
// *(size_t *)tptr = size;
|
||||
|
||||
// return (void *)((char *)tptr + sizeof(size_t));
|
||||
// }
|
||||
|
||||
// static FORCE_INLINE void *taosTZfree(void *ptr) {
|
||||
// if (ptr) {
|
||||
// taosMemoryFree((void *)((char *)ptr - sizeof(size_t)));
|
||||
// }
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
// tsdbCommit
|
||||
|
||||
// void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
|
||||
|
||||
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) {
|
||||
if (key < 0) {
|
||||
return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1);
|
||||
|
@ -648,108 +309,9 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
|||
}
|
||||
}
|
||||
|
||||
// tsdbFile
|
||||
// #define TSDB_FILE_HEAD_SIZE 512
|
||||
// #define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||
// #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||
// #define TSDB_IVLD_FID INT_MIN
|
||||
// #define TSDB_FILE_STATE_OK 0
|
||||
// #define TSDB_FILE_STATE_BAD 1
|
||||
|
||||
// #define TSDB_FILE_F(tf) (&((tf)->f))
|
||||
// #define TSDB_FILE_PFILE(tf) ((tf)->pFile)
|
||||
// #define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||
// #define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL)
|
||||
// #define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
|
||||
// #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL)
|
||||
// #define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
|
||||
// #define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
|
||||
// #define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
|
||||
// #define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
|
||||
// #define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||
// #define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf))
|
||||
// #define TSDB_FILE_STATE(tf) ((tf)->state)
|
||||
// #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
|
||||
// #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
||||
// #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
||||
|
||||
// typedef enum {
|
||||
// TSDB_FS_VER_0 = 0,
|
||||
// TSDB_FS_VER_MAX,
|
||||
// } ETsdbFsVer;
|
||||
|
||||
// #define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
|
||||
// #define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
|
||||
|
||||
// static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
|
||||
// switch (fType) {
|
||||
// case TSDB_FILE_HEAD: // .head
|
||||
// case TSDB_FILE_DATA: // .data
|
||||
// case TSDB_FILE_LAST: // .last
|
||||
// case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
|
||||
// case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
|
||||
// default:
|
||||
// return TSDB_LATEST_FVER;
|
||||
// }
|
||||
// }
|
||||
|
||||
// =============== SDFileSet
|
||||
|
||||
// #define TSDB_LATEST_FSET_VER 0
|
||||
// #define TSDB_FSET_FID(s) ((s)->fid)
|
||||
// #define TSDB_FSET_STATE(s) ((s)->state)
|
||||
// #define TSDB_FSET_VER(s) ((s)->ver)
|
||||
// #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
||||
// #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
|
||||
// #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
|
||||
// #define TSDB_FSET_SET_CLOSED(s) \
|
||||
// do { \
|
||||
// for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
||||
// TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
|
||||
// } \
|
||||
// } while (0);
|
||||
// #define TSDB_FSET_FSYNC(s) \
|
||||
// do { \
|
||||
// for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
||||
// TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
|
||||
// } \
|
||||
// } while (0);
|
||||
|
||||
// static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) {
|
||||
// for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
// if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
|
||||
// return false;
|
||||
// }
|
||||
// }
|
||||
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// ================== TSDB global config
|
||||
extern bool tsdbForceKeepFile;
|
||||
|
||||
// ================== CURRENT file header info
|
||||
// typedef struct {
|
||||
// uint32_t version; // Current file system version (relating to code)
|
||||
// uint32_t len; // Encode content length (including checksum)
|
||||
// } SFSHeader;
|
||||
|
||||
// // ================== TSDB File System Meta
|
||||
// #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
|
||||
// #define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
|
||||
// #define FS_IN_TXN(pfs) (pfs)->intxn
|
||||
// #define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
|
||||
// #define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
|
||||
|
||||
// struct SFSIter {
|
||||
// int direction;
|
||||
// uint64_t version; // current FS version
|
||||
// STsdbFS *pfs;
|
||||
// int index; // used to position next fset when version the same
|
||||
// int fid; // used to seek when version is changed
|
||||
// SDFileSet *pSet;
|
||||
// };
|
||||
|
||||
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
||||
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
||||
|
||||
|
@ -771,19 +333,24 @@ struct SDelOp {
|
|||
SDelOp *pNext;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int64_t version;
|
||||
TSKEY sKey;
|
||||
TSKEY eKey;
|
||||
} SDelDataItem;
|
||||
|
||||
struct SDelData {
|
||||
struct SDelDataItem {
|
||||
int64_t version;
|
||||
TSKEY sKey;
|
||||
TSKEY eKey;
|
||||
};
|
||||
|
||||
struct SDelIdx {
|
||||
struct SDelData {
|
||||
uint32_t delimiter;
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
uint8_t flags;
|
||||
uint32_t nOffset;
|
||||
uint8_t *pOffset;
|
||||
uint32_t nData;
|
||||
uint8_t *pData;
|
||||
};
|
||||
|
||||
struct SDelIdxItem {
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
TSKEY minKey;
|
||||
|
@ -794,7 +361,8 @@ struct SDelIdx {
|
|||
int64_t size;
|
||||
};
|
||||
|
||||
struct STMap {
|
||||
struct SDelIdx {
|
||||
uint32_t delimiter;
|
||||
uint8_t flags;
|
||||
uint32_t nOffset;
|
||||
uint8_t *pOffset;
|
||||
|
|
|
@ -172,7 +172,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
|
|||
SDelFile *pDelFileW = NULL; // TODO
|
||||
|
||||
if (pDelFileR) {
|
||||
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR);
|
||||
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -183,7 +183,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
|
|||
}
|
||||
}
|
||||
|
||||
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW);
|
||||
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -299,11 +299,11 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
|
|||
}
|
||||
|
||||
_exit:
|
||||
tsdbDebug("vgId:%d commit del data, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
|
||||
tsdbDebug("vgId:%d commit del data done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d failed to commit del data since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("vgId:%d commit del data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,9 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
#define TSDB_FHDR_SIZE 512
|
||||
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
||||
|
||||
// SDFileSetWritter ====================================================
|
||||
struct SDFileSetWritter {
|
||||
STsdb *pTsdb;
|
||||
|
@ -75,13 +78,39 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta
|
|||
|
||||
// SDelFWriter ====================================================
|
||||
struct SDelFWriter {
|
||||
STsdb *pTsdb;
|
||||
SDelFile *pFile;
|
||||
TdFilePtr pWriteH;
|
||||
};
|
||||
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
char *fname = NULL; // TODO
|
||||
SDelFWriter *pDelFWriter;
|
||||
|
||||
pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
|
||||
if (pDelFWriter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pDelFWriter->pTsdb = pTsdb;
|
||||
pDelFWriter->pFile = pFile;
|
||||
pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
|
||||
if (pDelFWriter->pWriteH == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (taosLSeekFile(pDelFWriter->pWriteH, TSDB_FHDR_SIZE, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -91,15 +120,44 @@ int32_t tsdbDelFWriterClose(SDelFWriter *pWriter) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf) {
|
||||
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, SDelIdxItem *pItem) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
int64_t size;
|
||||
|
||||
size = tPutDelIdx(NULL, pDelIdx) + sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// encode
|
||||
tPutDelIdx(*ppBuf, pDelIdx);
|
||||
|
||||
// checksum
|
||||
taosCalcChecksumAppend(0, *ppBuf, size);
|
||||
|
||||
// write
|
||||
if (taosWriteFile(pWriter->pWriteH, *ppBuf, size) < size) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pWriter->pFile->offset = pWriter->pFile->size;
|
||||
pWriter->pFile->size += size;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -110,15 +168,65 @@ struct SDelFReader {
|
|||
TdFilePtr pReadH;
|
||||
};
|
||||
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
char *fname = NULL; // todo
|
||||
SDelFReader *pDelFReader;
|
||||
|
||||
// alloc
|
||||
pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
|
||||
if (pDelFReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open impl
|
||||
pDelFReader->pTsdb = pTsdb;
|
||||
pDelFReader->pFile = pFile;
|
||||
pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pDelFReader == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(pDelFReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// load and check hdr if buffer is given
|
||||
if (ppBuf) {
|
||||
code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (taosReadFile(pDelFReader->pReadH, *ppBuf, TSDB_FHDR_SIZE) < TSDB_FHDR_SIZE) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!taosCheckChecksumWhole(*ppBuf, TSDB_FHDR_SIZE)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// TODO: check the content
|
||||
}
|
||||
|
||||
_exit:
|
||||
*ppReader = pDelFReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDelFReaderClose(SDelFReader *pReader) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
|
||||
if (pReader) {
|
||||
taosCloseFile(&pReader->pReadH);
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -134,23 +242,23 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf)
|
|||
int64_t size = pReader->pFile->size - offset;
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pReader->pReadH, pReader->pFile->offset, SEEK_SET) < 0) {
|
||||
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// read
|
||||
if (taosReadFile(pReader->pReadH, *ppBuf, size) < size) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// realloc buf
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(*ppBuf, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
|
@ -158,6 +266,10 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf)
|
|||
}
|
||||
|
||||
// decode
|
||||
int32_t n = tGetDelIdx(*ppBuf, pDelIdx);
|
||||
ASSERT(n == size - sizeof(TSCKSUM));
|
||||
ASSERT(pDelIdx->delimiter == TSDB_FILE_DLMT);
|
||||
ASSERT(pDelIdx->nOffset > 0 && pDelIdx->nData > 0);
|
||||
|
||||
return code;
|
||||
|
||||
|
|
|
@ -88,22 +88,46 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tPutTMap(uint8_t *p, STMap *pMap) {
|
||||
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutU8(p ? p + n : p, pMap->flags);
|
||||
n += tPutBinary(p ? p + n : p, pMap->pOffset, pMap->nOffset);
|
||||
n += tPutBinary(p ? p + n : p, pMap->pData, pMap->nData);
|
||||
n += tPutU32(p ? p + n : p, pDelIdx->delimiter);
|
||||
n += tPutU8(p ? p + n : p, pDelIdx->flags);
|
||||
n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nOffset);
|
||||
n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetTMap(uint8_t *p, STMap *pMap) {
|
||||
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetU8(p, &pMap->flags);
|
||||
n += tGetBinary(p, &pMap->pOffset, &pMap->nOffset);
|
||||
n += tGetBinary(p, &pMap->pData, &pMap->nData);
|
||||
n += tGetU32(p, &pDelIdx->delimiter);
|
||||
n += tGetU8(p, &pDelIdx->flags);
|
||||
n += tGetBinary(p, &pDelIdx->pOffset, &pDelIdx->nOffset);
|
||||
n += tGetBinary(p, &pDelIdx->pData, &pDelIdx->nData);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tPutDelData(uint8_t *p, SDelData *pDelData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutU32(p ? p + n : p, pDelData->delimiter);
|
||||
n += tPutU8(p ? p + n : p, pDelData->flags);
|
||||
n += tPutBinary(p ? p + n : p, pDelData->pOffset, pDelData->nOffset);
|
||||
n += tPutBinary(p ? p + n : p, pDelData->pData, pDelData->nData);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetDelData(uint8_t *p, SDelData *pDelData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetU32(p, &pDelData->delimiter);
|
||||
n += tGetU8(p, &pDelData->flags);
|
||||
n += tGetBinary(p, &pDelData->pOffset, &pDelData->nOffset);
|
||||
n += tGetBinary(p, &pDelData->pData, &pDelData->nData);
|
||||
|
||||
return n;
|
||||
}
|
Loading…
Reference in New Issue