refact: 1

This commit is contained in:
Hongze Cheng 2022-06-10 06:48:21 +00:00
parent 9c1b1b4507
commit 17efc22b03
11 changed files with 978 additions and 504 deletions

View File

@ -44,6 +44,7 @@ target_sources(
"src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbWrite.c" "src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbReaderWriter.c" "src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c" "src/tsdb/tsdbSnapshot.c"
# tq # tq

View File

@ -37,8 +37,6 @@ typedef struct TSDBKEY TSDBKEY;
typedef struct TABLEID TABLEID; typedef struct TABLEID TABLEID;
typedef struct SDelOp SDelOp; typedef struct SDelOp SDelOp;
static int tsdbKeyCmprFn(const void *p1, const void *p2);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
typedef struct STbData STbData; typedef struct STbData STbData;
typedef struct SMemTable SMemTable; typedef struct SMemTable SMemTable;
@ -59,70 +57,88 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
// tsdbFile.c ============================================================================================== // tsdbFile.c ==============================================================================================
typedef int32_t TSDB_FILE_T; typedef struct STsdbTombstoneFile STsdbTombstoneFile;
typedef struct SDFInfo SDFInfo; typedef struct STsdbCacheFile STsdbCacheFile;
typedef struct SDFile SDFile; typedef struct STsdbIndexFile STsdbIndexFile;
typedef struct SDFileSet SDFileSet; typedef struct STsdbDataFile STsdbDataFile;
typedef struct STsdbLastFile STsdbLastFile;
// SDFile typedef struct STsdbSmaFile STsdbSmaFile;
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); typedef struct STsdbSmalFile STsdbSmalFile;
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile); typedef struct SDFileSet SDFileSet;
int tsdbOpenDFile(SDFile *pDFile, int flags);
void tsdbCloseDFile(SDFile *pDFile);
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence);
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte);
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm);
int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset);
int tsdbRemoveDFile(SDFile *pDFile);
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte);
int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest);
int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile *pDFile);
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
// SDFileSet
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet);
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet);
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet);
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet);
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet);
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to);
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet *pSet);
int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet);
void tsdbCloseDFileSet(SDFileSet *pSet);
int tsdbOpenDFileSet(SDFileSet *pSet, int flags);
void tsdbRemoveDFileSet(SDFileSet *pSet);
int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest);
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
typedef struct SFSIter SFSIter;
typedef struct STsdbFSMeta STsdbFSMeta;
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg); int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);
void *tsdbFreeFS(STsdbFS *pfs); int32_t tsdbFSClose(STsdbFS *pFS);
int tsdbOpenFS(STsdb *pRepo); int32_t tsdbFSStart(STsdbFS *pFS);
void tsdbCloseFS(STsdb *pRepo); int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); // tsdbReaderWritter.c ==============================================================================================
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter); // tsdbCommit.c ==============================================================================================
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
int tsdbRLockFS(STsdbFS *pFs); // old
int tsdbWLockFS(STsdbFS *pFs); // typedef int32_t TSDB_FILE_T;
int tsdbUnLockFS(STsdbFS *pFs); // typedef struct SDFInfo SDFInfo;
// typedef struct SDFile SDFile;
// typedef struct SDFileSet SDFileSet;
// // SDFile
// void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
// void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
// int tsdbOpenDFile(SDFile *pDFile, int flags);
// void tsdbCloseDFile(SDFile *pDFile);
// int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence);
// int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte);
// void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm);
// int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset);
// int tsdbRemoveDFile(SDFile *pDFile);
// int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte);
// int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest);
// int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
// void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
// int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
// int tsdbUpdateDFileHeader(SDFile *pDFile);
// int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
// int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
// // SDFileSet
// void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver);
// void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet);
// int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet);
// void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet);
// int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet);
// void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet);
// int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to);
// int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader);
// int tsdbUpdateDFileSetHeader(SDFileSet *pSet);
// int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet);
// void tsdbCloseDFileSet(SDFileSet *pSet);
// int tsdbOpenDFileSet(SDFileSet *pSet, int flags);
// void tsdbRemoveDFileSet(SDFileSet *pSet);
// int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest);
// void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey);
// typedef struct SFSIter SFSIter;
// typedef struct STsdbFSMeta STsdbFSMeta;
// STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
// void *tsdbFreeFS(STsdbFS *pfs);
// int tsdbOpenFS(STsdb *pRepo);
// void tsdbCloseFS(STsdb *pRepo);
// void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
// int tsdbEndFSTxn(STsdb *pRepo);
// int tsdbEndFSTxnWithError(STsdbFS *pfs);
// int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
// void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
// void tsdbFSIterSeek(SFSIter *pIter, int fid);
// SDFileSet *tsdbFSIterNext(SFSIter *pIter);
// int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
// int tsdbRLockFS(STsdbFS *pFs);
// int tsdbWLockFS(STsdbFS *pFs);
// int tsdbUnLockFS(STsdbFS *pFs);
// tsdbReadImpl.c ============================================================================================== // tsdbReadImpl.c ==============================================================================================
typedef struct SBlockIdx SBlockIdx; typedef struct SBlockIdx SBlockIdx;
@ -135,20 +151,21 @@ typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH; typedef struct SReadH SReadH;
// SReadH // SReadH
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); // int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh); // void tsdbDestroyReadH(SReadH *pReadh);
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); // int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void tsdbCloseAndUnsetFSet(SReadH *pReadh); // void tsdbCloseAndUnsetFSet(SReadH *pReadh);
int tsdbLoadBlockIdx(SReadH *pReadh); // int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable); // int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); // int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); // int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, // int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int
bool mergeBitmap); // numOfColsIds,
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); // bool mergeBitmap);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); // int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); // int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock); // void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
// void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock);
typedef struct SDFileSetReader SDFileSetReader; typedef struct SDFileSetReader SDFileSetReader;
typedef struct SDFileSetWriter SDFileSetWriter; typedef struct SDFileSetWriter SDFileSetWriter;
@ -177,6 +194,10 @@ int32_t tsdbTomstoneFileWriterClose(STombstoneFileWriter *pWriter);
int32_t tsdbTomstoneFileReaderOpen(STombstoneFileReader *pReader, STsdb *pTsdb); int32_t tsdbTomstoneFileReaderOpen(STombstoneFileReader *pReader, STsdb *pTsdb);
int32_t tsdbTomstoneFileReaderClose(STombstoneFileReader *pReader); int32_t tsdbTomstoneFileReaderClose(STombstoneFileReader *pReader);
// tsdbUtil.c ==============================================================================================
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
// structs // structs
typedef struct { typedef struct {
int minFid; int minFid;
@ -209,41 +230,41 @@ struct STable {
}; };
// int tsdbPrepareCommit(STsdb *pTsdb); // int tsdbPrepareCommit(STsdb *pTsdb);
typedef enum { // typedef enum {
TSDB_FILE_HEAD = 0, // .head // TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data // TSDB_FILE_DATA, // .data
TSDB_FILE_LAST, // .last // TSDB_FILE_LAST, // .last
TSDB_FILE_SMAD, // .smad(Block-wise SMA) // TSDB_FILE_SMAD, // .smad(Block-wise SMA)
TSDB_FILE_SMAL, // .smal(Block-wise SMA) // TSDB_FILE_SMAL, // .smal(Block-wise SMA)
TSDB_FILE_MAX, // // TSDB_FILE_MAX, //
TSDB_FILE_META, // meta // TSDB_FILE_META, // meta
} E_TSDB_FILE_T; // } E_TSDB_FILE_T;
struct SDFInfo { // struct SDFInfo {
uint32_t magic; // uint32_t magic;
uint32_t fver; // uint32_t fver;
uint32_t len; // uint32_t len;
uint32_t totalBlocks; // uint32_t totalBlocks;
uint32_t totalSubBlocks; // uint32_t totalSubBlocks;
uint32_t offset; // uint32_t offset;
uint64_t size; // uint64_t size;
uint64_t tombSize; // uint64_t tombSize;
}; // };
struct SDFile { // struct SDFile {
SDFInfo info; // SDFInfo info;
STfsFile f; // STfsFile f;
TdFilePtr pFile; // TdFilePtr pFile;
uint8_t state; // uint8_t state;
}; // };
struct SDFileSet { // struct SDFileSet {
int fid; // int fid;
int8_t state; // -128~127 // int8_t state; // -128~127
uint8_t ver; // 0~255, DFileSet version // uint8_t ver; // 0~255, DFileSet version
uint16_t reserve; // uint16_t reserve;
SDFile files[TSDB_FILE_MAX]; // SDFile files[TSDB_FILE_MAX];
}; // };
struct TSDBKEY { struct TSDBKEY {
int64_t version; int64_t version;
@ -285,105 +306,109 @@ struct SMemTable {
SArray *aTbData; // SArray<STbData> SArray *aTbData; // SArray<STbData>
}; };
struct STsdbFSMeta { // struct STsdbFSMeta {
uint32_t version; // Commit version from 0 to increase // uint32_t version; // Commit version from 0 to increase
int64_t totalPoints; // total points // int64_t totalPoints; // total points
int64_t totalStorage; // Uncompressed total storage // int64_t totalStorage; // Uncompressed total storage
}; // };
// ================== // ==================
typedef struct { // typedef struct {
STsdbFSMeta meta; // FS meta // STsdbFSMeta meta; // FS meta
SDFile cacheFile; // cache file // SDFile cacheFile; // cache file
SDFile tombstone; // tomestome file // SDFile tombstone; // tomestome file
SArray *df; // data file array // SArray *df; // data file array
SArray *sf; // sma data file array v2f1900.index_name_1 // SArray *sf; // sma data file array v2f1900.index_name_1
} SFSStatus; // } SFSStatus;
struct STsdbFS { // struct STsdbFS {
TdThreadRwlock lock; // TdThreadRwlock lock;
SFSStatus *cstatus; // current status // SFSStatus *cstatus; // current status
bool intxn; // bool intxn;
SFSStatus *nstatus; // new status // SFSStatus *nstatus; // new status
}; // };
#define REPO_ID(r) TD_VID((r)->pVnode) // #define REPO_ID(r) TD_VID((r)->pVnode)
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg) // #define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
#define REPO_KEEP_CFG(r) (&(r)->keepCfg) // #define REPO_KEEP_CFG(r) (&(r)->keepCfg)
#define REPO_FS(r) ((r)->fs) // #define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pVnode->pMeta) // #define REPO_META(r) ((r)->pVnode->pMeta)
#define REPO_TFS(r) ((r)->pVnode->pTfs) // #define REPO_TFS(r) ((r)->pVnode->pTfs)
#define IS_REPO_LOCKED(r) ((r)->repoLocked) // #define IS_REPO_LOCKED(r) ((r)->repoLocked)
int tsdbLockRepo(STsdb *pTsdb); int tsdbLockRepo(STsdb *pTsdb);
int tsdbUnlockRepo(STsdb *pTsdb); int tsdbUnlockRepo(STsdb *pTsdb);
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, // static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
int32_t version) { // int32_t version) {
if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) { // if ((version < 0) || (schemaVersion(pTable->pSchema) == version)) {
return pTable->pSchema; // return pTable->pSchema;
} // }
if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) { // if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) {
taosMemoryFreeClear(pTable->pCacheSchema); // taosMemoryFreeClear(pTable->pCacheSchema);
pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version); // pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
} // }
return pTable->pCacheSchema; // return pTable->pCacheSchema;
} // }
// tsdbMemTable.h // // tsdbMemTable.h
struct SMergeInfo { // struct SMergeInfo {
int rowsInserted; // int rowsInserted;
int rowsUpdated; // int rowsUpdated;
int rowsDeleteSucceed; // int rowsDeleteSucceed;
int rowsDeleteFailed; // int rowsDeleteFailed;
int nOperations; // int nOperations;
TSKEY keyFirst; // TSKEY keyFirst;
TSKEY keyLast; // TSKEY keyLast;
}; // };
static void *taosTMalloc(size_t size); // static void *taosTMalloc(size_t size);
static void *taosTCalloc(size_t nmemb, size_t size); // static void *taosTCalloc(size_t nmemb, size_t size);
static void *taosTRealloc(void *ptr, size_t size); // static void *taosTRealloc(void *ptr, size_t size);
static void *taosTZfree(void *ptr); // static void *taosTZfree(void *ptr);
static size_t taosTSizeof(void *ptr); // static size_t taosTSizeof(void *ptr);
static void taosTMemset(void *ptr, int c); // static void taosTMemset(void *ptr, int c);
struct TSDBROW { struct TSDBROW {
int64_t version; int64_t version;
STSRow *pTSRow; STSRow *pTSRow;
}; };
static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) { // static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) {
TSDBROW row; // TSDBROW row;
if (pIter == NULL) return NULL; // if (pIter == NULL) return NULL;
if (tsdbTbDataIterGet(pIter, &row)) { // if (tsdbTbDataIterGet(pIter, &row)) {
return row.pTSRow; // return row.pTSRow;
} // }
return NULL; // return NULL;
} // }
static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) { // static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) {
STSRow *row = tsdbNextIterRow(pIter); // STSRow *row = tsdbNextIterRow(pIter);
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; // if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
return TD_ROW_KEY(row); // return TD_ROW_KEY(row);
} // }
// tsdbReadImpl // tsdbReadImpl
struct SBlockIdx { struct SBlockIdx {
uint64_t suid; int64_t suid;
uint64_t uid; int64_t uid;
uint32_t len; int64_t maxVersion;
uint32_t offset; int64_t minVersion;
uint32_t hasLast : 2; int64_t offset;
uint32_t numOfBlocks : 30; int64_t size;
TSDBKEY maxKey; // uint32_t len;
// uint32_t offset;
// uint32_t hasLast : 2;
// uint32_t numOfBlocks : 30;
// TSDBKEY maxKey;
}; };
typedef enum { typedef enum {
@ -394,22 +419,25 @@ typedef enum {
#define SBlockVerLatest TSDB_SBLK_VER_0 #define SBlockVerLatest TSDB_SBLK_VER_0
struct SBlock { struct SBlock {
uint8_t last : 1; TSDBKEY minKey;
uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version) TSDBKEY maxKey;
uint8_t blkVer : 6; int64_t maxVersion;
uint8_t numOfSubBlocks; int64_t minVersion;
col_id_t numOfCols; // not including timestamp column uint8_t flags; // last, algorithm
uint32_t len; // data block length // uint8_t last : 1;
uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols // uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version)
uint32_t algorithm : 4; // uint8_t blkVer : 6;
uint32_t reserve : 8; // uint8_t numOfSubBlocks;
col_id_t numOfBSma; // col_id_t numOfCols; // not including timestamp column
uint16_t numOfRows; // uint32_t len; // data block length
int64_t offset; // uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint64_t aggrStat : 1; // uint32_t algorithm : 4;
uint64_t aggrOffset : 63; // uint32_t reserve : 8;
TSDBKEY minKey; // col_id_t numOfBSma;
TSDBKEY maxKey; // uint16_t numOfRows;
// int64_t offset;
// uint64_t aggrStat : 1;
// uint64_t aggrOffset : 63;
}; };
struct SBlockInfo { struct SBlockInfo {
@ -446,128 +474,128 @@ struct SBlockData {
typedef void SAggrBlkData; // SBlockCol cols[]; typedef void SAggrBlkData; // SBlockCol cols[];
struct SReadH { // struct SReadH {
STsdb *pRepo; // STsdb *pRepo;
SDFileSet rSet; // FSET to read // SDFileSet rSet; // FSET to read
SArray *aBlkIdx; // SBlockIdx array // SArray *aBlkIdx; // SBlockIdx array
STable *pTable; // table to read // STable *pTable; // table to read
SBlockIdx *pBlkIdx; // current reading table SBlockIdx // SBlockIdx *pBlkIdx; // current reading table SBlockIdx
int cidx; // int cidx;
SBlockInfo *pBlkInfo; // SBlockInfo *pBlkInfo;
SBlockData *pBlkData; // Block info // SBlockData *pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info // SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols *pDCols[2]; // SDataCols *pDCols[2];
void *pBuf; // buffer // void *pBuf; // buffer
void *pCBuf; // compression buffer // void *pCBuf; // compression buffer
void *pExBuf; // extra buffer // void *pExBuf; // extra buffer
}; // };
#define TSDB_READ_REPO(rh) ((rh)->pRepo) // #define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) // #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet)) // #define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable) // #define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid) // #define TSDB_READ_TABLE_UID(rh) ((rh)->pTable->uid)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) // #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) // #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) // #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD) // #define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL) // #define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf) // #define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) // #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) // #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) // #define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { // static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
switch (blkVer) { // switch (blkVer) {
case TSDB_SBLK_VER_0: // case TSDB_SBLK_VER_0:
default: // default:
return TSDB_BLOCK_STATIS_SIZE(nCols, 0); // return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
} // }
} // }
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM)) // #define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { // static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
switch (blkVer) { // switch (blkVer) {
case TSDB_SBLK_VER_0: // case TSDB_SBLK_VER_0:
default: // default:
return TSDB_BLOCK_AGGR_SIZE(nCols, 0); // return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
} // }
} // }
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { // static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void *pBuf = *ppBuf; // void *pBuf = *ppBuf;
size_t tsize = taosTSizeof(pBuf); // size_t tsize = taosTSizeof(pBuf);
if (tsize < size) { // if (tsize < size) {
if (tsize == 0) tsize = 1024; // if (tsize == 0) tsize = 1024;
while (tsize < size) { // while (tsize < size) {
tsize *= 2; // tsize *= 2;
} // }
*ppBuf = taosTRealloc(pBuf, tsize); // *ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) { // if (*ppBuf == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; // return -1;
} // }
} // }
return 0; // return 0;
} // }
// tsdbMemory // // tsdbMemory
static FORCE_INLINE void *taosTMalloc(size_t size) { // static FORCE_INLINE void *taosTMalloc(size_t size) {
if (size <= 0) return NULL; // if (size <= 0) return NULL;
void *ret = taosMemoryMalloc(size + sizeof(size_t)); // void *ret = taosMemoryMalloc(size + sizeof(size_t));
if (ret == NULL) return NULL; // if (ret == NULL) return NULL;
*(size_t *)ret = size; // *(size_t *)ret = size;
return (void *)((char *)ret + sizeof(size_t)); // return (void *)((char *)ret + sizeof(size_t));
} // }
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { // static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
size_t tsize = nmemb * size; // size_t tsize = nmemb * size;
void *ret = taosTMalloc(tsize); // void *ret = taosTMalloc(tsize);
if (ret == NULL) return NULL; // if (ret == NULL) return NULL;
taosTMemset(ret, 0); // taosTMemset(ret, 0);
return ret; // return ret;
} // }
static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } // static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } // static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) { // static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) {
if (ptr == NULL) return taosTMalloc(size); // if (ptr == NULL) return taosTMalloc(size);
if (size <= taosTSizeof(ptr)) return ptr; // if (size <= taosTSizeof(ptr)) return ptr;
void *tptr = (void *)((char *)ptr - sizeof(size_t)); // void *tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t); // size_t tsize = size + sizeof(size_t);
void *tptr1 = taosMemoryRealloc(tptr, tsize); // void *tptr1 = taosMemoryRealloc(tptr, tsize);
if (tptr1 == NULL) return NULL; // if (tptr1 == NULL) return NULL;
tptr = tptr1; // tptr = tptr1;
*(size_t *)tptr = size; // *(size_t *)tptr = size;
return (void *)((char *)tptr + sizeof(size_t)); // return (void *)((char *)tptr + sizeof(size_t));
} // }
static FORCE_INLINE void *taosTZfree(void *ptr) { // static FORCE_INLINE void *taosTZfree(void *ptr) {
if (ptr) { // if (ptr) {
taosMemoryFree((void *)((char *)ptr - sizeof(size_t))); // taosMemoryFree((void *)((char *)ptr - sizeof(size_t)));
} // }
return NULL; // return NULL;
} // }
// tsdbCommit // tsdbCommit
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); // void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) { static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) {
if (key < 0) { if (key < 0) {
@ -590,107 +618,106 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
} }
// tsdbFile // tsdbFile
#define TSDB_FILE_HEAD_SIZE 512 // #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F // #define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF // #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_IVLD_FID INT_MIN // #define TSDB_IVLD_FID INT_MIN
#define TSDB_FILE_STATE_OK 0 // #define TSDB_FILE_STATE_OK 0
#define TSDB_FILE_STATE_BAD 1 // #define TSDB_FILE_STATE_BAD 1
#define TSDB_FILE_F(tf) (&((tf)->f)) // #define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_PFILE(tf) ((tf)->pFile) // #define TSDB_FILE_PFILE(tf) ((tf)->pFile)
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname) // #define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL) // #define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL)
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) // #define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL) // #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL)
#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level) // #define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id) // #define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did) // #define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname) // #define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname) // #define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf)) // #define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf))
#define TSDB_FILE_STATE(tf) ((tf)->state) // #define TSDB_FILE_STATE(tf) ((tf)->state)
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) // #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) // #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) // #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef enum { // typedef enum {
TSDB_FS_VER_0 = 0, // TSDB_FS_VER_0 = 0,
TSDB_FS_VER_MAX, // TSDB_FS_VER_MAX,
} ETsdbFsVer; // } ETsdbFsVer;
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile // #define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file // #define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile // static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) { // switch (fType) {
case TSDB_FILE_HEAD: // .head // case TSDB_FILE_HEAD: // .head
case TSDB_FILE_DATA: // .data // case TSDB_FILE_DATA: // .data
case TSDB_FILE_LAST: // .last // case TSDB_FILE_LAST: // .last
case TSDB_FILE_SMAD: // .smad(Block-wise SMA) // case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
case TSDB_FILE_SMAL: // .smal(Block-wise SMA) // case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
default: // default:
return TSDB_LATEST_FVER; // return TSDB_LATEST_FVER;
} // }
} // }
// =============== SDFileSet // =============== SDFileSet
#define TSDB_LATEST_FSET_VER 0 // #define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid) // #define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_STATE(s) ((s)->state) // #define TSDB_FSET_STATE(s) ((s)->state)
#define TSDB_FSET_VER(s) ((s)->ver) // #define TSDB_FSET_VER(s) ((s)->ver)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) // #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) // #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) // #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_SET_CLOSED(s) \ // #define TSDB_FSET_SET_CLOSED(s) \
do { \ // do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ // for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ // TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \ // } \
} while (0); // } while (0);
#define TSDB_FSET_FSYNC(s) \ // #define TSDB_FSET_FSYNC(s) \
do { \ // do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ // for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ // TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \ // } \
} while (0); // } while (0);
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) { // static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { // for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { // if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
return false; // return false;
} // }
} // }
return true; // return true;
} // }
// tsdbFS
// ================== TSDB global config // ================== TSDB global config
extern bool tsdbForceKeepFile; extern bool tsdbForceKeepFile;
// ================== CURRENT file header info // ================== CURRENT file header info
typedef struct { // typedef struct {
uint32_t version; // Current file system version (relating to code) // uint32_t version; // Current file system version (relating to code)
uint32_t len; // Encode content length (including checksum) // uint32_t len; // Encode content length (including checksum)
} SFSHeader; // } SFSHeader;
// ================== TSDB File System Meta // // ================== TSDB File System Meta
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) // #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus) // #define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
#define FS_IN_TXN(pfs) (pfs)->intxn // #define FS_IN_TXN(pfs) (pfs)->intxn
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version) // #define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) // #define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
struct SFSIter { // struct SFSIter {
int direction; // int direction;
uint64_t version; // current FS version // uint64_t version; // current FS version
STsdbFS *pfs; // STsdbFS *pfs;
int index; // used to position next fset when version the same // int index; // used to position next fset when version the same
int fid; // used to seek when version is changed // int fid; // used to seek when version is changed
SDFileSet *pSet; // SDFileSet *pSet;
}; // };
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
@ -715,25 +742,6 @@ typedef struct {
TSKEY eKey; TSKEY eKey;
} SDelInfo; } SDelInfo;
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
if (pKey1->ts < pKey2->ts) {
return -1;
} else if (pKey1->ts > pKey2->ts) {
return 1;
}
if (pKey1->version < pKey2->version) {
return -1;
} else if (pKey1->version > pKey2->version) {
return 1;
}
return 0;
}
struct STbDataIter { struct STbDataIter {
STbData *pTbData; STbData *pTbData;
int8_t backward; int8_t backward;

View File

@ -15,58 +15,51 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct SCommitIter SCommitIter; typedef struct SCommitter SCommitter;
typedef struct SCommitter SCommitter;
struct SCommitter { struct SCommitter {
SRtn rtn; // retention snapshot STsdb *pTsdb;
SFSIter fsIter; // tsdb file iterator int32_t minutes;
int niters; // memory iterators int8_t precision;
SCommitIter *iters; TSKEY nextCommitKey;
bool isRFileSet; // read and commit FSET // commit file data
int32_t fid; int32_t commitFid;
SDFileSet *pSet; TSKEY minKey;
SReadH readh; TSKEY maxKey;
SDFileSet wSet; SDFileSetReader *pReader;
bool isDFileSame; SDFileSetWriter *pWriter;
bool isLFileSame; SArray *aOBlockIdx;
TSKEY minKey; SArray *aBlockIdx;
TSKEY maxKey; // commit table data
SArray *aBlkIdx; // SBlockIdx array STbData *pTbData;
STable *pTable; SBlockIdx *pBlockIdx;
SArray *aSupBlk; // Table super-block array
SArray *aSubBlk; // table sub-block array
SDataCols *pDataCols;
}; };
static int32_t tsdbCommitData(SCommitter *pCommith); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommith); static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommith); static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle); static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno); static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
int32_t tsdbBegin(STsdb *pTsdb) { int32_t tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0; int32_t code = 0;
SMemTable *pMem; code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
if (code) {
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { goto _err;
return -1;
} }
return 0; return code;
_err:
return code;
} }
int32_t tsdbCommit(STsdb *pTsdb) { int32_t tsdbCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SCommitter commith = {0}; SCommitter commith = {0};
SDFileSet *pSet = NULL;
int fid; int fid;
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
// start commit // start commit
code = tsdbStartCommit(pTsdb, &commith); code = tsdbStartCommit(pTsdb, &commith);
if (code) { if (code) {
@ -102,11 +95,348 @@ _err:
return code; return code;
} }
#ifdef USE_NEW
// ===================================== NEW ====================================== // ===================================== NEW ======================================
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
int32_t code = 0;
#else ASSERT(pTsdb->mem && pTsdb->imem == NULL);
// ===================================== OLD ====================================== // lock();
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
// unlock();
pCommitter->pTsdb = pTsdb;
return code;
}
static int32_t tsdbCommitDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitDataEnd(SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
// no data, just return
if (pMemTable->nRow == 0) {
tsdbDebug("vgId:%d no data to commit", TD_VID(pTsdb->pVnode));
goto _exit;
}
// start
code = tsdbCommitDataStart(pCommitter);
if (code) {
goto _err;
}
// commit
code = tsdbCommitDataImpl(pCommitter);
if (code) {
goto _err;
}
// end
code = tsdbCommitDataEnd(pCommitter);
if (code) {
goto _err;
}
_exit:
tsdbDebug("vgId:%d commit TSDB data, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
return code;
_err:
tsdbError("vgId:%d failed to commit TSDB data since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
if (pMemTable->nDelOp == 0) {
goto _exit;
}
// start
code = tsdbCommitDelStart(pCommitter);
if (code) {
goto _err;
}
// impl
code = tsdbCommitDelImpl(pCommitter);
if (code) {
goto _err;
}
// end
code = tsdbCommitDelEnd(pCommitter);
if (code) {
goto _err;
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
pCommitter->nextCommitKey = pMemTable->minKey.ts;
return code;
}
static int32_t tsdbCommitFileData(SCommitter *pCommitter);
static int32_t tsdbCommitDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
for (;;) {
if (pCommitter->nextCommitKey == TSKEY_MAX) break;
pCommitter->commitFid = TSDB_KEY_FID(pCommitter->nextCommitKey, pCommitter->minutes, pCommitter->precision);
code = tsdbCommitFileData(pCommitter);
if (code) {
goto _err;
}
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter);
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t code = 0;
// commit file data start
code = tsdbCommitFileDataStart(pCommitter);
if (code) {
goto _err;
}
// commit file data impl
code = tsdbCommitFileDataImpl(pCommitter);
if (code) {
goto _err;
}
// commit file data end
code = tsdbCommitFileDataEnd(pCommitter);
if (code) {
goto _err;
}
return code;
_err:
return code;
}
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL;
SDFileSet *pWSet = NULL;
taosArrayClear(pCommitter->aOBlockIdx);
taosArrayClear(pCommitter->aBlockIdx);
// search pRSet (todo)
// open file to read
if (pRSet) {
code = tsdbDFileSetReaderOpen(pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err;
// create/open the pWSet (todo)
} else {
// create the pWSet (todo)
}
// open file for write
code = tsdbDFileSetWriterOpen(pCommitter->pWriter, pTsdb, pWSet);
if (code) goto _err;
// read the SBlockIdx part for merge purpose
code = tsdbLoadSBlockIdx(pCommitter->pReader, pCommitter->aOBlockIdx);
if (code) goto _err;
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
int32_t iBlockIdx = 0;
int32_t nBlockIdx = taosArrayGetSize(pCommitter->aOBlockIdx);
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
SBlockIdx *pBlockIdx;
STbData *pTbData;
while (iTbData < nTbData || iBlockIdx < nBlockIdx) {
pTbData = NULL;
pBlockIdx = NULL;
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
}
if (iBlockIdx < nBlockIdx) {
pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->aOBlockIdx, iBlockIdx);
}
if (pTbData && pBlockIdx) {
int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
if (c == 0) {
iTbData++;
iBlockIdx++;
} else if (c < 0) {
iTbData++;
pBlockIdx = NULL;
} else {
iBlockIdx++;
pTbData = NULL;
}
} else {
if (pTbData) {
iTbData++;
} else {
iBlockIdx++;
}
}
pCommitter->pTbData = pTbData;
pCommitter->pBlockIdx = pBlockIdx;
code = tsdbCommitTableData(pCommitter);
if (code) {
goto _err;
}
}
return code;
_err:
return code;
}
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter);
static int32_t tsdbCommitTableData(SCommitter *pCommitter) {
int32_t code = 0;
// start
code = tsdbCommitTableDataStart(pCommitter);
if (code) {
goto _err;
}
// impl
code = tsdbCommitTableDataImpl(pCommitter);
if (code) {
goto _err;
}
// end
code = tsdbCommitTableDataEnd(pCommitter);
if (code) {
goto _err;
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
// // ===================================== OLD ======================================
#if 0
struct SCommitIter { struct SCommitIter {
STable *pTable; STable *pTable;
STbDataIter *pIter; STbDataIter *pIter;
@ -146,8 +476,9 @@ static int tsdbWriteBlockInfo(SCommitter *pCommih);
static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx); static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitter *pCommith, int bidx); static int tsdbMoveBlock(SCommitter *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int
static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, nSubBlocks); static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY
keyLimit,
bool isLastOneBlock); bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitter *pCommith); static void tsdbResetCommitTable(SCommitter *pCommith);
static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError); static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError);
@ -273,28 +604,14 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
return 0; return 0;
} }
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep2 * tsTickPerMin[pCfg->precision];
midKey = now - pCfg->keep1 * tsTickPerMin[pCfg->precision];
maxKey = now - pCfg->keep0 * tsTickPerMin[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision));
tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle) { static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle) {
int32_t code = 0; int32_t code = 0;
tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb)); tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb));
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
if (tsdbInitCommitH(pCHandle, pTsdb) < 0) { if (tsdbInitCommitH(pCHandle, pTsdb) < 0) {
return -1; return -1;
} }
@ -443,7 +760,8 @@ static int32_t tsdbCommitToFileEnd(SCommitter *pCommith) {
int32_t code = 0; int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith))))
<
0) { 0) {
tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), pCommith->fid, tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), pCommith->fid,
tstrerror(terrno)); tstrerror(terrno));
@ -499,7 +817,8 @@ static int32_t tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid)
break; break;
} }
if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) { if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid)))
{
if (tsdbCommitToTable(pCommith, mIter) < 0) { if (tsdbCommitToTable(pCommith, mIter) < 0) {
tsdbCloseCommitFile(pCommith, true); tsdbCloseCommitFile(pCommith, true);
// revert the file change // revert the file change
@ -701,7 +1020,8 @@ static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int f
pCommith->isLFileSame = false; pCommith->isLFileSame = false;
if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) { if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) {
tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno)); tstrerror(terrno));
tsdbCloseDFileSet(pWSet); tsdbCloseDFileSet(pWSet);
@ -722,7 +1042,8 @@ static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int f
tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD); tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);
if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) { if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) {
tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno)); tstrerror(terrno));
tsdbCloseDFileSet(pWSet); tsdbCloseDFileSet(pWSet);
@ -769,7 +1090,8 @@ static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int f
tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL); tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) { if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) {
tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno)); tstrerror(terrno));
tsdbCloseDFileSet(pWSet); tsdbCloseDFileSet(pWSet);
@ -1084,7 +1406,8 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
* @return int * @return int
*/ */
static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf)
{
STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData = NULL; SBlockData *pBlockData = NULL;
SAggrBlkData *pAggrBlkData = NULL; SAggrBlkData *pAggrBlkData = NULL;
@ -1132,7 +1455,8 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull)); &(pBlockCol->numOfNull));
#endif #endif
(*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, rowsToWrite, (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData,
rowsToWrite,
&(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum), &(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum),
&(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
&(pAggrBlkCol->numOfNull)); &(pAggrBlkCol->numOfNull));
@ -1314,7 +1638,8 @@ static int tsdbWriteBlockInfo(SCommitter *pCommih) {
SBlockIdx blkIdx; SBlockIdx blkIdx;
STable *pTable = TSDB_COMMIT_TABLE(pCommih); STable *pTable = TSDB_COMMIT_TABLE(pCommih);
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void
**)(&(TSDB_COMMIT_BUF(pCommih))),
&blkIdx) < 0) { &blkIdx) < 0) {
return -1; return -1;
} }
@ -1430,7 +1755,8 @@ static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx)
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1;
} else { } else {
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return
-1;
} }
return 0; return 0;
@ -1483,7 +1809,8 @@ static int tsdbMoveBlock(SCommitter *pCommith, int bidx) {
return 0; return 0;
} }
static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int
nSubBlocks) {
if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
@ -1508,7 +1835,8 @@ static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCol
int biter = 0; int biter = 0;
while (true) { while (true) {
tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter,
pCommith->pDataCols,
keyLimit, defaultRows, pCfg->update); keyLimit, defaultRows, pCfg->update);
if (pCommith->pDataCols->numOfRows == 0) break; if (pCommith->pDataCols->numOfRows == 0) break;

View File

@ -15,6 +15,41 @@
#include "tsdb.h" #include "tsdb.h"
struct STsdbFS {
STsdb *pTsdb;
TdThreadRwlock lock;
int64_t minVersion;
int64_t maxVersion;
STsdbTombstoneFile *pTombstoneF;
STsdbCacheFile *pCacheF;
SArray *pArray;
};
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbFSClose(STsdbFS *pFS) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbFSStart(STsdbFS *pFS) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback) {
int32_t code = 0;
// TODO
return code;
}
#if 0
extern const char *TSDB_LEVEL_DNAME[]; extern const char *TSDB_LEVEL_DNAME[];
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
@ -300,8 +335,6 @@ void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) {
pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd; pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd;
} }
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
int tsdbEndFSTxn(STsdb *pRepo) { int tsdbEndFSTxn(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(FS_IN_TXN(pfs)); ASSERT(FS_IN_TXN(pfs));
@ -334,8 +367,6 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) {
return 0; return 0;
} }
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); }
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
@ -1060,4 +1091,22 @@ int tsdbUnLockFS(STsdbFS *pFs) {
return -1; return -1;
} }
return 0; return 0;
} }
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep2 * tsTickPerMin[pCfg->precision];
midKey = now - pCfg->keep1 * tsTickPerMin[pCfg->precision];
maxKey = now - pCfg->keep0 * tsTickPerMin[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision));
tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
#endif

View File

@ -15,6 +15,45 @@
#include "tsdb.h" #include "tsdb.h"
static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""};
// .tombstone
struct STsdbTombstoneFile {
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
};
struct STsdbIndexFile {
int64_t size;
int64_t offset;
int32_t nRef;
};
struct STsdbDataFile {
int64_t size;
int32_t nRef;
};
struct STsdbLastFile {
int64_t size;
int32_t nRef;
};
struct STsdbSmaFile {
int64_t size;
int32_t nRef;
};
struct SDFileSet {
STsdbIndexFile *pIndexF;
STsdbDataFile *pDataF;
STsdbLastFile *pLastF;
STsdbSmaFile *pSmaF;
};
#if 0
static const char *TSDB_FNAME_SUFFIX[] = { static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD "head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA "data", // TSDB_FILE_DATA
@ -579,4 +618,5 @@ int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) {
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) { void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * days * tsTickPerMin[precision]; *minKey = fid * days * tsTickPerMin[precision];
*maxKey = *minKey + days * tsTickPerMin[precision] - 1; *maxKey = *minKey + days * tsTickPerMin[precision] - 1;
} }
#endif

View File

@ -17,7 +17,6 @@
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg); static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg);
// implementation // implementation
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg) { static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg) {
@ -64,13 +63,13 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
} else { } else {
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg)); memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
} }
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb)); // pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir (TODO: use tfsMkdir) // create dir (TODO: use tfsMkdir)
taosMkDir(pTsdb->path); taosMkDir(pTsdb->path);
// open tsdb // open tsdb
if (tsdbOpenFS(pTsdb) < 0) { if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) {
goto _err; goto _err;
} }
@ -89,8 +88,8 @@ int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
// TODO: destroy mem/imem // TODO: destroy mem/imem
taosThreadMutexDestroy(&(*pTsdb)->mutex); taosThreadMutexDestroy(&(*pTsdb)->mutex);
tsdbCloseFS(*pTsdb); tsdbFSClose((*pTsdb)->fs);
tsdbFreeFS((*pTsdb)->fs); // tsdbFreeFS((*pTsdb)->fs);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
} }
return 0; return 0;
@ -99,7 +98,7 @@ int tsdbClose(STsdb **pTsdb) {
int tsdbLockRepo(STsdb *pTsdb) { int tsdbLockRepo(STsdb *pTsdb) {
int code = taosThreadMutexLock(&pTsdb->mutex); int code = taosThreadMutexLock(&pTsdb->mutex);
if (code != 0) { if (code != 0) {
tsdbError("vgId:%d, failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
return -1; return -1;
} }
@ -108,11 +107,11 @@ int tsdbLockRepo(STsdb *pTsdb) {
} }
int tsdbUnlockRepo(STsdb *pTsdb) { int tsdbUnlockRepo(STsdb *pTsdb) {
ASSERT(IS_REPO_LOCKED(pTsdb)); // ASSERT(IS_REPO_LOCKED(pTsdb));
pTsdb->repoLocked = false; pTsdb->repoLocked = false;
int code = taosThreadMutexUnlock(&pTsdb->mutex); int code = taosThreadMutexUnlock(&pTsdb->mutex);
if (code != 0) { if (code != 0) {
tsdbError("vgId:%d, failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
return -1; return -1;
} }

View File

@ -14,6 +14,8 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
#if 0
#include "vnode.h" #include "vnode.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
@ -3725,3 +3727,5 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
taosMemoryFreeClear(pTsdbReadHandle); taosMemoryFreeClear(pTsdbReadHandle);
} }
#endif

View File

@ -15,6 +15,7 @@
#include "tsdb.h" #include "tsdb.h"
#if 0
#define TSDB_KEY_COL_OFFSET 0 #define TSDB_KEY_COL_OFFSET 0
static void tsdbResetReadTable(SReadH *pReadh); static void tsdbResetReadTable(SReadH *pReadh);
@ -975,3 +976,4 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return 0; return 0;
} }
#endif

View File

@ -17,22 +17,20 @@
// SDFileSetWritter ==================================================== // SDFileSetWritter ====================================================
struct SDFileSetWritter { struct SDFileSetWritter {
STsdb *pTsdb; STsdb *pTsdb;
SDFileSet wSet; int32_t szBuf1;
int32_t szBuf1; uint8_t *pBuf1;
uint8_t *pBuf1; int32_t szBuf2;
int32_t szBuf2; uint8_t *pBuf2;
uint8_t *pBuf2;
}; };
// SDFileSetReader ==================================================== // SDFileSetReader ====================================================
struct SDFileSetReader { struct SDFileSetReader {
STsdb *pTsdb; STsdb *pTsdb;
SDFileSet rSet; int32_t szBuf1;
int32_t szBuf1; uint8_t *pBuf1;
uint8_t *pBuf1; int32_t szBuf2;
int32_t szBuf2; uint8_t *pBuf2;
uint8_t *pBuf2;
}; };
int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet) { int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet) {
@ -40,12 +38,6 @@ int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet
memset(pReader, 0, sizeof(*pReader)); memset(pReader, 0, sizeof(*pReader));
pReader->pTsdb = pTsdb; pReader->pTsdb = pTsdb;
pReader->rSet = *pSet;
code = tsdbOpenDFileSet(&pReader->rSet, TD_FILE_READ);
if (code) {
goto _err;
}
return code; return code;
@ -59,7 +51,6 @@ int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader) {
taosMemoryFreeClear(pReader->pBuf1); taosMemoryFreeClear(pReader->pBuf1);
taosMemoryFreeClear(pReader->pBuf2); taosMemoryFreeClear(pReader->pBuf2);
tsdbCloseDFileSet(&pReader->rSet);
return code; return code;
} }
@ -84,12 +75,10 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta
// STombstoneFileWriter ==================================================== // STombstoneFileWriter ====================================================
struct STombstoneFileWriter { struct STombstoneFileWriter {
STsdb *pTsdb; STsdb *pTsdb;
SDFile *pTombstoneF;
}; };
// STombstoneFileReader ==================================================== // STombstoneFileReader ====================================================
struct STombstoneFileReader { struct STombstoneFileReader {
STsdb *pTsdb; STsdb *pTsdb;
SDFile *pTombstoneF;
}; };

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
TABLEID *pId1 = (TABLEID *)p1;
TABLEID *pId2 = (TABLEID *)p2;
if (pId1->suid < pId2->suid) {
return -1;
} else if (pId1->suid > pId2->suid) {
return 1;
}
if (pId1->uid < pId2->uid) {
return -1;
} else if (pId1->uid > pId2->uid) {
return 1;
}
return 0;
}
int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
if (pKey1->ts < pKey2->ts) {
return -1;
} else if (pKey1->ts > pKey2->ts) {
return 1;
}
if (pKey1->version < pKey2->version) {
return -1;
} else if (pKey1->version > pKey2->version) {
return 1;
}
return 0;
}

View File

@ -28,7 +28,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
// scan and convert // scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d, failed to insert data since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
} }
return -1; return -1;
} }
@ -77,7 +77,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
if (rowKey < minKey || rowKey > maxKey) { if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64, " maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey); TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1; return -1;
} }
@ -92,7 +92,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
SSubmitBlk *pBlock = NULL; SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSRow *row = NULL; STSRow *row = NULL;
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pTsdb); STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision); TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days; TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days;