refact tsdb fs
This commit is contained in:
parent
d795ff85b5
commit
e843619b87
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
|
|
||||||
// ================== CURRENT file header info
|
// ================== CURRENT file header info
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t version; // Current file version
|
uint32_t version; // Current file system version (relating to code)
|
||||||
uint32_t len; // Encode content length (including checksum)
|
uint32_t len; // Encode content length (including checksum)
|
||||||
} SFSHeader;
|
} SFSHeader;
|
||||||
|
|
||||||
|
@ -72,9 +72,10 @@ STsdbFS *tsdbNewFS(int keep, int days);
|
||||||
void * tsdbFreeFS(STsdbFS *pfs);
|
void * tsdbFreeFS(STsdbFS *pfs);
|
||||||
int tsdbOpenFS(STsdbFS *pFs, int keep, int days);
|
int tsdbOpenFS(STsdbFS *pFs, int keep, int days);
|
||||||
void tsdbCloseFS(STsdbFS *pFs);
|
void tsdbCloseFS(STsdbFS *pFs);
|
||||||
int tsdbStartTxn(STsdbFS *pfs);
|
uint32_t tsdbStartFSTxn(STsdbFS *pfs);
|
||||||
int tsdbEndTxn(STsdbFS *pfs);
|
int tsdbEndFSTxn(STsdbFS *pfs);
|
||||||
int tsdbEndTxnWithError(STsdbFS *pfs);
|
int tsdbEndFSTxnWithError(STsdbFS *pfs);
|
||||||
|
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
|
||||||
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
|
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
|
||||||
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
||||||
|
|
||||||
|
|
|
@ -57,10 +57,11 @@ void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
||||||
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
|
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
|
||||||
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
||||||
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
||||||
|
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
|
||||||
int tsdbCreateMFile(SMFile* pMFile);
|
int tsdbCreateMFile(SMFile* pMFile);
|
||||||
int tsdbUpdateMFileHeader(SMFile* pMFile);
|
int tsdbUpdateMFileHeader(SMFile* pMFile);
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMInfo* pInfo) { pMFile->info = *pInfo; }
|
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; }
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
|
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
|
||||||
ASSERT(TSDB_FILE_CLOSED(pMFile));
|
ASSERT(TSDB_FILE_CLOSED(pMFile));
|
||||||
|
|
|
@ -361,7 +361,7 @@ static int tsdbStartCommit(STsdbRepo *pRepo) {
|
||||||
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
|
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
|
||||||
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
|
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
|
||||||
|
|
||||||
if (tsdbStartTxn(REPO_FS(pRepo)) < 0) return -1;
|
if (tsdbStartFSTxn(REPO_FS(pRepo)) < 0) return -1;
|
||||||
|
|
||||||
pRepo->code = TSDB_CODE_SUCCESS;
|
pRepo->code = TSDB_CODE_SUCCESS;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -369,9 +369,9 @@ static int tsdbStartCommit(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
|
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
|
||||||
if (eno != TSDB_CODE_SUCCESS) {
|
if (eno != TSDB_CODE_SUCCESS) {
|
||||||
tsdbEndTxnWithError(REPO_FS(pRepo));
|
tsdbEndFSTxnWithError(REPO_FS(pRepo));
|
||||||
} else {
|
} else {
|
||||||
tsdbEndTxn(REPO_FS(pRepo));
|
tsdbEndFSTxn(REPO_FS(pRepo));
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
|
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
|
||||||
|
|
|
@ -40,7 +40,7 @@ static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) {
|
||||||
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
|
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
|
|
||||||
tlen += taosEncodeFixedU64(buf, pMeta->version);
|
tlen += taosEncodeFixedU32(buf, pMeta->version);
|
||||||
tlen += taosEncodeFixedI64(buf, pMeta->totalPoints);
|
tlen += taosEncodeFixedI64(buf, pMeta->totalPoints);
|
||||||
tlen += taosEncodeFixedI64(buf, pMeta->totalStorage);
|
tlen += taosEncodeFixedI64(buf, pMeta->totalStorage);
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
|
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
|
||||||
buf = taosDecodeFixedU64(buf, &(pMeta->version));
|
buf = taosDecodeFixedU32(buf, &(pMeta->version));
|
||||||
buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints));
|
buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints));
|
||||||
buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage));
|
buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage));
|
||||||
|
|
||||||
|
@ -70,7 +70,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
|
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
|
||||||
uint64_t nset;
|
uint64_t nset;
|
||||||
SDFileSet dset;
|
SDFileSet dset;
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
|
||||||
|
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
|
|
||||||
tlen += tsdbEncodeSMFile(buf, &(pStatus->pmf));
|
tlen += tsdbEncodeSMFile(buf, pStatus->pmf);
|
||||||
tlen += tsdbEncodeDFileSetArray(buf, pStatus->df);
|
tlen += tsdbEncodeDFileSetArray(buf, pStatus->df);
|
||||||
|
|
||||||
return tlen;
|
return tlen;
|
||||||
|
@ -113,6 +113,8 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSDB_FSET_SET_CLOSED(&(pStatus->mf));
|
||||||
|
|
||||||
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
|
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
|
||||||
if (pStatus->df == NULL) {
|
if (pStatus->df == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -137,6 +139,8 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSDB_FSET_SET_CLOSED(&(pStatus->mf));
|
||||||
|
|
||||||
pStatus->pmf = NULL;
|
pStatus->pmf = NULL;
|
||||||
taosArrayClear(pStatus->df);
|
taosArrayClear(pStatus->df);
|
||||||
}
|
}
|
||||||
|
@ -162,6 +166,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== STsdbFS
|
// ================== STsdbFS
|
||||||
|
// TODO
|
||||||
STsdbFS *tsdbNewFS(int keep, int days) {
|
STsdbFS *tsdbNewFS(int keep, int days) {
|
||||||
int maxFSet = TSDB_MAX_FSETS(keep, days);
|
int maxFSet = TSDB_MAX_FSETS(keep, days);
|
||||||
STsdbFS *pfs;
|
STsdbFS *pfs;
|
||||||
|
@ -201,6 +206,7 @@ STsdbFS *tsdbNewFS(int keep, int days) {
|
||||||
return pfs;
|
return pfs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
void *tsdbFreeFS(STsdbFS *pfs) {
|
void *tsdbFreeFS(STsdbFS *pfs) {
|
||||||
if (pfs) {
|
if (pfs) {
|
||||||
pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus);
|
pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus);
|
||||||
|
@ -213,33 +219,37 @@ void *tsdbFreeFS(STsdbFS *pfs) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
int tsdbOpenFS(STsdbFS *pFs, int keep, int days) {
|
int tsdbOpenFS(STsdbFS *pFs, int keep, int days) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
void tsdbCloseFS(STsdbFS *pFs) {
|
void tsdbCloseFS(STsdbFS *pFs) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a new transaction to modify the file system
|
// Start a new transaction to modify the file system
|
||||||
int tsdbStartTxn(STsdbFS *pfs) {
|
uint32_t tsdbStartFSTxn(STsdbFS *pfs) {
|
||||||
ASSERT(pfs->intxn == false);
|
ASSERT(pfs->intxn == false);
|
||||||
|
|
||||||
pfs->intxn = true;
|
pfs->intxn = true;
|
||||||
tsdbResetFSStatus(pfs->nstatus);
|
tsdbResetFSStatus(pfs->nstatus);
|
||||||
|
|
||||||
return 0;
|
return pfs->cstatus->meta.version + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbEndTxn(STsdbFS *pfs) {
|
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
|
||||||
|
|
||||||
|
int tsdbEndFSTxn(STsdbFS *pfs) {
|
||||||
ASSERT(FS_IN_TXN(pfs));
|
ASSERT(FS_IN_TXN(pfs));
|
||||||
SFSStatus *pStatus;
|
SFSStatus *pStatus;
|
||||||
|
|
||||||
// Write current file system snapshot
|
// Write current file system snapshot
|
||||||
if (tsdbUpdateFS(pfs) < 0) {
|
if (tsdbApplyFSTxn(pfs) < 0) {
|
||||||
tsdbEndTxnWithError(pfs);
|
tsdbEndFSTxnWithError(pfs);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,13 +261,13 @@ int tsdbEndTxn(STsdbFS *pfs) {
|
||||||
tsdbUnLockFS(pfs);
|
tsdbUnLockFS(pfs);
|
||||||
|
|
||||||
// Apply actual change to each file and SDFileSet
|
// Apply actual change to each file and SDFileSet
|
||||||
tsdbApplyFSChangeOnDisk(pfs);
|
tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus);
|
||||||
|
|
||||||
pfs->intxn = false;
|
pfs->intxn = false;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbEndTxnWithError(STsdbFS *pfs) {
|
int tsdbEndFSTxnWithError(STsdbFS *pfs) {
|
||||||
// TODO
|
// TODO
|
||||||
pfs->intxn = false;
|
pfs->intxn = false;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -267,7 +277,7 @@ void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pf
|
||||||
|
|
||||||
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
|
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
|
||||||
|
|
||||||
static int tsdbUpdateFS(STsdbFS *pfs) {
|
static int tsdbApplyFSTxn(STsdbFS *pfs) {
|
||||||
ASSERT(FS_IN_TXN(pfs));
|
ASSERT(FS_IN_TXN(pfs));
|
||||||
SFSHeader fsheader;
|
SFSHeader fsheader;
|
||||||
void * pBuf = NULL;
|
void * pBuf = NULL;
|
||||||
|
@ -339,7 +349,7 @@ static int tsdbUpdateFS(STsdbFS *pfs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbApplyFSChangeOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
|
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
|
||||||
int ifrom = 0;
|
int ifrom = 0;
|
||||||
int ito = 0;
|
int ito = 0;
|
||||||
size_t sizeFrom, sizeTo;
|
size_t sizeFrom, sizeTo;
|
||||||
|
|
Loading…
Reference in New Issue