refact tsdb file
This commit is contained in:
parent
5dcfd2a642
commit
d795ff85b5
|
@ -30,7 +30,7 @@ typedef struct {
|
||||||
|
|
||||||
// ================== TSDB File System Meta
|
// ================== TSDB File System Meta
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t version; // Commit version from 0 to increase
|
uint32_t version; // Commit version from 0 to increase
|
||||||
int64_t totalPoints; // total points
|
int64_t totalPoints; // total points
|
||||||
int64_t totalStorage; // Uncompressed total storage
|
int64_t totalStorage; // Uncompressed total storage
|
||||||
} STsdbFSMeta;
|
} STsdbFSMeta;
|
||||||
|
|
|
@ -34,15 +34,9 @@ extern "C" {
|
||||||
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
|
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
|
||||||
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
|
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
|
||||||
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
|
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
|
||||||
|
#define TSDB_FILE_FSYNC(tf) fsync(TSDB_FILE_FD(tf))
|
||||||
|
|
||||||
typedef enum {
|
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
|
||||||
TSDB_FILE_HEAD = 0,
|
|
||||||
TSDB_FILE_DATA,
|
|
||||||
TSDB_FILE_LAST,
|
|
||||||
TSDB_FILE_MAX,
|
|
||||||
TSDB_FILE_META,
|
|
||||||
TSDB_FILE_MANIFEST
|
|
||||||
} TSDB_FILE_T;
|
|
||||||
|
|
||||||
// =============== SMFile
|
// =============== SMFile
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -63,11 +57,13 @@ void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
||||||
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
|
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
|
||||||
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
||||||
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
||||||
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
|
int tsdbCreateMFile(SMFile* pMFile);
|
||||||
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
|
int tsdbUpdateMFileHeader(SMFile* pMFile);
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMInfo* pInfo) { pMFile->info = *pInfo; }
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
|
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
|
||||||
ASSERT(!TSDB_FILE_OPENED(pMFile));
|
ASSERT(TSDB_FILE_CLOSED(pMFile));
|
||||||
|
|
||||||
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags);
|
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags);
|
||||||
if (pMFile->fd < 0) {
|
if (pMFile->fd < 0) {
|
||||||
|
@ -137,12 +133,8 @@ static FORCE_INLINE int tsdbAppendMFile(SMFile* pMFile, void* buf, int64_t nbyte
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCreateMFile(SMFile *pMFile);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_FILE_F(pMFile)); }
|
static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_FILE_F(pMFile)); }
|
||||||
|
|
||||||
int tsdbUpdateMFileHeader(SMFile* pMFile);
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
|
static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
|
||||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
ASSERT(TSDB_FILE_OPENED(pMFile));
|
||||||
|
|
||||||
|
@ -176,6 +168,10 @@ void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver,
|
||||||
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
||||||
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
||||||
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
|
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
|
||||||
|
int tsdbCreateDFile(SDFile* pDFile);
|
||||||
|
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; }
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
|
static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
|
||||||
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
||||||
|
@ -196,7 +192,7 @@ static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
|
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) {
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
int64_t loffset = taosLSeek(TSDB_FILE_FD(pDFile), offset, whence);
|
int64_t loffset = taosLSeek(TSDB_FILE_FD(pDFile), offset, whence);
|
||||||
|
@ -248,12 +244,8 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCreateDFile(SDFile* pDFile);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_FILE_F(pDFile)); }
|
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_FILE_F(pDFile)); }
|
||||||
|
|
||||||
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
@ -272,7 +264,7 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDest->info = pSrc->info;
|
tsdbSetDFileInfo(pDest, TSDB_FILE_INFO(pSrc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,11 +286,13 @@ typedef struct {
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver);
|
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
|
||||||
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
||||||
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
|
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
|
||||||
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
|
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
|
||||||
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
||||||
|
int tsdbCreateDFileSet(SDFileSet* pSet);
|
||||||
|
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
|
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
@ -316,16 +310,12 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCreateDFileSet(SDFileSet *pSet);
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
|
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
|
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
||||||
|
|
|
@ -16,12 +16,11 @@
|
||||||
#include "tsdbint.h"
|
#include "tsdbint.h"
|
||||||
|
|
||||||
static const char *TSDB_FNAME_SUFFIX[] = {
|
static const char *TSDB_FNAME_SUFFIX[] = {
|
||||||
".head", // TSDB_FILE_HEAD
|
".head", // TSDB_FILE_HEAD
|
||||||
".data", // TSDB_FILE_DATA
|
".data", // TSDB_FILE_DATA
|
||||||
".last", // TSDB_FILE_LAST
|
".last", // TSDB_FILE_LAST
|
||||||
"", // TSDB_FILE_MAX
|
"", // TSDB_FILE_MAX
|
||||||
"meta", // TSDB_FILE_META
|
"meta" // TSDB_FILE_META
|
||||||
"manifest" // TSDB_FILE_MANIFEST
|
|
||||||
};
|
};
|
||||||
|
|
||||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
|
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
|
||||||
|
@ -62,6 +61,7 @@ int tsdbEncodeSMFile(void **buf, SMFile *pMFile) {
|
||||||
void *tsdbDecodeSMFile(void *buf, SMFile *pMFile) {
|
void *tsdbDecodeSMFile(void *buf, SMFile *pMFile) {
|
||||||
buf = tsdbDecodeMFInfo(buf, &(pMFile->info));
|
buf = tsdbDecodeMFInfo(buf, &(pMFile->info));
|
||||||
buf = tfsDecodeFile(buf, &(pMFile->f));
|
buf = tfsDecodeFile(buf, &(pMFile->f));
|
||||||
|
TSDB_FILE_SET_CLOSED(pMFile);
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -71,14 +71,14 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) {
|
||||||
|
|
||||||
if (from != NULL) {
|
if (from != NULL) {
|
||||||
if (to == NULL) {
|
if (to == NULL) {
|
||||||
tsdbRemoveMFile(from);
|
return tsdbRemoveMFile(from);
|
||||||
} else {
|
} else {
|
||||||
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
|
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
|
||||||
if (from->info.size > to->info.size) {
|
if (from->info.size > to->info.size) {
|
||||||
tsdbRollBackMFile(to);
|
tsdbRollBackMFile(to);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbRemoveMFile(from);
|
return tsdbRemoveMFile(from);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,28 +86,6 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRollBackMFile(SMFile *pMFile) {
|
|
||||||
SMFile mf = *pMFile;
|
|
||||||
|
|
||||||
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosFtruncate(TSDB_FILE_FD(&mf), pMFile->info.size) < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
tsdbCloseMFile(&mf);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbUpdateMFileHeader(&mf) < 0) {
|
|
||||||
tsdbCloseMFile(&mf);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbCloseMFile(&mf);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbCreateMFile(SMFile *pMFile) {
|
int tsdbCreateMFile(SMFile *pMFile) {
|
||||||
ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC);
|
ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC);
|
||||||
|
|
||||||
|
@ -139,7 +117,7 @@ int tsdbUpdateMFileHeader(SMFile *pMFile) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *ptr = buf;
|
void *ptr = buf;
|
||||||
tsdbEncodeMFInfo(&ptr, &(pMFile->info));
|
tsdbEncodeMFInfo(&ptr, TSDB_FILE_INFO(pMFile));
|
||||||
|
|
||||||
if (tsdbWriteMFile(pMFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
|
if (tsdbWriteMFile(pMFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -170,6 +148,30 @@ static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbRollBackMFile(SMFile *pMFile) {
|
||||||
|
SMFile mf = *pMFile;
|
||||||
|
|
||||||
|
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosFtruncate(TSDB_FILE_FD(&mf), pMFile->info.size) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
tsdbCloseMFile(&mf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbUpdateMFileHeader(&mf) < 0) {
|
||||||
|
tsdbCloseMFile(&mf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSDB_FILE_FSYNC(&mf);
|
||||||
|
|
||||||
|
tsdbCloseMFile(&mf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// ============== Operations on SDFile
|
// ============== Operations on SDFile
|
||||||
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
@ -179,7 +181,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
|
||||||
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
||||||
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
|
|
||||||
tsdbGetFilename(vid, 0, ver, ftype, fname);
|
tsdbGetFilename(vid, fid, ver, ftype, fname);
|
||||||
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
|
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,6 +202,7 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
|
||||||
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
|
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
|
||||||
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
||||||
buf = tfsDecodeFile(buf, &(pDFile->f));
|
buf = tfsDecodeFile(buf, &(pDFile->f));
|
||||||
|
TSDB_FILE_SET_CLOSED(pDFile);
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -308,6 +311,8 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSDB_FILE_FSYNC(&df);
|
||||||
|
|
||||||
tsdbCloseDFile(&df);
|
tsdbCloseDFile(&df);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -384,13 +389,14 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c
|
||||||
if (ver == 0) {
|
if (ver == 0) {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
|
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
|
||||||
} else {
|
} else {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver);
|
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-ver%" PRIu32, vid, vid, fid,
|
||||||
|
TSDB_FNAME_SUFFIX[ftype], ver);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ver == 0) {
|
if (ver == 0) {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
|
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
|
||||||
} else {
|
} else {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver);
|
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s-ver%" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue