From 6891cf43f5794b95f503eb20595f877375783543 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 4 Jan 2021 12:07:54 +0000 Subject: [PATCH] partial work --- src/tsdb/inc/tsdbMain.h | 85 +++- src/tsdb/src/tsdbCommit.c | 83 +++- src/tsdb/src/tsdbFS.c | 117 ++---- src/tsdb/src/tsdbFile.c | 784 +++++++++--------------------------- src/tsdb/src/tsdbFile_bak.c | 656 ++++++++++++++++++++++++++++++ 5 files changed, 1013 insertions(+), 712 deletions(-) create mode 100644 src/tsdb/src/tsdbFile_bak.c diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 4658def1bf..feaeea9972 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -293,12 +293,19 @@ static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { return dataRowTKey(row); } -// ================= tsdbFS.c +// ================= tsdbFile.c #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF -enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX }; +typedef enum { + TSDB_FILE_HEAD = 0, + TSDB_FILE_DATA, + TSDB_FILE_LAST, + TSDB_FILE_MAX, + TSDB_FILE_META, + TSDB_FILE_MANIFEST +} TSDB_FILE_T; // For meta file typedef struct { @@ -315,6 +322,15 @@ typedef struct { int fd; } SMFile; +void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo); +int tsdbOpenMFile(SMFile* pMFile, int flags); +void tsdbCloseMFile(SMFile* pMFile); +int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence); +int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte); +int64_t tsdbTellMFile(SMFile *pMFile); +int tsdbEncodeMFile(void** buf, SMFile* pMFile); +void* tsdbDecodeMFile(void* buf, SMFile* pMFile); + // For .head/.data/.last file typedef struct { uint32_t magic; @@ -332,12 +348,29 @@ typedef struct { int fd; } SDFile; +void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo, + TSDB_FILE_T ftype); +int tsdbOpenDFile(SDFile* pDFile, int flags); +void tsdbCloseDFile(SDFile* pDFile); +int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence); +int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte); +int64_t tsdbTellDFile(SDFile* pDFile); +int tsdbEncodeDFile(void** buf, SDFile* pDFile); +void* tsdbDecodeDFile(void* buf, SDFile* pDFile); + typedef struct { - int id; + int fid; int state; SDFile files[TSDB_FILE_MAX]; } SDFileSet; +#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) + +void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); +int tsdbOpenDFileSet(SDFileSet* pSet, int flags); +void tsdbCloseDFileSet(SDFileSet* pSet); +int tsdbUpdateDFileSetHeader(SDFileSet* pSet); + /* Statistic information of the TSDB file system. */ typedef struct { @@ -351,31 +384,40 @@ typedef struct { int64_t version; STsdbFSMeta meta; SMFile mf; // meta file - SArray * df; // data file array -} SFSSnapshot; + SArray* df; // data file array +} SFSVer; typedef struct { pthread_rwlock_t lock; - SFSSnapshot *curr; - SFSSnapshot *new; + SFSVer fsv; } STsdbFS; +typedef struct { + int version; // current FS version + int index; + int fid; + SDFileSet* pSet; +} SFSIter; + #define TSDB_FILE_INFO(tf) (&((tf)->info)) #define TSDB_FILE_F(tf) (&((tf)->f))) #define TSDB_FILE_FD(tf) ((tf)->fd) -int tsdbOpenFS(STsdbRepo* pRepo); -void tsdbCloseFS(STsdbRepo* pRepo); -int tsdbFSNewTxn(STsdbRepo* pRepo); -int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); -int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); -int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); -void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid); -int tsdbRemoveDFileSet(SDFileSet* pSet); -int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); -void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); -SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to); +int tsdbOpenFS(STsdbRepo* pRepo); +void tsdbCloseFS(STsdbRepo* pRepo); +int tsdbFSNewTxn(STsdbRepo* pRepo); +int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); +int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); +int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); +void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid); +int tsdbRemoveDFileSet(SDFileSet* pSet); +int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); +void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); +SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to); +int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter); +SDFileSet* tsdbFSIterNext(SFSIter* pIter); +int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet); static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); @@ -430,7 +472,7 @@ int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid); int tdKVStoreEndCommit(SKVStore* pStore); void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size); -// ================= tsdbFile.c +// ================= // extern const char* tsdbFileSuffix[]; // minFid <= midFid <= maxFid @@ -642,9 +684,8 @@ typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef struct { TSKEY minKey; TSKEY maxKey; - SFileGroup fGroup; - SFile nHeadF; - SFile nLastF; + SDFileSet rSet; + SDFileSet wSet; } SHelperFile; typedef struct { diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 9fc2bbc451..62d84b66b6 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -75,6 +75,8 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; STsdbCfg * pCfg = &(pRepo->config); SCommitH ch = {0}; + SFSIter fsIter = {0}; + SDFileSet *pOldSet = NULL; if (pMem->numOfRows <= 0) return 0; @@ -86,11 +88,17 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { int sfid = MIN(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/); int efid = MAX(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/); + tsdbInitFSIter(pRepo, &fsIter); + pOldSet = tsdbFSIterNext(&fsIter); for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, &ch) < 0) { + if (tsdbCommitToFile(pRepo, pOldSet, &ch, fid) < 0) { tsdbDestroyCommitH(&ch, pMem->maxTables); return -1; } + + if (pOldSet != NULL && pOldSet->fid == fid) { + pOldSet = tsdbFSIterNext(&fsIter); + } } tsdbDestroyCommitH(&ch, pMem->maxTables); @@ -186,17 +194,69 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS return false; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { +static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) { STsdbCfg * pCfg = &(pRepo->config); SMemTable *pMem = pRepo->imem; TSKEY minKey, maxKey; - SDFileSet *pOldSet = NULL; + bool hasData; + SDFileSet rSet, wSet; + + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + hasData = tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey); + + if (pOldSet == NULL || pOldSet->fid != fid) { // need to create SDFileSet and commit + if (!hasData) return 0; + + tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID); + tsdbOpenDFileSet(&wSet, O_WRONLY | O_CREAT); + tsdbUpdateDFileSetHeader(&wSet); + } else { + int level = tsdbGetFidLevel(fid, &(pch->rtn)); + + // Check if SDFileSet expires + if (level < 0) { + if (hasData) { + tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1); + } + return 0; + } + + // TODO: Check if SDFileSet in correct level + if (true /*pOldSet level is not the same as level*/) { + tsdbInitDFileSet(&rSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID); + // TODO: check if level is correct + tsdbOpenDFileSet(&wSet, O_WRONLY|O_CREAT); + } + } + + // TODO: close the file set + if (!hasData) { + tsdbUpdateDFileSet(pRepo, &rSet); + return 0; + } + + { + // TODO: commit the memory data + } + + if (tsdbUpdateDFileSet(pRepo, &wSet) < 0) { + return -1; + } + + return 0; + +#if 0 + STsdbCfg * pCfg = &(pRepo->config); + SMemTable *pMem = pRepo->imem; + TSKEY minKey, maxKey; + SDFileSet oldSet = {0}; SDFileSet newSet = {0}; + int level; tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - if (pOldSet) { // file exists - int level = tsdbGetFidLevel(fid, &(pch->rtn)); + level = tsdbGetFidLevel(fid, &(pch->rtn)); + if (pOldSet) { // fset exists, check if the file shold be removed or upgrade tier level if (level < 0) { // if out of data, remove it and ignore expired memory data tsdbRemoveExpiredDFileSet(pRepo, fid); tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1); @@ -205,6 +265,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { // Move the data file set to correct level tsdbMoveDFileSet(pOldSet, level); + } else { // fset not exist, create the fset + pOldSet = &oldSet; + if (tsdbCreateDFileSet(fid, level, pOldSet) < 0) { + // TODO + return -1; + } } if (tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey)) { @@ -221,9 +287,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { TSDB_RLOCK_TABLE(pIter->pTable); if (pIter->pIter != NULL) { // has data in memory to commit + // TODO } TSDB_RUNLOCK_TABLE(pIter->pTable); + if (tsdbMoveLastBlockIfNeccessary() < 0) return -1; if (tsdbWriteCompInfo() < 0) return -1; @@ -232,11 +300,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { if (tsdbWriteCompIdx() < 0) return -1; } - if (/*file exists OR has data to commit*/) { - tsdbUpdateDFileSet(pRepo, &newSet); - } + tsdbUpdateDFileSet(pRepo, &newSet); return 0; +#endif } static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 8397410e50..86c6a7408f 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -88,7 +88,7 @@ int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) { } int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { - SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new; + SFSVer *pSnapshot = REPO_FS(pRepo)->new; SDFileSet * pOldSet; pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE); @@ -116,7 +116,7 @@ int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { } void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) { - SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new; + SFSVer *pSnapshot = REPO_FS(pRepo)->new; while (taosArrayGetSize(pSnapshot->df) > 0) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0); if (pSet->id < mfid) { @@ -125,38 +125,32 @@ void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) { } } -int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { - int tlen = 0; - - tlen += taosEncodeVariantI64(buf, pInfo->size); - tlen += taosEncodeVariantI64(buf, pInfo->tombSize); - tlen += taosEncodeVariantI64(buf, pInfo->nRecords); - tlen += taosEncodeVariantI64(buf, pInfo->nDels); - tlen += taosEncodeFixedU32(buf, pInfo->magic); - - return tlen; -} - -void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { - buf = taosDecodeVariantI64(buf, &(pInfo->size)); - buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); - buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); - buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - - return buf; -} SDFileSet tsdbMoveDFileSet(SDFileSet *pOldSet, int to) { // TODO } -static int tsdbSaveFSSnapshot(int fd, SFSSnapshot *pSnapshot) { +int tsdbInitFSIter(STsdbRepo *pRepo, SFSIter *pIter) { // TODO return 0; } -static int tsdbLoadFSSnapshot(SFSSnapshot *pSnapshot) { +SDFileSet *tsdbFSIterNext(SFSIter *pIter) { + // TODO + return NULL; +} + +int tsdbCreateDFileSet(int fid, int level, SDFileSet *pSet) { + // TODO + return 0; +} + +static int tsdbSaveFSSnapshot(int fd, SFSVer *pSnapshot) { + // TODO + return 0; +} + +static int tsdbLoadFSSnapshot(SFSVer *pSnapshot) { // TODO return 0; } @@ -178,63 +172,6 @@ static int tsdbOpenFSImpl(STsdbRepo *pRepo) { return 0; } -static int tsdbEncodeMFile(void **buf, SMFile *pMFile) { - int tlen = 0; - - tlen += tsdbEncodeMFInfo(buf, &(pMFile->info)); - tlen += tfsEncodeFile(buf, &(pMFile->f)); - - return tlen; -} - -static void *tsdbDecodeMFile(void *buf, SMFile *pMFile) { - buf = tsdbDecodeMFInfo(buf, &(pMFile->info)); - buf = tfsDecodeFile(buf, &(pMFile->f)); - - return buf; -} - -static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { - int tlen = 0; - - tlen += taosEncodeFixedU32(buf, pInfo->magic); - tlen += taosEncodeFixedU32(buf, pInfo->len); - tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); - tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); - tlen += taosEncodeFixedU32(buf, pInfo->offset); - tlen += taosEncodeFixedU64(buf, pInfo->size); - tlen += taosEncodeFixedU64(buf, pInfo->tombSize); - - return tlen; -} - -static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - buf = taosDecodeFixedU32(buf, &(pInfo->len)); - buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); - buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); - buf = taosDecodeFixedU32(buf, &(pInfo->offset)); - buf = taosDecodeFixedU64(buf, &(pInfo->size)); - buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); - - return buf; -} - -static int tsdbEncodeDFile(void **buf, SDFile *pDFile) { - int tlen = 0; - - tlen += tsdbEncodeDFInfo(buf, &(pDFile->info)); - tlen += tfsEncodeFile(buf, &(pDFile->f)); - - return tlen; -} - -static void *tsdbDecodeDFile(void *buf, SDFile *pDFile) { - buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); - buf = tfsDecodeFile(buf, &(pDFile->f)); - - return buf; -} static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { int tlen = 0; @@ -256,7 +193,7 @@ static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { return buf; } -static int tsdbEncodeFSSnapshot(void **buf, SFSSnapshot *pSnapshot) { +static int tsdbEncodeFSSnapshot(void **buf, SFSVer *pSnapshot) { int tlen = 0; int64_t size = 0; @@ -276,7 +213,7 @@ static int tsdbEncodeFSSnapshot(void **buf, SFSSnapshot *pSnapshot) { return tlen; } -static void *tsdbDecodeFSSnapshot(void *buf, SFSSnapshot *pSnapshot) { +static void *tsdbDecodeFSSnapshot(void *buf, SFSVer *pSnapshot) { int64_t size = 0; SDFile df; @@ -293,10 +230,10 @@ static void *tsdbDecodeFSSnapshot(void *buf, SFSSnapshot *pSnapshot) { return buf; } -static SFSSnapshot *tsdbNewSnapshot(int32_t nfiles) { - SFSSnapshot *pSnapshot; +static SFSVer *tsdbNewSnapshot(int32_t nfiles) { + SFSVer *pSnapshot; - pSnapshot = (SFSSnapshot *)calloc(1, sizeof(pSnapshot)); + pSnapshot = (SFSVer *)calloc(1, sizeof(pSnapshot)); if (pSnapshot == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; @@ -312,7 +249,7 @@ static SFSSnapshot *tsdbNewSnapshot(int32_t nfiles) { return pSnapshot; } -static SFSSnapshot *tsdbFreeSnapshot(SFSSnapshot *pSnapshot) { +static SFSVer *tsdbFreeSnapshot(SFSVer *pSnapshot) { if (pSnapshot) { taosArrayDestroy(pSnapshot->df); free(pSnapshot); @@ -359,7 +296,7 @@ static STsdbFS *tsdbFreeFS(STsdbFS *pFs) { return NULL; } -static int tsdbCopySnapshot(SFSSnapshot *src, SFSSnapshot *dst) { +static int tsdbCopySnapshot(SFSVer *src, SFSVer *dst) { dst->meta = src->meta; dst->mf = src->meta; taosArrayCopy(dst->df, src->df); @@ -379,7 +316,7 @@ static int tsdbCompFSetId(const void *key1, const void *key2) { } } -static SDFileSet *tsdbSearchDFileSet(SFSSnapshot *pSnapshot, int fid, int flags) { +static SDFileSet *tsdbSearchDFileSet(SFSVer *pSnapshot, int fid, int flags) { void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags); return (ptr == NULL) ? NULL : ((SDFileSet *)ptr); } diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 411c1d796e..785933000b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -12,351 +12,31 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define _DEFAULT_SOURCE -#define TAOS_RANDOM_FILE_FAIL_TEST -#include -#include "os.h" -#include "talgo.h" -#include "tchecksum.h" + #include "tsdbMain.h" -#include "tutil.h" -#include "tfs.h" -#include "tarray.h" -const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; +#define TSDB_FILE_OPENED(f) ((f)->fd >= 0) +#define TSDB_FILE_SET_CLOSED(f) ((f)->fd = -1) -static int compFGroup(const void *arg1, const void *arg2); -static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void *tsdbScanAllFiles(STsdbRepo *pRepo); -static int tsdbCompareFile(const void *arg1, const void *arg2); -static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile); -static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix); - -// STsdbFileH =========================================== -STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { - STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH)); - if (pFileH == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - int code = pthread_rwlock_init(&(pFileH->fhlock), NULL); - if (code != 0) { - tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code)); - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } - - pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile); - - pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); - if (pFileH->pFGroup == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - return pFileH; - -_err: - tsdbFreeFileH(pFileH); - return NULL; -} - -void tsdbFreeFileH(STsdbFileH *pFileH) { - if (pFileH) { - pthread_rwlock_destroy(&pFileH->fhlock); - tfree(pFileH->pFGroup); - free(pFileH); - } -} - -int tsdbOpenFileH(STsdbRepo *pRepo) { - ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); - - void *pfArray = NULL; - - // Scan the whole directory and get data - pfArray = tsdbScanAllFiles(pRepo); - if (pfArray == NULL) { - return -1; - } - - if (taosArrayGetSize(pfArray) == 0) { - taosArrayDestroy(pfArray); - return 0; - } - - // Sort the files - taosArraySort(pfArray, tsdbCompareFile); - - // Loop to recover the files - int iter = 0; - while (true) { - if (iter >= taosArrayGetSize(pfArray)) break; - - int vid, fid; - char bname[TSDB_FILENAME_LEN] = "\0"; - char suffix[TSDB_FILENAME_LEN] = "\0"; - int count = 0; - - TFILE *pf = taosArrayGet(pfArray, iter); - tfsbasename(pf, bname); - tsdbParseFname(bname, &vid, &fid, suffix); - count++; - iter++; - - while (true) { - int nfid = 0; - if (iter >= taosArrayGetSize(pfArray)) break; - TFILE *npf = taosArrayGet(pfArray, iter); - tfsbasename(npf, bname); - tsdbParseFname(bname, &vid, &nfid, suffix); - - if (nfid != fid) break; - count++; - iter++; - } - - tsdbRestoreFile(pRepo, pf, count); - } - - taosArrayDestroy(pfArray); - return 0; -} - -void tsdbCloseFileH(STsdbRepo *pRepo, bool isRestart) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - - for (int i = 0; i < pFileH->nFGroups; i++) { - SFileGroup *pFGroup = pFileH->pFGroup + i; - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbCloseFile(&(pFGroup->files[type])); - } - if (isRestart) { - tfsDecDiskFile(pFGroup->files[0].file.level, pFGroup->files[0].file.level, TSDB_FILE_TYPE_MAX); - } - } -} - -// SFileGroup =========================================== -SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup fg = {0}; - int id = TFS_UNDECIDED_ID; - char fname[TSDB_FILENAME_LEN] = "\0"; - - ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL); - ASSERT(pFileH->nFGroups < pFileH->maxFGroups); - - // SET FILE GROUP - fg.fileId = fid; - - // CREATE FILES - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(fg.files[type]); - - pFile->fd = -1; - pFile->info.size = TSDB_FILE_HEAD_SIZE; - pFile->info.magic = TSDB_FILE_INIT_MAGIC; - - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname); - tfsInitFile(&pFile->file, level, id, fname); - - if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0) return NULL; - - if (tsdbUpdateFileHeader(pFile) < 0) { - tsdbCloseFile(pFile); - return NULL; - } - - tsdbCloseFile(pFile); - - level = TFILE_LEVEL(&(pFile->file)); - id = TFILE_ID(&(pFile->file)); - } - - // PUT GROUP INTO FILE HANDLE - pthread_rwlock_wrlock(&pFileH->fhlock); - pFileH->pFGroup[pFileH->nFGroups++] = fg; - qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); - pthread_rwlock_unlock(&pFileH->fhlock); - - SFileGroup *pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ); - ASSERT(pfg != NULL); - return pfg; -} - -void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { - ASSERT(pFGroup != NULL); - STsdbFileH *pFileH = pRepo->tsdbFileH; - - SFileGroup fg = *pFGroup; - - int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); - if (nFilesLeft > 0) { - memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft); - } - - pFileH->nFGroups--; - ASSERT(pFileH->nFGroups >= 0); - - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(fg.files[type]); - tfsremove(&(pFile->file)); - } -} - -SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { - void *ptr = taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), - keyFGroupCompFunc, flags); - if (ptr == NULL) return NULL; - return (SFileGroup *)ptr; -} - -int tsdbGetFidLevel(int fid, SFidGroup fidg) { - if (fid >= fidg.maxFid) { - return 0; - } else if (fid >= fidg.midFid) { - return 1; - } else if (fid >= fidg.minFid) { - return 2; +// ============== Operations on SMFile +void tsdbInitMFile(SMFile *pMFile, int vid, int ver, SMFInfo *pInfo) { + TSDB_FILE_SET_CLOSED(pMFile); + if (pInfo == NULL) { + memset(&(pMFile->info), 0, sizeof(pMFile->info)); + pMFile->info.magic = TSDB_FILE_INIT_MAGIC; } else { - return -1; + pMFile->info = *pInfo; } + tfsInitFile(&(pMFile->f), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, NULL /*TODO*/); + + return pMFile; } -static int compFGroup(const void *arg1, const void *arg2) { - int val1 = ((SFileGroup *)arg1)->fileId; - int val2 = ((SFileGroup *)arg2)->fileId; +int tsdbOpenMFile(SMFile *pMFile, int flags) { + ASSERT(!TSDB_FILE_OPENED(pMFile)); - if (val1 < val2) { - return -1; - } else if (val1 > val2) { - return 1; - } else { - return 0; - } -} - -static int keyFGroupCompFunc(const void *key, const void *fgroup) { - int fid = *(int *)key; - SFileGroup *pFGroup = (SFileGroup *)fgroup; - if (fid == pFGroup->fileId) { - return 0; - } else { - return fid > pFGroup->fileId ? 1 : -1; - } -} - -// SFileGroupIter =========================================== -void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { - pIter->pFileH = pFileH; - pIter->direction = direction; - - if (pFileH->nFGroups == 0) { - pIter->index = -1; - pIter->fileId = -1; - } else { - if (direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->index = 0; - } else { - pIter->index = pFileH->nFGroups - 1; - } - pIter->fileId = pFileH->pFGroup[pIter->index].fileId; - } -} - -void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { - STsdbFileH *pFileH = pIter->pFileH; - - if (pFileH->nFGroups == 0) { - pIter->index = -1; - pIter->fileId = -1; - return; - } - - int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; - void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); - if (ptr == NULL) { - pIter->index = -1; - pIter->fileId = -1; - } else { - pIter->index = (int)(POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup)); - pIter->fileId = ((SFileGroup *)ptr)->fileId; - } -} - -SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { - STsdbFileH *pFileH = pIter->pFileH; - SFileGroup *pFGroup = NULL; - - if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL; - - pFGroup = &pFileH->pFGroup[pIter->index]; - if (pFGroup->fileId != pIter->fileId) { - tsdbSeekFileGroupIter(pIter, pIter->fileId); - } - - if (pIter->index < 0) return NULL; - - pFGroup = &pFileH->pFGroup[pIter->index]; - ASSERT(pFGroup->fileId == pIter->fileId); - - if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->index++; - } else { - pIter->index--; - } - - if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) { - pIter->fileId = pFileH->pFGroup[pIter->index].fileId; - } else { - pIter->fileId = -1; - } - - return pFGroup; -} - -// SFile =========================================== -int tsdbOpenFile(SFile *pFile, int oflag) { - ASSERT(!TSDB_IS_FILE_OPENED(pFile)); - - pFile->fd = tfsopen(&(pFile->file), oflag); - if (pFile->fd < 0) { - tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), tstrerror(terrno)); - return -1; - } - - tsdbTrace("open file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd); - - return 0; -} - -void tsdbCloseFile(SFile *pFile) { - if (TSDB_IS_FILE_OPENED(pFile)) { - tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd); - tfsclose(pFile->fd); - pFile->fd = -1; - } -} - -int tsdbUpdateFileHeader(SFile *pFile) { - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - - void *pBuf = (void *)buf; - taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION); - tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info)); - - taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); - - if (lseek(pFile->fd, 0, SEEK_SET) < 0) { - tsdbError("failed to lseek file %s since %s", TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile), - strerror(errno)); + pMFile->fd = open(pMFile->f.aname, flags); + if (pMFile->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -364,8 +44,155 @@ int tsdbUpdateFileHeader(SFile *pFile) { return 0; } -int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { +void tsdbCloseMFile(SMFile *pMFile) { + if (TSDB_FILE_OPENED(pMFile)) { + close(pMFile->fd); + TSDB_FILE_SET_CLOSED(pMFile); + } +} + +int64_t tsdbSeekMFile(SMFile *pMFile, int64_t offset, int whence) { + ASSERT(TSDB_FILE_OPENED(pMFile)); + + int64_t loffset = taosLSeek(pMFile->fd, offset, whence); + if (loffset < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return loffset; +} + +int64_t tsdbWriteMFile(SMFile *pMFile, void *buf, int64_t nbyte) { + ASSERT(TSDB_FILE_OPENED(pMFile)); + + int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte); + if (nwrite < nbyte) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pMFile->info.size += nbyte; + return nwrite; +} + +int64_t tsdbTellMFile(SMFile *pMFile) { return tsdbSeekMFile(pMFile, 0, SEEK_CUR); } + +int tsdbEncodeMFile(void **buf, SMFile *pMFile) { int tlen = 0; + + tlen += tsdbEncodeMFInfo(buf, &(pMFile->info)); + tlen += tfsEncodeFile(buf, &(pMFile->f)); + + return tlen; +} + +void *tsdbDecodeMFile(void *buf, SMFile *pMFile) { + buf = tsdbDecodeMFInfo(buf, &(pMFile->info)); + buf = tfsDecodeFile(buf, &(pMFile->f)); + + return buf; +} + +static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { + int tlen = 0; + + tlen += taosEncodeVariantI64(buf, pInfo->size); + tlen += taosEncodeVariantI64(buf, pInfo->tombSize); + tlen += taosEncodeVariantI64(buf, pInfo->nRecords); + tlen += taosEncodeVariantI64(buf, pInfo->nDels); + tlen += taosEncodeFixedU32(buf, pInfo->magic); + + return tlen; +} + +static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + + return buf; +} + +// ============== Operations on SDFile +void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo *pInfo, TSDB_FILE_T ftype) { + TSDB_FILE_SET_CLOSED(pDFile); + if (pInfo == NULL) { + memset(&(pDFile->info), 0, sizeof(pDFile->info)); + pDFile->info.magic = TSDB_FILE_INIT_MAGIC; + } else { + pDFile->info = *pInfo; + } + tfsInitFile(&(pDFile->f), level, id, NULL /*TODO*/); +} + +int tsdbOpenDFile(SDFile *pDFile, int flags) { + ASSERT(!TSDB_FILE_OPENED(pDFile)); + + pDFile->fd = open(pDFile->f.aname, flags); + if (pDFile->fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + +void tsdbCloseDFile(SDFile *pDFile) { + if (TSDB_FILE_OPENED(pDFile)) { + close(pDFile->fd); + TSDB_FILE_SET_CLOSED(pDFile); + } +} + +int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) { + ASSERT(TSDB_FILE_OPENED(pDFile)); + + int64_t loffset = taosLSeek(pDFile->fd, offset, whence); + if (loffset < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return loffset; +} + +int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) { + ASSERT(TSDB_FILE_OPENED(pDFile)); + + int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte); + if (nwrite < nbyte) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pDFile->info.size += nbyte; + return nwrite; +} + +int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); } + +int tsdbEncodeDFile(void **buf, SDFile *pDFile) { + int tlen = 0; + + tlen += tsdbEncodeDFInfo(buf, &(pDFile->info)); + tlen += tfsEncodeFile(buf, &(pDFile->f)); + + return tlen; +} + +void *tsdbDecodeDFile(void *buf, SDFile *pDFile) { + buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); + buf = tfsDecodeFile(buf, &(pDFile->f)); + + return buf; +} + +static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { + int tlen = 0; + tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->len); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); @@ -377,7 +204,7 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { return tlen; } -void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { +static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); @@ -389,268 +216,41 @@ void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { return buf; } -int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; +// ============== Operations on SDFileSet +void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int id) { + pSet->fid = fid; + pSet->state = 0; - if (lseek(pFile->fd, 0, SEEK_SET) < 0) { - tsdbError("failed to lseek file %s to start since %s", TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); + tsdbInitDFile(pDFile, vid, fid, ver, level, id, NULL, ftype); + // TODO: reset level and id } +} - if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("failed to read file %s header part with %d bytes, reason:%s", TSDB_FILE_NAME(pFile), TSDB_FILE_HEAD_SIZE, - strerror(errno)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; +int tsdbOpenDFileSet(SDFileSet *pSet, int flags) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); + + if (tsdbOpenDFile(pDFile, flags) < 0) { + tsdbCloseDFileSet(pSet); + return -1; + } } +} - if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { - tsdbError("file %s header part is corrupted with failed checksum", TSDB_FILE_NAME(pFile)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; +void tsdbCloseDFileSet(SDFileSet *pSet) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + tsdbCloseDFile(pDFile); } +} - void *pBuf = (void *)buf; - pBuf = taosDecodeFixedU32(pBuf, version); - pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); - +int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { + // TODO return 0; } -void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { // TODO - uint32_t version = 0; - SFile file; - SFile * pFile = &file; - - tfsInitFile(&(pFile->file), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, fname); - pFile->fd = -1; - - if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; - if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err; - - off_t offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) goto _err; - tsdbCloseFile(pFile); - - *magic = pFile->info.magic; - *size = offset; - - return; - -_err: - tsdbCloseFile(pFile); - *magic = TSDB_FILE_INIT_MAGIC; - *size = 0; -} - -// Retention =========================================== -void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup *pGroup = pFileH->pFGroup; - - pthread_rwlock_wrlock(&(pFileH->fhlock)); - - while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) { - tsdbRemoveFileGroup(pRepo, pGroup); - } - - pthread_rwlock_unlock(&(pFileH->fhlock)); -} - -void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) { - TSKEY now = taosGetTimestamp(pCfg->precision); - - pFidGroup->minFid = - TSDB_KEY_FILEID(now - pCfg->keep * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); - pFidGroup->midFid = - TSDB_KEY_FILEID(now - pCfg->keep2 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); - pFidGroup->maxFid = - TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); -} - -int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - - for (int i = 0; i < pFileH->nFGroups; i++) { - SFileGroup ofg = pFileH->pFGroup[i]; - - int level = tsdbGetFidLevel(ofg.fileId, *pFidGroup); - ASSERT(level >= 0); - - if (level == ofg.files[0].file.level) continue; - - // COPY THE FILE GROUP TO THE RIGHT LEVEL - SFileGroup nfg = ofg; - int id = TFS_UNDECIDED_ID; - int type = 0; - for (; type < TSDB_FILE_TYPE_MAX; type++) { - tfsInitFile(&nfg.files[type].file, level, id, nfg.files[type].file.rname); - if (tfscopy(&(ofg.files[type].file), &(nfg.files[type].file)) < 0) { - if (terrno == TSDB_CODE_FS_INVLD_LEVEL) break; - tsdbError("vgId:%d failed to move fid %d from level %d to level %d since %s", REPO_ID(pRepo), ofg.fileId, - ofg.files[0].file.level, level, strerror(terrno)); - return -1; - } - - id = nfg.files[type].file.level; - id = nfg.files[type].file.id; - } - - if (type < TSDB_FILE_TYPE_MAX) continue; - - // Register new file into TSDB - pthread_rwlock_wrlock(&(pFileH->fhlock)); - pFileH->pFGroup[i] = nfg; - pthread_rwlock_unlock(&(pFileH->fhlock)); - - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(ofg.files[type]); - tfsremove(&(pFile->file)); - } - - tsdbDebug("vgId:%d move file group %d from level %d to level %d", REPO_ID(pRepo), ofg.fileId, - ofg.files[0].file.level, level); - } - +int tsdbMoveDFileSet(SDFileSet *pOldSet, int tolevel, SDFileSet *pNewSet) { + // TODO return 0; -} - -static void *tsdbScanAllFiles(STsdbRepo *pRepo) { - void * farray = NULL; - TDIR * tdir = NULL; - char dirName[TSDB_FILENAME_LEN] = "\0"; - char bname[TSDB_FILENAME_LEN] = "\0"; - regex_t regex1 = {0}; - const TFILE *pf = NULL; - - farray = taosArrayInit(256, sizeof(TFILE)); - if (farray == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED); - - snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo)); - - tdir = tfsOpendir(dirName); - - while ((pf = tfsReaddir(tdir)) != NULL) { - tfsbasename(pf, bname); - - int code = regexec(®ex1, bname, 0, NULL, 0); - if (code != 0) { - tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname); - continue; - } - - taosArrayPush(farray, pf); - } - - regfree(®ex1); - tfsClosedir(tdir); - - return farray; -} - -static int tsdbCompareFile(const void *arg1, const void *arg2) { - char bname1[TSDB_FILENAME_LEN] = "\0"; - char bname2[TSDB_FILENAME_LEN] = "\0"; - TFILE *pf1 = (TFILE *)arg1; - TFILE *pf2 = (TFILE *)arg2; - int vid1, fid1, vid2, fid2; - - tfsbasename(pf1, bname1); - tfsbasename(pf2, bname2); - - sscanf(bname1, "v%df%d", &vid1, &fid1); - sscanf(bname2, "v%df%d", &vid2, &fid2); - - ASSERT(vid1 == vid2); - if (fid1 < fid2) { - return -1; - } else if (fid1 == fid2) { - return 0; - } else { - return 1; - } -} - -static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) { - char backname[TSDB_FILENAME_LEN*2] = "\0"; - char bname[TSDB_FILENAME_LEN] = "\0"; - STsdbFileH *pFileH = pRepo->tsdbFileH; - TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0}; - TFILE * pHf = NULL; - TFILE * pLf = NULL; - SFileGroup fg = {0}; - int vid = 0; - int fid = 0; - char suffix[TSDB_FILENAME_LEN] = "\0"; - - for (int i = 0; i < nfile; i++) { - TFILE *pf = pfiles + i; - - tfsbasename(pf, bname); - tsdbParseFname(bname, &vid, &fid, suffix); - - if (strcmp(suffix, ".head") == 0) { - pfArray[TSDB_FILE_TYPE_HEAD] = pf; - } else if (strcmp(suffix, ".data") == 0) { - pfArray[TSDB_FILE_TYPE_DATA] = pf; - } else if (strcmp(suffix, ".last") == 0) { - pfArray[TSDB_FILE_TYPE_LAST] = pf; - } else if (strcmp(suffix, ".l") == 0) { - pLf = pf; - } else if (strcmp(suffix, ".h") == 0) { - pHf = pf; - } else { - tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname); - } - } - - if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) { - for (int i = 0; i < nfile; i++) { - snprintf(backname, TSDB_FILENAME_LEN*2, "%s_bak", (pfiles + i)->aname); - rename((pfiles + i)->aname, backname); - } - - return -1; - } - - if (pHf == NULL) { - if (pLf != NULL) { - rename(pLf->aname, pfArray[TSDB_FILE_TYPE_LAST]->aname); - } - } else { - if (pLf != NULL) { - remove(pLf->aname); - } - remove(pHf->aname); - } - - // Register file - fg.fileId = fid; - - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile * pFile = fg.files + type; - uint32_t version = 0; - - pFile->fd = -1; - pFile->file = *pfArray[type]; // TODO - tsdbOpenFile(pFile, O_RDONLY); - tsdbLoadFileHeader(pFile, &version); - tsdbCloseFile(pFile); - } - - pFileH->pFGroup[pFileH->nFGroups++] = fg; - - tfsIncDiskFile(pfArray[TSDB_FILE_TYPE_HEAD]->level, pfArray[TSDB_FILE_TYPE_HEAD]->id, TSDB_FILE_TYPE_MAX); - - return 0; -} - -static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) { - sscanf(bname, "v%df%d%s", vid, fid, suffix); } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile_bak.c b/src/tsdb/src/tsdbFile_bak.c new file mode 100644 index 0000000000..411c1d796e --- /dev/null +++ b/src/tsdb/src/tsdbFile_bak.c @@ -0,0 +1,656 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#define _DEFAULT_SOURCE +#define TAOS_RANDOM_FILE_FAIL_TEST +#include +#include "os.h" +#include "talgo.h" +#include "tchecksum.h" +#include "tsdbMain.h" +#include "tutil.h" +#include "tfs.h" +#include "tarray.h" + +const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; + +static int compFGroup(const void *arg1, const void *arg2); +static int keyFGroupCompFunc(const void *key, const void *fgroup); +static void *tsdbScanAllFiles(STsdbRepo *pRepo); +static int tsdbCompareFile(const void *arg1, const void *arg2); +static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile); +static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix); + +// STsdbFileH =========================================== +STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { + STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH)); + if (pFileH == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + int code = pthread_rwlock_init(&(pFileH->fhlock), NULL); + if (code != 0) { + tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } + + pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile); + + pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); + if (pFileH->pFGroup == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + return pFileH; + +_err: + tsdbFreeFileH(pFileH); + return NULL; +} + +void tsdbFreeFileH(STsdbFileH *pFileH) { + if (pFileH) { + pthread_rwlock_destroy(&pFileH->fhlock); + tfree(pFileH->pFGroup); + free(pFileH); + } +} + +int tsdbOpenFileH(STsdbRepo *pRepo) { + ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); + + void *pfArray = NULL; + + // Scan the whole directory and get data + pfArray = tsdbScanAllFiles(pRepo); + if (pfArray == NULL) { + return -1; + } + + if (taosArrayGetSize(pfArray) == 0) { + taosArrayDestroy(pfArray); + return 0; + } + + // Sort the files + taosArraySort(pfArray, tsdbCompareFile); + + // Loop to recover the files + int iter = 0; + while (true) { + if (iter >= taosArrayGetSize(pfArray)) break; + + int vid, fid; + char bname[TSDB_FILENAME_LEN] = "\0"; + char suffix[TSDB_FILENAME_LEN] = "\0"; + int count = 0; + + TFILE *pf = taosArrayGet(pfArray, iter); + tfsbasename(pf, bname); + tsdbParseFname(bname, &vid, &fid, suffix); + count++; + iter++; + + while (true) { + int nfid = 0; + if (iter >= taosArrayGetSize(pfArray)) break; + TFILE *npf = taosArrayGet(pfArray, iter); + tfsbasename(npf, bname); + tsdbParseFname(bname, &vid, &nfid, suffix); + + if (nfid != fid) break; + count++; + iter++; + } + + tsdbRestoreFile(pRepo, pf, count); + } + + taosArrayDestroy(pfArray); + return 0; +} + +void tsdbCloseFileH(STsdbRepo *pRepo, bool isRestart) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + + for (int i = 0; i < pFileH->nFGroups; i++) { + SFileGroup *pFGroup = pFileH->pFGroup + i; + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbCloseFile(&(pFGroup->files[type])); + } + if (isRestart) { + tfsDecDiskFile(pFGroup->files[0].file.level, pFGroup->files[0].file.level, TSDB_FILE_TYPE_MAX); + } + } +} + +// SFileGroup =========================================== +SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + SFileGroup fg = {0}; + int id = TFS_UNDECIDED_ID; + char fname[TSDB_FILENAME_LEN] = "\0"; + + ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL); + ASSERT(pFileH->nFGroups < pFileH->maxFGroups); + + // SET FILE GROUP + fg.fileId = fid; + + // CREATE FILES + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(fg.files[type]); + + pFile->fd = -1; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + + tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname); + tfsInitFile(&pFile->file, level, id, fname); + + if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0) return NULL; + + if (tsdbUpdateFileHeader(pFile) < 0) { + tsdbCloseFile(pFile); + return NULL; + } + + tsdbCloseFile(pFile); + + level = TFILE_LEVEL(&(pFile->file)); + id = TFILE_ID(&(pFile->file)); + } + + // PUT GROUP INTO FILE HANDLE + pthread_rwlock_wrlock(&pFileH->fhlock); + pFileH->pFGroup[pFileH->nFGroups++] = fg; + qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); + pthread_rwlock_unlock(&pFileH->fhlock); + + SFileGroup *pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ); + ASSERT(pfg != NULL); + return pfg; +} + +void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { + ASSERT(pFGroup != NULL); + STsdbFileH *pFileH = pRepo->tsdbFileH; + + SFileGroup fg = *pFGroup; + + int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); + if (nFilesLeft > 0) { + memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft); + } + + pFileH->nFGroups--; + ASSERT(pFileH->nFGroups >= 0); + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(fg.files[type]); + tfsremove(&(pFile->file)); + } +} + +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { + void *ptr = taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), + keyFGroupCompFunc, flags); + if (ptr == NULL) return NULL; + return (SFileGroup *)ptr; +} + +int tsdbGetFidLevel(int fid, SFidGroup fidg) { + if (fid >= fidg.maxFid) { + return 0; + } else if (fid >= fidg.midFid) { + return 1; + } else if (fid >= fidg.minFid) { + return 2; + } else { + return -1; + } +} + +static int compFGroup(const void *arg1, const void *arg2) { + int val1 = ((SFileGroup *)arg1)->fileId; + int val2 = ((SFileGroup *)arg2)->fileId; + + if (val1 < val2) { + return -1; + } else if (val1 > val2) { + return 1; + } else { + return 0; + } +} + +static int keyFGroupCompFunc(const void *key, const void *fgroup) { + int fid = *(int *)key; + SFileGroup *pFGroup = (SFileGroup *)fgroup; + if (fid == pFGroup->fileId) { + return 0; + } else { + return fid > pFGroup->fileId ? 1 : -1; + } +} + +// SFileGroupIter =========================================== +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { + pIter->pFileH = pFileH; + pIter->direction = direction; + + if (pFileH->nFGroups == 0) { + pIter->index = -1; + pIter->fileId = -1; + } else { + if (direction == TSDB_FGROUP_ITER_FORWARD) { + pIter->index = 0; + } else { + pIter->index = pFileH->nFGroups - 1; + } + pIter->fileId = pFileH->pFGroup[pIter->index].fileId; + } +} + +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + STsdbFileH *pFileH = pIter->pFileH; + + if (pFileH->nFGroups == 0) { + pIter->index = -1; + pIter->fileId = -1; + return; + } + + int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; + void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); + if (ptr == NULL) { + pIter->index = -1; + pIter->fileId = -1; + } else { + pIter->index = (int)(POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup)); + pIter->fileId = ((SFileGroup *)ptr)->fileId; + } +} + +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { + STsdbFileH *pFileH = pIter->pFileH; + SFileGroup *pFGroup = NULL; + + if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL; + + pFGroup = &pFileH->pFGroup[pIter->index]; + if (pFGroup->fileId != pIter->fileId) { + tsdbSeekFileGroupIter(pIter, pIter->fileId); + } + + if (pIter->index < 0) return NULL; + + pFGroup = &pFileH->pFGroup[pIter->index]; + ASSERT(pFGroup->fileId == pIter->fileId); + + if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) { + pIter->index++; + } else { + pIter->index--; + } + + if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) { + pIter->fileId = pFileH->pFGroup[pIter->index].fileId; + } else { + pIter->fileId = -1; + } + + return pFGroup; +} + +// SFile =========================================== +int tsdbOpenFile(SFile *pFile, int oflag) { + ASSERT(!TSDB_IS_FILE_OPENED(pFile)); + + pFile->fd = tfsopen(&(pFile->file), oflag); + if (pFile->fd < 0) { + tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), tstrerror(terrno)); + return -1; + } + + tsdbTrace("open file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd); + + return 0; +} + +void tsdbCloseFile(SFile *pFile) { + if (TSDB_IS_FILE_OPENED(pFile)) { + tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd); + tfsclose(pFile->fd); + pFile->fd = -1; + } +} + +int tsdbUpdateFileHeader(SFile *pFile) { + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + + void *pBuf = (void *)buf; + taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION); + tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info)); + + taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); + + if (lseek(pFile->fd, 0, SEEK_SET) < 0) { + tsdbError("failed to lseek file %s since %s", TSDB_FILE_NAME(pFile), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile), + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + +int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { + int tlen = 0; + tlen += taosEncodeFixedU32(buf, pInfo->magic); + tlen += taosEncodeFixedU32(buf, pInfo->len); + tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); + tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); + tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU64(buf, pInfo->size); + tlen += taosEncodeFixedU64(buf, pInfo->tombSize); + + return tlen; +} + +void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + buf = taosDecodeFixedU32(buf, &(pInfo->len)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); + + return buf; +} + +int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + + if (lseek(pFile->fd, 0, SEEK_SET) < 0) { + tsdbError("failed to lseek file %s to start since %s", TSDB_FILE_NAME(pFile), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("failed to read file %s header part with %d bytes, reason:%s", TSDB_FILE_NAME(pFile), TSDB_FILE_HEAD_SIZE, + strerror(errno)); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { + tsdbError("file %s header part is corrupted with failed checksum", TSDB_FILE_NAME(pFile)); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + void *pBuf = (void *)buf; + pBuf = taosDecodeFixedU32(pBuf, version); + pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); + + return 0; +} + +void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { // TODO + uint32_t version = 0; + SFile file; + SFile * pFile = &file; + + tfsInitFile(&(pFile->file), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, fname); + pFile->fd = -1; + + if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; + if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err; + + off_t offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) goto _err; + tsdbCloseFile(pFile); + + *magic = pFile->info.magic; + *size = offset; + + return; + +_err: + tsdbCloseFile(pFile); + *magic = TSDB_FILE_INIT_MAGIC; + *size = 0; +} + +// Retention =========================================== +void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + SFileGroup *pGroup = pFileH->pFGroup; + + pthread_rwlock_wrlock(&(pFileH->fhlock)); + + while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) { + tsdbRemoveFileGroup(pRepo, pGroup); + } + + pthread_rwlock_unlock(&(pFileH->fhlock)); +} + +void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) { + TSKEY now = taosGetTimestamp(pCfg->precision); + + pFidGroup->minFid = + TSDB_KEY_FILEID(now - pCfg->keep * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); + pFidGroup->midFid = + TSDB_KEY_FILEID(now - pCfg->keep2 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); + pFidGroup->maxFid = + TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); +} + +int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + + for (int i = 0; i < pFileH->nFGroups; i++) { + SFileGroup ofg = pFileH->pFGroup[i]; + + int level = tsdbGetFidLevel(ofg.fileId, *pFidGroup); + ASSERT(level >= 0); + + if (level == ofg.files[0].file.level) continue; + + // COPY THE FILE GROUP TO THE RIGHT LEVEL + SFileGroup nfg = ofg; + int id = TFS_UNDECIDED_ID; + int type = 0; + for (; type < TSDB_FILE_TYPE_MAX; type++) { + tfsInitFile(&nfg.files[type].file, level, id, nfg.files[type].file.rname); + if (tfscopy(&(ofg.files[type].file), &(nfg.files[type].file)) < 0) { + if (terrno == TSDB_CODE_FS_INVLD_LEVEL) break; + tsdbError("vgId:%d failed to move fid %d from level %d to level %d since %s", REPO_ID(pRepo), ofg.fileId, + ofg.files[0].file.level, level, strerror(terrno)); + return -1; + } + + id = nfg.files[type].file.level; + id = nfg.files[type].file.id; + } + + if (type < TSDB_FILE_TYPE_MAX) continue; + + // Register new file into TSDB + pthread_rwlock_wrlock(&(pFileH->fhlock)); + pFileH->pFGroup[i] = nfg; + pthread_rwlock_unlock(&(pFileH->fhlock)); + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(ofg.files[type]); + tfsremove(&(pFile->file)); + } + + tsdbDebug("vgId:%d move file group %d from level %d to level %d", REPO_ID(pRepo), ofg.fileId, + ofg.files[0].file.level, level); + } + + return 0; +} + +static void *tsdbScanAllFiles(STsdbRepo *pRepo) { + void * farray = NULL; + TDIR * tdir = NULL; + char dirName[TSDB_FILENAME_LEN] = "\0"; + char bname[TSDB_FILENAME_LEN] = "\0"; + regex_t regex1 = {0}; + const TFILE *pf = NULL; + + farray = taosArrayInit(256, sizeof(TFILE)); + if (farray == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED); + + snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo)); + + tdir = tfsOpendir(dirName); + + while ((pf = tfsReaddir(tdir)) != NULL) { + tfsbasename(pf, bname); + + int code = regexec(®ex1, bname, 0, NULL, 0); + if (code != 0) { + tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname); + continue; + } + + taosArrayPush(farray, pf); + } + + regfree(®ex1); + tfsClosedir(tdir); + + return farray; +} + +static int tsdbCompareFile(const void *arg1, const void *arg2) { + char bname1[TSDB_FILENAME_LEN] = "\0"; + char bname2[TSDB_FILENAME_LEN] = "\0"; + TFILE *pf1 = (TFILE *)arg1; + TFILE *pf2 = (TFILE *)arg2; + int vid1, fid1, vid2, fid2; + + tfsbasename(pf1, bname1); + tfsbasename(pf2, bname2); + + sscanf(bname1, "v%df%d", &vid1, &fid1); + sscanf(bname2, "v%df%d", &vid2, &fid2); + + ASSERT(vid1 == vid2); + if (fid1 < fid2) { + return -1; + } else if (fid1 == fid2) { + return 0; + } else { + return 1; + } +} + +static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) { + char backname[TSDB_FILENAME_LEN*2] = "\0"; + char bname[TSDB_FILENAME_LEN] = "\0"; + STsdbFileH *pFileH = pRepo->tsdbFileH; + TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0}; + TFILE * pHf = NULL; + TFILE * pLf = NULL; + SFileGroup fg = {0}; + int vid = 0; + int fid = 0; + char suffix[TSDB_FILENAME_LEN] = "\0"; + + for (int i = 0; i < nfile; i++) { + TFILE *pf = pfiles + i; + + tfsbasename(pf, bname); + tsdbParseFname(bname, &vid, &fid, suffix); + + if (strcmp(suffix, ".head") == 0) { + pfArray[TSDB_FILE_TYPE_HEAD] = pf; + } else if (strcmp(suffix, ".data") == 0) { + pfArray[TSDB_FILE_TYPE_DATA] = pf; + } else if (strcmp(suffix, ".last") == 0) { + pfArray[TSDB_FILE_TYPE_LAST] = pf; + } else if (strcmp(suffix, ".l") == 0) { + pLf = pf; + } else if (strcmp(suffix, ".h") == 0) { + pHf = pf; + } else { + tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname); + } + } + + if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) { + for (int i = 0; i < nfile; i++) { + snprintf(backname, TSDB_FILENAME_LEN*2, "%s_bak", (pfiles + i)->aname); + rename((pfiles + i)->aname, backname); + } + + return -1; + } + + if (pHf == NULL) { + if (pLf != NULL) { + rename(pLf->aname, pfArray[TSDB_FILE_TYPE_LAST]->aname); + } + } else { + if (pLf != NULL) { + remove(pLf->aname); + } + remove(pHf->aname); + } + + // Register file + fg.fileId = fid; + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile * pFile = fg.files + type; + uint32_t version = 0; + + pFile->fd = -1; + pFile->file = *pfArray[type]; // TODO + tsdbOpenFile(pFile, O_RDONLY); + tsdbLoadFileHeader(pFile, &version); + tsdbCloseFile(pFile); + } + + pFileH->pFGroup[pFileH->nFGroups++] = fg; + + tfsIncDiskFile(pfArray[TSDB_FILE_TYPE_HEAD]->level, pfArray[TSDB_FILE_TYPE_HEAD]->id, TSDB_FILE_TYPE_MAX); + + return 0; +} + +static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) { + sscanf(bname, "v%df%d%s", vid, fid, suffix); +} \ No newline at end of file