diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h
index 4658def1bf..feaeea9972 100644
--- a/src/tsdb/inc/tsdbMain.h
+++ b/src/tsdb/inc/tsdbMain.h
@@ -293,12 +293,19 @@ static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
return dataRowTKey(row);
}
-// ================= tsdbFS.c
+// ================= tsdbFile.c
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
-enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX };
+typedef enum {
+ TSDB_FILE_HEAD = 0,
+ TSDB_FILE_DATA,
+ TSDB_FILE_LAST,
+ TSDB_FILE_MAX,
+ TSDB_FILE_META,
+ TSDB_FILE_MANIFEST
+} TSDB_FILE_T;
// For meta file
typedef struct {
@@ -315,6 +322,15 @@ typedef struct {
int fd;
} SMFile;
+void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo);
+int tsdbOpenMFile(SMFile* pMFile, int flags);
+void tsdbCloseMFile(SMFile* pMFile);
+int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence);
+int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte);
+int64_t tsdbTellMFile(SMFile *pMFile);
+int tsdbEncodeMFile(void** buf, SMFile* pMFile);
+void* tsdbDecodeMFile(void* buf, SMFile* pMFile);
+
// For .head/.data/.last file
typedef struct {
uint32_t magic;
@@ -332,12 +348,29 @@ typedef struct {
int fd;
} SDFile;
+void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo,
+ TSDB_FILE_T ftype);
+int tsdbOpenDFile(SDFile* pDFile, int flags);
+void tsdbCloseDFile(SDFile* pDFile);
+int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence);
+int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte);
+int64_t tsdbTellDFile(SDFile* pDFile);
+int tsdbEncodeDFile(void** buf, SDFile* pDFile);
+void* tsdbDecodeDFile(void* buf, SDFile* pDFile);
+
typedef struct {
- int id;
+ int fid;
int state;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
+#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
+
+void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
+int tsdbOpenDFileSet(SDFileSet* pSet, int flags);
+void tsdbCloseDFileSet(SDFileSet* pSet);
+int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
+
/* Statistic information of the TSDB file system.
*/
typedef struct {
@@ -351,31 +384,40 @@ typedef struct {
int64_t version;
STsdbFSMeta meta;
SMFile mf; // meta file
- SArray * df; // data file array
-} SFSSnapshot;
+ SArray* df; // data file array
+} SFSVer;
typedef struct {
pthread_rwlock_t lock;
- SFSSnapshot *curr;
- SFSSnapshot *new;
+ SFSVer fsv;
} STsdbFS;
+typedef struct {
+ int version; // current FS version
+ int index;
+ int fid;
+ SDFileSet* pSet;
+} SFSIter;
+
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f)))
#define TSDB_FILE_FD(tf) ((tf)->fd)
-int tsdbOpenFS(STsdbRepo* pRepo);
-void tsdbCloseFS(STsdbRepo* pRepo);
-int tsdbFSNewTxn(STsdbRepo* pRepo);
-int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError);
-int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile);
-int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
-void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid);
-int tsdbRemoveDFileSet(SDFileSet* pSet);
-int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
-void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
-SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to);
+int tsdbOpenFS(STsdbRepo* pRepo);
+void tsdbCloseFS(STsdbRepo* pRepo);
+int tsdbFSNewTxn(STsdbRepo* pRepo);
+int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError);
+int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile);
+int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
+void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid);
+int tsdbRemoveDFileSet(SDFileSet* pSet);
+int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
+void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
+SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to);
+int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter);
+SDFileSet* tsdbFSIterNext(SFSIter* pIter);
+int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet);
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
@@ -430,7 +472,7 @@ int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore* pStore);
void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size);
-// ================= tsdbFile.c
+// =================
// extern const char* tsdbFileSuffix[];
// minFid <= midFid <= maxFid
@@ -642,9 +684,8 @@ typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef struct {
TSKEY minKey;
TSKEY maxKey;
- SFileGroup fGroup;
- SFile nHeadF;
- SFile nLastF;
+ SDFileSet rSet;
+ SDFileSet wSet;
} SHelperFile;
typedef struct {
diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c
index 9fc2bbc451..62d84b66b6 100644
--- a/src/tsdb/src/tsdbCommit.c
+++ b/src/tsdb/src/tsdbCommit.c
@@ -75,6 +75,8 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbCfg * pCfg = &(pRepo->config);
SCommitH ch = {0};
+ SFSIter fsIter = {0};
+ SDFileSet *pOldSet = NULL;
if (pMem->numOfRows <= 0) return 0;
@@ -86,11 +88,17 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
int sfid = MIN(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/);
int efid = MAX(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/);
+ tsdbInitFSIter(pRepo, &fsIter);
+ pOldSet = tsdbFSIterNext(&fsIter);
for (int fid = sfid; fid <= efid; fid++) {
- if (tsdbCommitToFile(pRepo, fid, &ch) < 0) {
+ if (tsdbCommitToFile(pRepo, pOldSet, &ch, fid) < 0) {
tsdbDestroyCommitH(&ch, pMem->maxTables);
return -1;
}
+
+ if (pOldSet != NULL && pOldSet->fid == fid) {
+ pOldSet = tsdbFSIterNext(&fsIter);
+ }
}
tsdbDestroyCommitH(&ch, pMem->maxTables);
@@ -186,17 +194,69 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
return false;
}
-static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
+static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) {
STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem;
TSKEY minKey, maxKey;
- SDFileSet *pOldSet = NULL;
+ bool hasData;
+ SDFileSet rSet, wSet;
+
+ tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
+ hasData = tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey);
+
+ if (pOldSet == NULL || pOldSet->fid != fid) { // need to create SDFileSet and commit
+ if (!hasData) return 0;
+
+ tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID);
+ tsdbOpenDFileSet(&wSet, O_WRONLY | O_CREAT);
+ tsdbUpdateDFileSetHeader(&wSet);
+ } else {
+ int level = tsdbGetFidLevel(fid, &(pch->rtn));
+
+ // Check if SDFileSet expires
+ if (level < 0) {
+ if (hasData) {
+ tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1);
+ }
+ return 0;
+ }
+
+ // TODO: Check if SDFileSet in correct level
+ if (true /*pOldSet level is not the same as level*/) {
+ tsdbInitDFileSet(&rSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID);
+ // TODO: check if level is correct
+ tsdbOpenDFileSet(&wSet, O_WRONLY|O_CREAT);
+ }
+ }
+
+ // TODO: close the file set
+ if (!hasData) {
+ tsdbUpdateDFileSet(pRepo, &rSet);
+ return 0;
+ }
+
+ {
+ // TODO: commit the memory data
+ }
+
+ if (tsdbUpdateDFileSet(pRepo, &wSet) < 0) {
+ return -1;
+ }
+
+ return 0;
+
+#if 0
+ STsdbCfg * pCfg = &(pRepo->config);
+ SMemTable *pMem = pRepo->imem;
+ TSKEY minKey, maxKey;
+ SDFileSet oldSet = {0};
SDFileSet newSet = {0};
+ int level;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
- if (pOldSet) { // file exists
- int level = tsdbGetFidLevel(fid, &(pch->rtn));
+ level = tsdbGetFidLevel(fid, &(pch->rtn));
+ if (pOldSet) { // fset exists, check if the file shold be removed or upgrade tier level
if (level < 0) { // if out of data, remove it and ignore expired memory data
tsdbRemoveExpiredDFileSet(pRepo, fid);
tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1);
@@ -205,6 +265,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
// Move the data file set to correct level
tsdbMoveDFileSet(pOldSet, level);
+ } else { // fset not exist, create the fset
+ pOldSet = &oldSet;
+ if (tsdbCreateDFileSet(fid, level, pOldSet) < 0) {
+ // TODO
+ return -1;
+ }
}
if (tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey)) {
@@ -221,9 +287,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
TSDB_RLOCK_TABLE(pIter->pTable);
if (pIter->pIter != NULL) { // has data in memory to commit
+ // TODO
}
TSDB_RUNLOCK_TABLE(pIter->pTable);
+
if (tsdbMoveLastBlockIfNeccessary() < 0) return -1;
if (tsdbWriteCompInfo() < 0) return -1;
@@ -232,11 +300,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
if (tsdbWriteCompIdx() < 0) return -1;
}
- if (/*file exists OR has data to commit*/) {
- tsdbUpdateDFileSet(pRepo, &newSet);
- }
+ tsdbUpdateDFileSet(pRepo, &newSet);
return 0;
+#endif
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c
index 8397410e50..86c6a7408f 100644
--- a/src/tsdb/src/tsdbFS.c
+++ b/src/tsdb/src/tsdbFS.c
@@ -88,7 +88,7 @@ int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) {
}
int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
- SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new;
+ SFSVer *pSnapshot = REPO_FS(pRepo)->new;
SDFileSet * pOldSet;
pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE);
@@ -116,7 +116,7 @@ int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
}
void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) {
- SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new;
+ SFSVer *pSnapshot = REPO_FS(pRepo)->new;
while (taosArrayGetSize(pSnapshot->df) > 0) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0);
if (pSet->id < mfid) {
@@ -125,38 +125,32 @@ void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) {
}
}
-int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
- int tlen = 0;
-
- tlen += taosEncodeVariantI64(buf, pInfo->size);
- tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
- tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
- tlen += taosEncodeVariantI64(buf, pInfo->nDels);
- tlen += taosEncodeFixedU32(buf, pInfo->magic);
-
- return tlen;
-}
-
-void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
- buf = taosDecodeVariantI64(buf, &(pInfo->size));
- buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
- buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
- buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
- buf = taosDecodeFixedU32(buf, &(pInfo->magic));
-
- return buf;
-}
SDFileSet tsdbMoveDFileSet(SDFileSet *pOldSet, int to) {
// TODO
}
-static int tsdbSaveFSSnapshot(int fd, SFSSnapshot *pSnapshot) {
+int tsdbInitFSIter(STsdbRepo *pRepo, SFSIter *pIter) {
// TODO
return 0;
}
-static int tsdbLoadFSSnapshot(SFSSnapshot *pSnapshot) {
+SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
+ // TODO
+ return NULL;
+}
+
+int tsdbCreateDFileSet(int fid, int level, SDFileSet *pSet) {
+ // TODO
+ return 0;
+}
+
+static int tsdbSaveFSSnapshot(int fd, SFSVer *pSnapshot) {
+ // TODO
+ return 0;
+}
+
+static int tsdbLoadFSSnapshot(SFSVer *pSnapshot) {
// TODO
return 0;
}
@@ -178,63 +172,6 @@ static int tsdbOpenFSImpl(STsdbRepo *pRepo) {
return 0;
}
-static int tsdbEncodeMFile(void **buf, SMFile *pMFile) {
- int tlen = 0;
-
- tlen += tsdbEncodeMFInfo(buf, &(pMFile->info));
- tlen += tfsEncodeFile(buf, &(pMFile->f));
-
- return tlen;
-}
-
-static void *tsdbDecodeMFile(void *buf, SMFile *pMFile) {
- buf = tsdbDecodeMFInfo(buf, &(pMFile->info));
- buf = tfsDecodeFile(buf, &(pMFile->f));
-
- return buf;
-}
-
-static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
- int tlen = 0;
-
- tlen += taosEncodeFixedU32(buf, pInfo->magic);
- tlen += taosEncodeFixedU32(buf, pInfo->len);
- tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
- tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
- tlen += taosEncodeFixedU32(buf, pInfo->offset);
- tlen += taosEncodeFixedU64(buf, pInfo->size);
- tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
-
- return tlen;
-}
-
-static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
- buf = taosDecodeFixedU32(buf, &(pInfo->magic));
- buf = taosDecodeFixedU32(buf, &(pInfo->len));
- buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
- buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
- buf = taosDecodeFixedU32(buf, &(pInfo->offset));
- buf = taosDecodeFixedU64(buf, &(pInfo->size));
- buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
-
- return buf;
-}
-
-static int tsdbEncodeDFile(void **buf, SDFile *pDFile) {
- int tlen = 0;
-
- tlen += tsdbEncodeDFInfo(buf, &(pDFile->info));
- tlen += tfsEncodeFile(buf, &(pDFile->f));
-
- return tlen;
-}
-
-static void *tsdbDecodeDFile(void *buf, SDFile *pDFile) {
- buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
- buf = tfsDecodeFile(buf, &(pDFile->f));
-
- return buf;
-}
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
int tlen = 0;
@@ -256,7 +193,7 @@ static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
return buf;
}
-static int tsdbEncodeFSSnapshot(void **buf, SFSSnapshot *pSnapshot) {
+static int tsdbEncodeFSSnapshot(void **buf, SFSVer *pSnapshot) {
int tlen = 0;
int64_t size = 0;
@@ -276,7 +213,7 @@ static int tsdbEncodeFSSnapshot(void **buf, SFSSnapshot *pSnapshot) {
return tlen;
}
-static void *tsdbDecodeFSSnapshot(void *buf, SFSSnapshot *pSnapshot) {
+static void *tsdbDecodeFSSnapshot(void *buf, SFSVer *pSnapshot) {
int64_t size = 0;
SDFile df;
@@ -293,10 +230,10 @@ static void *tsdbDecodeFSSnapshot(void *buf, SFSSnapshot *pSnapshot) {
return buf;
}
-static SFSSnapshot *tsdbNewSnapshot(int32_t nfiles) {
- SFSSnapshot *pSnapshot;
+static SFSVer *tsdbNewSnapshot(int32_t nfiles) {
+ SFSVer *pSnapshot;
- pSnapshot = (SFSSnapshot *)calloc(1, sizeof(pSnapshot));
+ pSnapshot = (SFSVer *)calloc(1, sizeof(pSnapshot));
if (pSnapshot == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
@@ -312,7 +249,7 @@ static SFSSnapshot *tsdbNewSnapshot(int32_t nfiles) {
return pSnapshot;
}
-static SFSSnapshot *tsdbFreeSnapshot(SFSSnapshot *pSnapshot) {
+static SFSVer *tsdbFreeSnapshot(SFSVer *pSnapshot) {
if (pSnapshot) {
taosArrayDestroy(pSnapshot->df);
free(pSnapshot);
@@ -359,7 +296,7 @@ static STsdbFS *tsdbFreeFS(STsdbFS *pFs) {
return NULL;
}
-static int tsdbCopySnapshot(SFSSnapshot *src, SFSSnapshot *dst) {
+static int tsdbCopySnapshot(SFSVer *src, SFSVer *dst) {
dst->meta = src->meta;
dst->mf = src->meta;
taosArrayCopy(dst->df, src->df);
@@ -379,7 +316,7 @@ static int tsdbCompFSetId(const void *key1, const void *key2) {
}
}
-static SDFileSet *tsdbSearchDFileSet(SFSSnapshot *pSnapshot, int fid, int flags) {
+static SDFileSet *tsdbSearchDFileSet(SFSVer *pSnapshot, int fid, int flags) {
void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags);
return (ptr == NULL) ? NULL : ((SDFileSet *)ptr);
}
diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c
index 411c1d796e..785933000b 100644
--- a/src/tsdb/src/tsdbFile.c
+++ b/src/tsdb/src/tsdbFile.c
@@ -12,351 +12,31 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#define _DEFAULT_SOURCE
-#define TAOS_RANDOM_FILE_FAIL_TEST
-#include
-#include "os.h"
-#include "talgo.h"
-#include "tchecksum.h"
+
#include "tsdbMain.h"
-#include "tutil.h"
-#include "tfs.h"
-#include "tarray.h"
-const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
+#define TSDB_FILE_OPENED(f) ((f)->fd >= 0)
+#define TSDB_FILE_SET_CLOSED(f) ((f)->fd = -1)
-static int compFGroup(const void *arg1, const void *arg2);
-static int keyFGroupCompFunc(const void *key, const void *fgroup);
-static void *tsdbScanAllFiles(STsdbRepo *pRepo);
-static int tsdbCompareFile(const void *arg1, const void *arg2);
-static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile);
-static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix);
-
-// STsdbFileH ===========================================
-STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
- STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH));
- if (pFileH == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- goto _err;
- }
-
- int code = pthread_rwlock_init(&(pFileH->fhlock), NULL);
- if (code != 0) {
- tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code));
- terrno = TAOS_SYSTEM_ERROR(code);
- goto _err;
- }
-
- pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile);
-
- pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup));
- if (pFileH->pFGroup == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- goto _err;
- }
-
- return pFileH;
-
-_err:
- tsdbFreeFileH(pFileH);
- return NULL;
-}
-
-void tsdbFreeFileH(STsdbFileH *pFileH) {
- if (pFileH) {
- pthread_rwlock_destroy(&pFileH->fhlock);
- tfree(pFileH->pFGroup);
- free(pFileH);
- }
-}
-
-int tsdbOpenFileH(STsdbRepo *pRepo) {
- ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
-
- void *pfArray = NULL;
-
- // Scan the whole directory and get data
- pfArray = tsdbScanAllFiles(pRepo);
- if (pfArray == NULL) {
- return -1;
- }
-
- if (taosArrayGetSize(pfArray) == 0) {
- taosArrayDestroy(pfArray);
- return 0;
- }
-
- // Sort the files
- taosArraySort(pfArray, tsdbCompareFile);
-
- // Loop to recover the files
- int iter = 0;
- while (true) {
- if (iter >= taosArrayGetSize(pfArray)) break;
-
- int vid, fid;
- char bname[TSDB_FILENAME_LEN] = "\0";
- char suffix[TSDB_FILENAME_LEN] = "\0";
- int count = 0;
-
- TFILE *pf = taosArrayGet(pfArray, iter);
- tfsbasename(pf, bname);
- tsdbParseFname(bname, &vid, &fid, suffix);
- count++;
- iter++;
-
- while (true) {
- int nfid = 0;
- if (iter >= taosArrayGetSize(pfArray)) break;
- TFILE *npf = taosArrayGet(pfArray, iter);
- tfsbasename(npf, bname);
- tsdbParseFname(bname, &vid, &nfid, suffix);
-
- if (nfid != fid) break;
- count++;
- iter++;
- }
-
- tsdbRestoreFile(pRepo, pf, count);
- }
-
- taosArrayDestroy(pfArray);
- return 0;
-}
-
-void tsdbCloseFileH(STsdbRepo *pRepo, bool isRestart) {
- STsdbFileH *pFileH = pRepo->tsdbFileH;
-
- for (int i = 0; i < pFileH->nFGroups; i++) {
- SFileGroup *pFGroup = pFileH->pFGroup + i;
- for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- tsdbCloseFile(&(pFGroup->files[type]));
- }
- if (isRestart) {
- tfsDecDiskFile(pFGroup->files[0].file.level, pFGroup->files[0].file.level, TSDB_FILE_TYPE_MAX);
- }
- }
-}
-
-// SFileGroup ===========================================
-SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) {
- STsdbFileH *pFileH = pRepo->tsdbFileH;
- SFileGroup fg = {0};
- int id = TFS_UNDECIDED_ID;
- char fname[TSDB_FILENAME_LEN] = "\0";
-
- ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL);
- ASSERT(pFileH->nFGroups < pFileH->maxFGroups);
-
- // SET FILE GROUP
- fg.fileId = fid;
-
- // CREATE FILES
- for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- SFile *pFile = &(fg.files[type]);
-
- pFile->fd = -1;
- pFile->info.size = TSDB_FILE_HEAD_SIZE;
- pFile->info.magic = TSDB_FILE_INIT_MAGIC;
-
- tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname);
- tfsInitFile(&pFile->file, level, id, fname);
-
- if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0) return NULL;
-
- if (tsdbUpdateFileHeader(pFile) < 0) {
- tsdbCloseFile(pFile);
- return NULL;
- }
-
- tsdbCloseFile(pFile);
-
- level = TFILE_LEVEL(&(pFile->file));
- id = TFILE_ID(&(pFile->file));
- }
-
- // PUT GROUP INTO FILE HANDLE
- pthread_rwlock_wrlock(&pFileH->fhlock);
- pFileH->pFGroup[pFileH->nFGroups++] = fg;
- qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
- pthread_rwlock_unlock(&pFileH->fhlock);
-
- SFileGroup *pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ);
- ASSERT(pfg != NULL);
- return pfg;
-}
-
-void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
- ASSERT(pFGroup != NULL);
- STsdbFileH *pFileH = pRepo->tsdbFileH;
-
- SFileGroup fg = *pFGroup;
-
- int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
- if (nFilesLeft > 0) {
- memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
- }
-
- pFileH->nFGroups--;
- ASSERT(pFileH->nFGroups >= 0);
-
- for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- SFile *pFile = &(fg.files[type]);
- tfsremove(&(pFile->file));
- }
-}
-
-SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
- void *ptr = taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup),
- keyFGroupCompFunc, flags);
- if (ptr == NULL) return NULL;
- return (SFileGroup *)ptr;
-}
-
-int tsdbGetFidLevel(int fid, SFidGroup fidg) {
- if (fid >= fidg.maxFid) {
- return 0;
- } else if (fid >= fidg.midFid) {
- return 1;
- } else if (fid >= fidg.minFid) {
- return 2;
+// ============== Operations on SMFile
+void tsdbInitMFile(SMFile *pMFile, int vid, int ver, SMFInfo *pInfo) {
+ TSDB_FILE_SET_CLOSED(pMFile);
+ if (pInfo == NULL) {
+ memset(&(pMFile->info), 0, sizeof(pMFile->info));
+ pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
} else {
- return -1;
+ pMFile->info = *pInfo;
}
+ tfsInitFile(&(pMFile->f), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, NULL /*TODO*/);
+
+ return pMFile;
}
-static int compFGroup(const void *arg1, const void *arg2) {
- int val1 = ((SFileGroup *)arg1)->fileId;
- int val2 = ((SFileGroup *)arg2)->fileId;
+int tsdbOpenMFile(SMFile *pMFile, int flags) {
+ ASSERT(!TSDB_FILE_OPENED(pMFile));
- if (val1 < val2) {
- return -1;
- } else if (val1 > val2) {
- return 1;
- } else {
- return 0;
- }
-}
-
-static int keyFGroupCompFunc(const void *key, const void *fgroup) {
- int fid = *(int *)key;
- SFileGroup *pFGroup = (SFileGroup *)fgroup;
- if (fid == pFGroup->fileId) {
- return 0;
- } else {
- return fid > pFGroup->fileId ? 1 : -1;
- }
-}
-
-// SFileGroupIter ===========================================
-void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
- pIter->pFileH = pFileH;
- pIter->direction = direction;
-
- if (pFileH->nFGroups == 0) {
- pIter->index = -1;
- pIter->fileId = -1;
- } else {
- if (direction == TSDB_FGROUP_ITER_FORWARD) {
- pIter->index = 0;
- } else {
- pIter->index = pFileH->nFGroups - 1;
- }
- pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
- }
-}
-
-void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
- STsdbFileH *pFileH = pIter->pFileH;
-
- if (pFileH->nFGroups == 0) {
- pIter->index = -1;
- pIter->fileId = -1;
- return;
- }
-
- int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
- void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
- if (ptr == NULL) {
- pIter->index = -1;
- pIter->fileId = -1;
- } else {
- pIter->index = (int)(POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup));
- pIter->fileId = ((SFileGroup *)ptr)->fileId;
- }
-}
-
-SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
- STsdbFileH *pFileH = pIter->pFileH;
- SFileGroup *pFGroup = NULL;
-
- if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL;
-
- pFGroup = &pFileH->pFGroup[pIter->index];
- if (pFGroup->fileId != pIter->fileId) {
- tsdbSeekFileGroupIter(pIter, pIter->fileId);
- }
-
- if (pIter->index < 0) return NULL;
-
- pFGroup = &pFileH->pFGroup[pIter->index];
- ASSERT(pFGroup->fileId == pIter->fileId);
-
- if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
- pIter->index++;
- } else {
- pIter->index--;
- }
-
- if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) {
- pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
- } else {
- pIter->fileId = -1;
- }
-
- return pFGroup;
-}
-
-// SFile ===========================================
-int tsdbOpenFile(SFile *pFile, int oflag) {
- ASSERT(!TSDB_IS_FILE_OPENED(pFile));
-
- pFile->fd = tfsopen(&(pFile->file), oflag);
- if (pFile->fd < 0) {
- tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), tstrerror(terrno));
- return -1;
- }
-
- tsdbTrace("open file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd);
-
- return 0;
-}
-
-void tsdbCloseFile(SFile *pFile) {
- if (TSDB_IS_FILE_OPENED(pFile)) {
- tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd);
- tfsclose(pFile->fd);
- pFile->fd = -1;
- }
-}
-
-int tsdbUpdateFileHeader(SFile *pFile) {
- char buf[TSDB_FILE_HEAD_SIZE] = "\0";
-
- void *pBuf = (void *)buf;
- taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION);
- tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info));
-
- taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
-
- if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
- tsdbError("failed to lseek file %s since %s", TSDB_FILE_NAME(pFile), strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- return -1;
- }
- if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
- tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile),
- strerror(errno));
+ pMFile->fd = open(pMFile->f.aname, flags);
+ if (pMFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@@ -364,8 +44,155 @@ int tsdbUpdateFileHeader(SFile *pFile) {
return 0;
}
-int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
+void tsdbCloseMFile(SMFile *pMFile) {
+ if (TSDB_FILE_OPENED(pMFile)) {
+ close(pMFile->fd);
+ TSDB_FILE_SET_CLOSED(pMFile);
+ }
+}
+
+int64_t tsdbSeekMFile(SMFile *pMFile, int64_t offset, int whence) {
+ ASSERT(TSDB_FILE_OPENED(pMFile));
+
+ int64_t loffset = taosLSeek(pMFile->fd, offset, whence);
+ if (loffset < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ return loffset;
+}
+
+int64_t tsdbWriteMFile(SMFile *pMFile, void *buf, int64_t nbyte) {
+ ASSERT(TSDB_FILE_OPENED(pMFile));
+
+ int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte);
+ if (nwrite < nbyte) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ pMFile->info.size += nbyte;
+ return nwrite;
+}
+
+int64_t tsdbTellMFile(SMFile *pMFile) { return tsdbSeekMFile(pMFile, 0, SEEK_CUR); }
+
+int tsdbEncodeMFile(void **buf, SMFile *pMFile) {
int tlen = 0;
+
+ tlen += tsdbEncodeMFInfo(buf, &(pMFile->info));
+ tlen += tfsEncodeFile(buf, &(pMFile->f));
+
+ return tlen;
+}
+
+void *tsdbDecodeMFile(void *buf, SMFile *pMFile) {
+ buf = tsdbDecodeMFInfo(buf, &(pMFile->info));
+ buf = tfsDecodeFile(buf, &(pMFile->f));
+
+ return buf;
+}
+
+static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
+ int tlen = 0;
+
+ tlen += taosEncodeVariantI64(buf, pInfo->size);
+ tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
+ tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
+ tlen += taosEncodeVariantI64(buf, pInfo->nDels);
+ tlen += taosEncodeFixedU32(buf, pInfo->magic);
+
+ return tlen;
+}
+
+static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
+ buf = taosDecodeVariantI64(buf, &(pInfo->size));
+ buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
+ buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
+ buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
+ buf = taosDecodeFixedU32(buf, &(pInfo->magic));
+
+ return buf;
+}
+
+// ============== Operations on SDFile
+void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo *pInfo, TSDB_FILE_T ftype) {
+ TSDB_FILE_SET_CLOSED(pDFile);
+ if (pInfo == NULL) {
+ memset(&(pDFile->info), 0, sizeof(pDFile->info));
+ pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
+ } else {
+ pDFile->info = *pInfo;
+ }
+ tfsInitFile(&(pDFile->f), level, id, NULL /*TODO*/);
+}
+
+int tsdbOpenDFile(SDFile *pDFile, int flags) {
+ ASSERT(!TSDB_FILE_OPENED(pDFile));
+
+ pDFile->fd = open(pDFile->f.aname, flags);
+ if (pDFile->fd < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ return 0;
+}
+
+void tsdbCloseDFile(SDFile *pDFile) {
+ if (TSDB_FILE_OPENED(pDFile)) {
+ close(pDFile->fd);
+ TSDB_FILE_SET_CLOSED(pDFile);
+ }
+}
+
+int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
+ ASSERT(TSDB_FILE_OPENED(pDFile));
+
+ int64_t loffset = taosLSeek(pDFile->fd, offset, whence);
+ if (loffset < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ return loffset;
+}
+
+int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
+ ASSERT(TSDB_FILE_OPENED(pDFile));
+
+ int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte);
+ if (nwrite < nbyte) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ pDFile->info.size += nbyte;
+ return nwrite;
+}
+
+int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); }
+
+int tsdbEncodeDFile(void **buf, SDFile *pDFile) {
+ int tlen = 0;
+
+ tlen += tsdbEncodeDFInfo(buf, &(pDFile->info));
+ tlen += tfsEncodeFile(buf, &(pDFile->f));
+
+ return tlen;
+}
+
+void *tsdbDecodeDFile(void *buf, SDFile *pDFile) {
+ buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
+ buf = tfsDecodeFile(buf, &(pDFile->f));
+
+ return buf;
+}
+
+static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
+ int tlen = 0;
+
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
@@ -377,7 +204,7 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
return tlen;
}
-void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
+static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
@@ -389,268 +216,41 @@ void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
return buf;
}
-int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
- char buf[TSDB_FILE_HEAD_SIZE] = "\0";
+// ============== Operations on SDFileSet
+void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int id) {
+ pSet->fid = fid;
+ pSet->state = 0;
- if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
- tsdbError("failed to lseek file %s to start since %s", TSDB_FILE_NAME(pFile), strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- return -1;
+ for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
+ SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
+ tsdbInitDFile(pDFile, vid, fid, ver, level, id, NULL, ftype);
+ // TODO: reset level and id
}
+}
- if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
- tsdbError("failed to read file %s header part with %d bytes, reason:%s", TSDB_FILE_NAME(pFile), TSDB_FILE_HEAD_SIZE,
- strerror(errno));
- terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
- return -1;
+int tsdbOpenDFileSet(SDFileSet *pSet, int flags) {
+ for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
+ SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
+
+ if (tsdbOpenDFile(pDFile, flags) < 0) {
+ tsdbCloseDFileSet(pSet);
+ return -1;
+ }
}
+}
- if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
- tsdbError("file %s header part is corrupted with failed checksum", TSDB_FILE_NAME(pFile));
- terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
- return -1;
+void tsdbCloseDFileSet(SDFileSet *pSet) {
+ for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
+ tsdbCloseDFile(pDFile);
}
+}
- void *pBuf = (void *)buf;
- pBuf = taosDecodeFixedU32(pBuf, version);
- pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
-
+int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
+ // TODO
return 0;
}
-void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { // TODO
- uint32_t version = 0;
- SFile file;
- SFile * pFile = &file;
-
- tfsInitFile(&(pFile->file), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, fname);
- pFile->fd = -1;
-
- if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
- if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err;
-
- off_t offset = lseek(pFile->fd, 0, SEEK_END);
- if (offset < 0) goto _err;
- tsdbCloseFile(pFile);
-
- *magic = pFile->info.magic;
- *size = offset;
-
- return;
-
-_err:
- tsdbCloseFile(pFile);
- *magic = TSDB_FILE_INIT_MAGIC;
- *size = 0;
-}
-
-// Retention ===========================================
-void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
- STsdbFileH *pFileH = pRepo->tsdbFileH;
- SFileGroup *pGroup = pFileH->pFGroup;
-
- pthread_rwlock_wrlock(&(pFileH->fhlock));
-
- while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) {
- tsdbRemoveFileGroup(pRepo, pGroup);
- }
-
- pthread_rwlock_unlock(&(pFileH->fhlock));
-}
-
-void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) {
- TSKEY now = taosGetTimestamp(pCfg->precision);
-
- pFidGroup->minFid =
- TSDB_KEY_FILEID(now - pCfg->keep * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
- pFidGroup->midFid =
- TSDB_KEY_FILEID(now - pCfg->keep2 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
- pFidGroup->maxFid =
- TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
-}
-
-int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
- STsdbFileH *pFileH = pRepo->tsdbFileH;
-
- for (int i = 0; i < pFileH->nFGroups; i++) {
- SFileGroup ofg = pFileH->pFGroup[i];
-
- int level = tsdbGetFidLevel(ofg.fileId, *pFidGroup);
- ASSERT(level >= 0);
-
- if (level == ofg.files[0].file.level) continue;
-
- // COPY THE FILE GROUP TO THE RIGHT LEVEL
- SFileGroup nfg = ofg;
- int id = TFS_UNDECIDED_ID;
- int type = 0;
- for (; type < TSDB_FILE_TYPE_MAX; type++) {
- tfsInitFile(&nfg.files[type].file, level, id, nfg.files[type].file.rname);
- if (tfscopy(&(ofg.files[type].file), &(nfg.files[type].file)) < 0) {
- if (terrno == TSDB_CODE_FS_INVLD_LEVEL) break;
- tsdbError("vgId:%d failed to move fid %d from level %d to level %d since %s", REPO_ID(pRepo), ofg.fileId,
- ofg.files[0].file.level, level, strerror(terrno));
- return -1;
- }
-
- id = nfg.files[type].file.level;
- id = nfg.files[type].file.id;
- }
-
- if (type < TSDB_FILE_TYPE_MAX) continue;
-
- // Register new file into TSDB
- pthread_rwlock_wrlock(&(pFileH->fhlock));
- pFileH->pFGroup[i] = nfg;
- pthread_rwlock_unlock(&(pFileH->fhlock));
-
- for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- SFile *pFile = &(ofg.files[type]);
- tfsremove(&(pFile->file));
- }
-
- tsdbDebug("vgId:%d move file group %d from level %d to level %d", REPO_ID(pRepo), ofg.fileId,
- ofg.files[0].file.level, level);
- }
-
+int tsdbMoveDFileSet(SDFileSet *pOldSet, int tolevel, SDFileSet *pNewSet) {
+ // TODO
return 0;
-}
-
-static void *tsdbScanAllFiles(STsdbRepo *pRepo) {
- void * farray = NULL;
- TDIR * tdir = NULL;
- char dirName[TSDB_FILENAME_LEN] = "\0";
- char bname[TSDB_FILENAME_LEN] = "\0";
- regex_t regex1 = {0};
- const TFILE *pf = NULL;
-
- farray = taosArrayInit(256, sizeof(TFILE));
- if (farray == NULL) {
- terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
- return NULL;
- }
-
- regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED);
-
- snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo));
-
- tdir = tfsOpendir(dirName);
-
- while ((pf = tfsReaddir(tdir)) != NULL) {
- tfsbasename(pf, bname);
-
- int code = regexec(®ex1, bname, 0, NULL, 0);
- if (code != 0) {
- tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname);
- continue;
- }
-
- taosArrayPush(farray, pf);
- }
-
- regfree(®ex1);
- tfsClosedir(tdir);
-
- return farray;
-}
-
-static int tsdbCompareFile(const void *arg1, const void *arg2) {
- char bname1[TSDB_FILENAME_LEN] = "\0";
- char bname2[TSDB_FILENAME_LEN] = "\0";
- TFILE *pf1 = (TFILE *)arg1;
- TFILE *pf2 = (TFILE *)arg2;
- int vid1, fid1, vid2, fid2;
-
- tfsbasename(pf1, bname1);
- tfsbasename(pf2, bname2);
-
- sscanf(bname1, "v%df%d", &vid1, &fid1);
- sscanf(bname2, "v%df%d", &vid2, &fid2);
-
- ASSERT(vid1 == vid2);
- if (fid1 < fid2) {
- return -1;
- } else if (fid1 == fid2) {
- return 0;
- } else {
- return 1;
- }
-}
-
-static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) {
- char backname[TSDB_FILENAME_LEN*2] = "\0";
- char bname[TSDB_FILENAME_LEN] = "\0";
- STsdbFileH *pFileH = pRepo->tsdbFileH;
- TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0};
- TFILE * pHf = NULL;
- TFILE * pLf = NULL;
- SFileGroup fg = {0};
- int vid = 0;
- int fid = 0;
- char suffix[TSDB_FILENAME_LEN] = "\0";
-
- for (int i = 0; i < nfile; i++) {
- TFILE *pf = pfiles + i;
-
- tfsbasename(pf, bname);
- tsdbParseFname(bname, &vid, &fid, suffix);
-
- if (strcmp(suffix, ".head") == 0) {
- pfArray[TSDB_FILE_TYPE_HEAD] = pf;
- } else if (strcmp(suffix, ".data") == 0) {
- pfArray[TSDB_FILE_TYPE_DATA] = pf;
- } else if (strcmp(suffix, ".last") == 0) {
- pfArray[TSDB_FILE_TYPE_LAST] = pf;
- } else if (strcmp(suffix, ".l") == 0) {
- pLf = pf;
- } else if (strcmp(suffix, ".h") == 0) {
- pHf = pf;
- } else {
- tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname);
- }
- }
-
- if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) {
- for (int i = 0; i < nfile; i++) {
- snprintf(backname, TSDB_FILENAME_LEN*2, "%s_bak", (pfiles + i)->aname);
- rename((pfiles + i)->aname, backname);
- }
-
- return -1;
- }
-
- if (pHf == NULL) {
- if (pLf != NULL) {
- rename(pLf->aname, pfArray[TSDB_FILE_TYPE_LAST]->aname);
- }
- } else {
- if (pLf != NULL) {
- remove(pLf->aname);
- }
- remove(pHf->aname);
- }
-
- // Register file
- fg.fileId = fid;
-
- for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- SFile * pFile = fg.files + type;
- uint32_t version = 0;
-
- pFile->fd = -1;
- pFile->file = *pfArray[type]; // TODO
- tsdbOpenFile(pFile, O_RDONLY);
- tsdbLoadFileHeader(pFile, &version);
- tsdbCloseFile(pFile);
- }
-
- pFileH->pFGroup[pFileH->nFGroups++] = fg;
-
- tfsIncDiskFile(pfArray[TSDB_FILE_TYPE_HEAD]->level, pfArray[TSDB_FILE_TYPE_HEAD]->id, TSDB_FILE_TYPE_MAX);
-
- return 0;
-}
-
-static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) {
- sscanf(bname, "v%df%d%s", vid, fid, suffix);
}
\ No newline at end of file
diff --git a/src/tsdb/src/tsdbFile_bak.c b/src/tsdb/src/tsdbFile_bak.c
new file mode 100644
index 0000000000..411c1d796e
--- /dev/null
+++ b/src/tsdb/src/tsdbFile_bak.c
@@ -0,0 +1,656 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#define _DEFAULT_SOURCE
+#define TAOS_RANDOM_FILE_FAIL_TEST
+#include
+#include "os.h"
+#include "talgo.h"
+#include "tchecksum.h"
+#include "tsdbMain.h"
+#include "tutil.h"
+#include "tfs.h"
+#include "tarray.h"
+
+const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
+
+static int compFGroup(const void *arg1, const void *arg2);
+static int keyFGroupCompFunc(const void *key, const void *fgroup);
+static void *tsdbScanAllFiles(STsdbRepo *pRepo);
+static int tsdbCompareFile(const void *arg1, const void *arg2);
+static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile);
+static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix);
+
+// STsdbFileH ===========================================
+STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
+ STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH));
+ if (pFileH == NULL) {
+ terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+ goto _err;
+ }
+
+ int code = pthread_rwlock_init(&(pFileH->fhlock), NULL);
+ if (code != 0) {
+ tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code));
+ terrno = TAOS_SYSTEM_ERROR(code);
+ goto _err;
+ }
+
+ pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile);
+
+ pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup));
+ if (pFileH->pFGroup == NULL) {
+ terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+ goto _err;
+ }
+
+ return pFileH;
+
+_err:
+ tsdbFreeFileH(pFileH);
+ return NULL;
+}
+
+void tsdbFreeFileH(STsdbFileH *pFileH) {
+ if (pFileH) {
+ pthread_rwlock_destroy(&pFileH->fhlock);
+ tfree(pFileH->pFGroup);
+ free(pFileH);
+ }
+}
+
+int tsdbOpenFileH(STsdbRepo *pRepo) {
+ ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
+
+ void *pfArray = NULL;
+
+ // Scan the whole directory and get data
+ pfArray = tsdbScanAllFiles(pRepo);
+ if (pfArray == NULL) {
+ return -1;
+ }
+
+ if (taosArrayGetSize(pfArray) == 0) {
+ taosArrayDestroy(pfArray);
+ return 0;
+ }
+
+ // Sort the files
+ taosArraySort(pfArray, tsdbCompareFile);
+
+ // Loop to recover the files
+ int iter = 0;
+ while (true) {
+ if (iter >= taosArrayGetSize(pfArray)) break;
+
+ int vid, fid;
+ char bname[TSDB_FILENAME_LEN] = "\0";
+ char suffix[TSDB_FILENAME_LEN] = "\0";
+ int count = 0;
+
+ TFILE *pf = taosArrayGet(pfArray, iter);
+ tfsbasename(pf, bname);
+ tsdbParseFname(bname, &vid, &fid, suffix);
+ count++;
+ iter++;
+
+ while (true) {
+ int nfid = 0;
+ if (iter >= taosArrayGetSize(pfArray)) break;
+ TFILE *npf = taosArrayGet(pfArray, iter);
+ tfsbasename(npf, bname);
+ tsdbParseFname(bname, &vid, &nfid, suffix);
+
+ if (nfid != fid) break;
+ count++;
+ iter++;
+ }
+
+ tsdbRestoreFile(pRepo, pf, count);
+ }
+
+ taosArrayDestroy(pfArray);
+ return 0;
+}
+
+void tsdbCloseFileH(STsdbRepo *pRepo, bool isRestart) {
+ STsdbFileH *pFileH = pRepo->tsdbFileH;
+
+ for (int i = 0; i < pFileH->nFGroups; i++) {
+ SFileGroup *pFGroup = pFileH->pFGroup + i;
+ for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
+ tsdbCloseFile(&(pFGroup->files[type]));
+ }
+ if (isRestart) {
+ tfsDecDiskFile(pFGroup->files[0].file.level, pFGroup->files[0].file.level, TSDB_FILE_TYPE_MAX);
+ }
+ }
+}
+
+// SFileGroup ===========================================
+SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) {
+ STsdbFileH *pFileH = pRepo->tsdbFileH;
+ SFileGroup fg = {0};
+ int id = TFS_UNDECIDED_ID;
+ char fname[TSDB_FILENAME_LEN] = "\0";
+
+ ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL);
+ ASSERT(pFileH->nFGroups < pFileH->maxFGroups);
+
+ // SET FILE GROUP
+ fg.fileId = fid;
+
+ // CREATE FILES
+ for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
+ SFile *pFile = &(fg.files[type]);
+
+ pFile->fd = -1;
+ pFile->info.size = TSDB_FILE_HEAD_SIZE;
+ pFile->info.magic = TSDB_FILE_INIT_MAGIC;
+
+ tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname);
+ tfsInitFile(&pFile->file, level, id, fname);
+
+ if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0) return NULL;
+
+ if (tsdbUpdateFileHeader(pFile) < 0) {
+ tsdbCloseFile(pFile);
+ return NULL;
+ }
+
+ tsdbCloseFile(pFile);
+
+ level = TFILE_LEVEL(&(pFile->file));
+ id = TFILE_ID(&(pFile->file));
+ }
+
+ // PUT GROUP INTO FILE HANDLE
+ pthread_rwlock_wrlock(&pFileH->fhlock);
+ pFileH->pFGroup[pFileH->nFGroups++] = fg;
+ qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
+ pthread_rwlock_unlock(&pFileH->fhlock);
+
+ SFileGroup *pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ);
+ ASSERT(pfg != NULL);
+ return pfg;
+}
+
+void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
+ ASSERT(pFGroup != NULL);
+ STsdbFileH *pFileH = pRepo->tsdbFileH;
+
+ SFileGroup fg = *pFGroup;
+
+ int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
+ if (nFilesLeft > 0) {
+ memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
+ }
+
+ pFileH->nFGroups--;
+ ASSERT(pFileH->nFGroups >= 0);
+
+ for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
+ SFile *pFile = &(fg.files[type]);
+ tfsremove(&(pFile->file));
+ }
+}
+
+SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
+ void *ptr = taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup),
+ keyFGroupCompFunc, flags);
+ if (ptr == NULL) return NULL;
+ return (SFileGroup *)ptr;
+}
+
+int tsdbGetFidLevel(int fid, SFidGroup fidg) {
+ if (fid >= fidg.maxFid) {
+ return 0;
+ } else if (fid >= fidg.midFid) {
+ return 1;
+ } else if (fid >= fidg.minFid) {
+ return 2;
+ } else {
+ return -1;
+ }
+}
+
+static int compFGroup(const void *arg1, const void *arg2) {
+ int val1 = ((SFileGroup *)arg1)->fileId;
+ int val2 = ((SFileGroup *)arg2)->fileId;
+
+ if (val1 < val2) {
+ return -1;
+ } else if (val1 > val2) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+static int keyFGroupCompFunc(const void *key, const void *fgroup) {
+ int fid = *(int *)key;
+ SFileGroup *pFGroup = (SFileGroup *)fgroup;
+ if (fid == pFGroup->fileId) {
+ return 0;
+ } else {
+ return fid > pFGroup->fileId ? 1 : -1;
+ }
+}
+
+// SFileGroupIter ===========================================
+void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
+ pIter->pFileH = pFileH;
+ pIter->direction = direction;
+
+ if (pFileH->nFGroups == 0) {
+ pIter->index = -1;
+ pIter->fileId = -1;
+ } else {
+ if (direction == TSDB_FGROUP_ITER_FORWARD) {
+ pIter->index = 0;
+ } else {
+ pIter->index = pFileH->nFGroups - 1;
+ }
+ pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
+ }
+}
+
+void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
+ STsdbFileH *pFileH = pIter->pFileH;
+
+ if (pFileH->nFGroups == 0) {
+ pIter->index = -1;
+ pIter->fileId = -1;
+ return;
+ }
+
+ int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
+ void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
+ if (ptr == NULL) {
+ pIter->index = -1;
+ pIter->fileId = -1;
+ } else {
+ pIter->index = (int)(POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup));
+ pIter->fileId = ((SFileGroup *)ptr)->fileId;
+ }
+}
+
+SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
+ STsdbFileH *pFileH = pIter->pFileH;
+ SFileGroup *pFGroup = NULL;
+
+ if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL;
+
+ pFGroup = &pFileH->pFGroup[pIter->index];
+ if (pFGroup->fileId != pIter->fileId) {
+ tsdbSeekFileGroupIter(pIter, pIter->fileId);
+ }
+
+ if (pIter->index < 0) return NULL;
+
+ pFGroup = &pFileH->pFGroup[pIter->index];
+ ASSERT(pFGroup->fileId == pIter->fileId);
+
+ if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
+ pIter->index++;
+ } else {
+ pIter->index--;
+ }
+
+ if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) {
+ pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
+ } else {
+ pIter->fileId = -1;
+ }
+
+ return pFGroup;
+}
+
+// SFile ===========================================
+int tsdbOpenFile(SFile *pFile, int oflag) {
+ ASSERT(!TSDB_IS_FILE_OPENED(pFile));
+
+ pFile->fd = tfsopen(&(pFile->file), oflag);
+ if (pFile->fd < 0) {
+ tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), tstrerror(terrno));
+ return -1;
+ }
+
+ tsdbTrace("open file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd);
+
+ return 0;
+}
+
+void tsdbCloseFile(SFile *pFile) {
+ if (TSDB_IS_FILE_OPENED(pFile)) {
+ tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd);
+ tfsclose(pFile->fd);
+ pFile->fd = -1;
+ }
+}
+
+int tsdbUpdateFileHeader(SFile *pFile) {
+ char buf[TSDB_FILE_HEAD_SIZE] = "\0";
+
+ void *pBuf = (void *)buf;
+ taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION);
+ tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info));
+
+ taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
+
+ if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
+ tsdbError("failed to lseek file %s since %s", TSDB_FILE_NAME(pFile), strerror(errno));
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+ if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
+ tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile),
+ strerror(errno));
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ return 0;
+}
+
+int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
+ int tlen = 0;
+ tlen += taosEncodeFixedU32(buf, pInfo->magic);
+ tlen += taosEncodeFixedU32(buf, pInfo->len);
+ tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
+ tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
+ tlen += taosEncodeFixedU32(buf, pInfo->offset);
+ tlen += taosEncodeFixedU64(buf, pInfo->size);
+ tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
+
+ return tlen;
+}
+
+void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
+ buf = taosDecodeFixedU32(buf, &(pInfo->magic));
+ buf = taosDecodeFixedU32(buf, &(pInfo->len));
+ buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
+ buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
+ buf = taosDecodeFixedU32(buf, &(pInfo->offset));
+ buf = taosDecodeFixedU64(buf, &(pInfo->size));
+ buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
+
+ return buf;
+}
+
+int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
+ char buf[TSDB_FILE_HEAD_SIZE] = "\0";
+
+ if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
+ tsdbError("failed to lseek file %s to start since %s", TSDB_FILE_NAME(pFile), strerror(errno));
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
+ tsdbError("failed to read file %s header part with %d bytes, reason:%s", TSDB_FILE_NAME(pFile), TSDB_FILE_HEAD_SIZE,
+ strerror(errno));
+ terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
+ return -1;
+ }
+
+ if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
+ tsdbError("file %s header part is corrupted with failed checksum", TSDB_FILE_NAME(pFile));
+ terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
+ return -1;
+ }
+
+ void *pBuf = (void *)buf;
+ pBuf = taosDecodeFixedU32(pBuf, version);
+ pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
+
+ return 0;
+}
+
+void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { // TODO
+ uint32_t version = 0;
+ SFile file;
+ SFile * pFile = &file;
+
+ tfsInitFile(&(pFile->file), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, fname);
+ pFile->fd = -1;
+
+ if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
+ if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err;
+
+ off_t offset = lseek(pFile->fd, 0, SEEK_END);
+ if (offset < 0) goto _err;
+ tsdbCloseFile(pFile);
+
+ *magic = pFile->info.magic;
+ *size = offset;
+
+ return;
+
+_err:
+ tsdbCloseFile(pFile);
+ *magic = TSDB_FILE_INIT_MAGIC;
+ *size = 0;
+}
+
+// Retention ===========================================
+void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
+ STsdbFileH *pFileH = pRepo->tsdbFileH;
+ SFileGroup *pGroup = pFileH->pFGroup;
+
+ pthread_rwlock_wrlock(&(pFileH->fhlock));
+
+ while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) {
+ tsdbRemoveFileGroup(pRepo, pGroup);
+ }
+
+ pthread_rwlock_unlock(&(pFileH->fhlock));
+}
+
+void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) {
+ TSKEY now = taosGetTimestamp(pCfg->precision);
+
+ pFidGroup->minFid =
+ TSDB_KEY_FILEID(now - pCfg->keep * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
+ pFidGroup->midFid =
+ TSDB_KEY_FILEID(now - pCfg->keep2 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
+ pFidGroup->maxFid =
+ TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
+}
+
+int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
+ STsdbFileH *pFileH = pRepo->tsdbFileH;
+
+ for (int i = 0; i < pFileH->nFGroups; i++) {
+ SFileGroup ofg = pFileH->pFGroup[i];
+
+ int level = tsdbGetFidLevel(ofg.fileId, *pFidGroup);
+ ASSERT(level >= 0);
+
+ if (level == ofg.files[0].file.level) continue;
+
+ // COPY THE FILE GROUP TO THE RIGHT LEVEL
+ SFileGroup nfg = ofg;
+ int id = TFS_UNDECIDED_ID;
+ int type = 0;
+ for (; type < TSDB_FILE_TYPE_MAX; type++) {
+ tfsInitFile(&nfg.files[type].file, level, id, nfg.files[type].file.rname);
+ if (tfscopy(&(ofg.files[type].file), &(nfg.files[type].file)) < 0) {
+ if (terrno == TSDB_CODE_FS_INVLD_LEVEL) break;
+ tsdbError("vgId:%d failed to move fid %d from level %d to level %d since %s", REPO_ID(pRepo), ofg.fileId,
+ ofg.files[0].file.level, level, strerror(terrno));
+ return -1;
+ }
+
+ id = nfg.files[type].file.level;
+ id = nfg.files[type].file.id;
+ }
+
+ if (type < TSDB_FILE_TYPE_MAX) continue;
+
+ // Register new file into TSDB
+ pthread_rwlock_wrlock(&(pFileH->fhlock));
+ pFileH->pFGroup[i] = nfg;
+ pthread_rwlock_unlock(&(pFileH->fhlock));
+
+ for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
+ SFile *pFile = &(ofg.files[type]);
+ tfsremove(&(pFile->file));
+ }
+
+ tsdbDebug("vgId:%d move file group %d from level %d to level %d", REPO_ID(pRepo), ofg.fileId,
+ ofg.files[0].file.level, level);
+ }
+
+ return 0;
+}
+
+static void *tsdbScanAllFiles(STsdbRepo *pRepo) {
+ void * farray = NULL;
+ TDIR * tdir = NULL;
+ char dirName[TSDB_FILENAME_LEN] = "\0";
+ char bname[TSDB_FILENAME_LEN] = "\0";
+ regex_t regex1 = {0};
+ const TFILE *pf = NULL;
+
+ farray = taosArrayInit(256, sizeof(TFILE));
+ if (farray == NULL) {
+ terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+ return NULL;
+ }
+
+ regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED);
+
+ snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo));
+
+ tdir = tfsOpendir(dirName);
+
+ while ((pf = tfsReaddir(tdir)) != NULL) {
+ tfsbasename(pf, bname);
+
+ int code = regexec(®ex1, bname, 0, NULL, 0);
+ if (code != 0) {
+ tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname);
+ continue;
+ }
+
+ taosArrayPush(farray, pf);
+ }
+
+ regfree(®ex1);
+ tfsClosedir(tdir);
+
+ return farray;
+}
+
+static int tsdbCompareFile(const void *arg1, const void *arg2) {
+ char bname1[TSDB_FILENAME_LEN] = "\0";
+ char bname2[TSDB_FILENAME_LEN] = "\0";
+ TFILE *pf1 = (TFILE *)arg1;
+ TFILE *pf2 = (TFILE *)arg2;
+ int vid1, fid1, vid2, fid2;
+
+ tfsbasename(pf1, bname1);
+ tfsbasename(pf2, bname2);
+
+ sscanf(bname1, "v%df%d", &vid1, &fid1);
+ sscanf(bname2, "v%df%d", &vid2, &fid2);
+
+ ASSERT(vid1 == vid2);
+ if (fid1 < fid2) {
+ return -1;
+ } else if (fid1 == fid2) {
+ return 0;
+ } else {
+ return 1;
+ }
+}
+
+static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) {
+ char backname[TSDB_FILENAME_LEN*2] = "\0";
+ char bname[TSDB_FILENAME_LEN] = "\0";
+ STsdbFileH *pFileH = pRepo->tsdbFileH;
+ TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0};
+ TFILE * pHf = NULL;
+ TFILE * pLf = NULL;
+ SFileGroup fg = {0};
+ int vid = 0;
+ int fid = 0;
+ char suffix[TSDB_FILENAME_LEN] = "\0";
+
+ for (int i = 0; i < nfile; i++) {
+ TFILE *pf = pfiles + i;
+
+ tfsbasename(pf, bname);
+ tsdbParseFname(bname, &vid, &fid, suffix);
+
+ if (strcmp(suffix, ".head") == 0) {
+ pfArray[TSDB_FILE_TYPE_HEAD] = pf;
+ } else if (strcmp(suffix, ".data") == 0) {
+ pfArray[TSDB_FILE_TYPE_DATA] = pf;
+ } else if (strcmp(suffix, ".last") == 0) {
+ pfArray[TSDB_FILE_TYPE_LAST] = pf;
+ } else if (strcmp(suffix, ".l") == 0) {
+ pLf = pf;
+ } else if (strcmp(suffix, ".h") == 0) {
+ pHf = pf;
+ } else {
+ tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname);
+ }
+ }
+
+ if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) {
+ for (int i = 0; i < nfile; i++) {
+ snprintf(backname, TSDB_FILENAME_LEN*2, "%s_bak", (pfiles + i)->aname);
+ rename((pfiles + i)->aname, backname);
+ }
+
+ return -1;
+ }
+
+ if (pHf == NULL) {
+ if (pLf != NULL) {
+ rename(pLf->aname, pfArray[TSDB_FILE_TYPE_LAST]->aname);
+ }
+ } else {
+ if (pLf != NULL) {
+ remove(pLf->aname);
+ }
+ remove(pHf->aname);
+ }
+
+ // Register file
+ fg.fileId = fid;
+
+ for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
+ SFile * pFile = fg.files + type;
+ uint32_t version = 0;
+
+ pFile->fd = -1;
+ pFile->file = *pfArray[type]; // TODO
+ tsdbOpenFile(pFile, O_RDONLY);
+ tsdbLoadFileHeader(pFile, &version);
+ tsdbCloseFile(pFile);
+ }
+
+ pFileH->pFGroup[pFileH->nFGroups++] = fg;
+
+ tfsIncDiskFile(pfArray[TSDB_FILE_TYPE_HEAD]->level, pfArray[TSDB_FILE_TYPE_HEAD]->id, TSDB_FILE_TYPE_MAX);
+
+ return 0;
+}
+
+static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) {
+ sscanf(bname, "v%df%d%s", vid, fid, suffix);
+}
\ No newline at end of file