refact TSDB
This commit is contained in:
parent
40a396563b
commit
bbc2ba9fef
|
@ -101,7 +101,28 @@ int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest);
|
||||||
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey);
|
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey);
|
||||||
|
|
||||||
// tsdbFS.c ==============================================================================================
|
// tsdbFS.c ==============================================================================================
|
||||||
typedef struct STsdbFS STsdbFS;
|
typedef struct STsdbFS STsdbFS;
|
||||||
|
typedef struct SFSIter SFSIter;
|
||||||
|
typedef struct STsdbFSMeta STsdbFSMeta;
|
||||||
|
|
||||||
|
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
|
||||||
|
void *tsdbFreeFS(STsdbFS *pfs);
|
||||||
|
int tsdbOpenFS(STsdb *pRepo);
|
||||||
|
void tsdbCloseFS(STsdb *pRepo);
|
||||||
|
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
|
||||||
|
int tsdbEndFSTxn(STsdb *pRepo);
|
||||||
|
int tsdbEndFSTxnWithError(STsdbFS *pfs);
|
||||||
|
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
|
||||||
|
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
|
||||||
|
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
||||||
|
|
||||||
|
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
|
||||||
|
void tsdbFSIterSeek(SFSIter *pIter, int fid);
|
||||||
|
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
|
||||||
|
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
|
||||||
|
int tsdbRLockFS(STsdbFS *pFs);
|
||||||
|
int tsdbWLockFS(STsdbFS *pFs);
|
||||||
|
int tsdbUnLockFS(STsdbFS *pFs);
|
||||||
|
|
||||||
// tsdbMemTable ================
|
// tsdbMemTable ================
|
||||||
typedef struct STbData STbData;
|
typedef struct STbData STbData;
|
||||||
|
@ -208,11 +229,11 @@ struct STsdbMemTable {
|
||||||
SHashObj *pHashIdx;
|
SHashObj *pHashIdx;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
struct STsdbFSMeta {
|
||||||
uint32_t version; // Commit version from 0 to increase
|
uint32_t version; // Commit version from 0 to increase
|
||||||
int64_t totalPoints; // total points
|
int64_t totalPoints; // total points
|
||||||
int64_t totalStorage; // Uncompressed total storage
|
int64_t totalStorage; // Uncompressed total storage
|
||||||
} STsdbFSMeta;
|
};
|
||||||
|
|
||||||
// ==================
|
// ==================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -573,21 +594,7 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
|
||||||
|
|
||||||
// =============== SDFileSet
|
// =============== SDFileSet
|
||||||
|
|
||||||
typedef struct {
|
#define TSDB_LATEST_FSET_VER 0
|
||||||
int fid;
|
|
||||||
int8_t state;
|
|
||||||
uint8_t ver;
|
|
||||||
uint16_t reserve;
|
|
||||||
#if 0
|
|
||||||
SDFInfo info;
|
|
||||||
#endif
|
|
||||||
STfsFile f;
|
|
||||||
TdFilePtr pFile;
|
|
||||||
|
|
||||||
} SSFile; // files split by days with fid
|
|
||||||
|
|
||||||
#define TSDB_LATEST_FSET_VER 0
|
|
||||||
|
|
||||||
#define TSDB_FSET_FID(s) ((s)->fid)
|
#define TSDB_FSET_FID(s) ((s)->fid)
|
||||||
#define TSDB_FSET_STATE(s) ((s)->state)
|
#define TSDB_FSET_STATE(s) ((s)->state)
|
||||||
#define TSDB_FSET_VER(s) ((s)->ver)
|
#define TSDB_FSET_VER(s) ((s)->ver)
|
||||||
|
@ -634,61 +641,18 @@ typedef struct {
|
||||||
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
|
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
|
||||||
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
|
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
|
||||||
|
|
||||||
typedef struct {
|
struct SFSIter {
|
||||||
int direction;
|
int direction;
|
||||||
uint64_t version; // current FS version
|
uint64_t version; // current FS version
|
||||||
STsdbFS *pfs;
|
STsdbFS *pfs;
|
||||||
int index; // used to position next fset when version the same
|
int index; // used to position next fset when version the same
|
||||||
int fid; // used to seek when version is changed
|
int fid; // used to seek when version is changed
|
||||||
SDFileSet *pSet;
|
SDFileSet *pSet;
|
||||||
} SFSIter;
|
};
|
||||||
|
|
||||||
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
||||||
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
||||||
|
|
||||||
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
|
|
||||||
void *tsdbFreeFS(STsdbFS *pfs);
|
|
||||||
int tsdbOpenFS(STsdb *pRepo);
|
|
||||||
void tsdbCloseFS(STsdb *pRepo);
|
|
||||||
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
|
|
||||||
int tsdbEndFSTxn(STsdb *pRepo);
|
|
||||||
int tsdbEndFSTxnWithError(STsdbFS *pfs);
|
|
||||||
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
|
|
||||||
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
|
|
||||||
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
|
||||||
|
|
||||||
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
|
|
||||||
void tsdbFSIterSeek(SFSIter *pIter, int fid);
|
|
||||||
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
|
|
||||||
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
|
|
||||||
int code = taosThreadRwlockRdlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
|
|
||||||
int code = taosThreadRwlockWrlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
|
|
||||||
int code = taosThreadRwlockUnlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TSDBROW {
|
struct TSDBROW {
|
||||||
int64_t version;
|
int64_t version;
|
||||||
STSRow2 tsRow;
|
STSRow2 tsRow;
|
||||||
|
|
|
@ -952,13 +952,6 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRestoreCurrent(STsdb *pRepo) {
|
static int tsdbRestoreCurrent(STsdb *pRepo) {
|
||||||
// // Loop to recover mfile
|
|
||||||
// if (tsdbRestoreMeta(pRepo) < 0) {
|
|
||||||
// tsdbError("vgId:%d, failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Loop to recover dfile set
|
|
||||||
if (tsdbRestoreDFileSet(pRepo) < 0) {
|
if (tsdbRestoreDFileSet(pRepo) < 0) {
|
||||||
tsdbError("vgId:%d, failed to restore DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d, failed to restore DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1041,3 +1034,30 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
|
||||||
tsdbCloseDFileSet(&fset);
|
tsdbCloseDFileSet(&fset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbRLockFS(STsdbFS *pFs) {
|
||||||
|
int code = taosThreadRwlockRdlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbWLockFS(STsdbFS *pFs) {
|
||||||
|
int code = taosThreadRwlockWrlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbUnLockFS(STsdbFS *pFs) {
|
||||||
|
int code = taosThreadRwlockUnlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue