From b9c38d6e1a7f3398cd6c1fcfb0df9e7d6a845699 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 31 Dec 2020 10:45:47 +0000 Subject: [PATCH] partial work --- src/inc/tfs.h | 1 + src/tfs/src/tfs.c | 7 + src/tsdb/CMakeLists.txt | 1 + src/tsdb/inc/tsdbMain.h | 269 +++++++++++++++------- src/tsdb/src/tsdbCommit.c | 10 +- src/tsdb/src/tsdbFS.c | 469 ++++++++++++++++++++++++++++++++++++++ src/tsdb/src/tsdbMain.c | 2 +- 7 files changed, 674 insertions(+), 85 deletions(-) create mode 100644 src/tsdb/src/tsdbFS.c diff --git a/src/inc/tfs.h b/src/inc/tfs.h index 9fe4ac7225..1f47006a5b 100644 --- a/src/inc/tfs.h +++ b/src/inc/tfs.h @@ -57,6 +57,7 @@ typedef struct { #define TFILE_NAME(pf) ((pf)->aname) void tfsInitFile(TFILE *pf, int level, int id, const char *bname); +bool tfsIsSameFile(TFILE *pf1, TFILE *pf2); void tfsSetLevel(TFILE *pf, int level); void tfsSetID(TFILE *pf, int id); int tfsopen(TFILE *pf, int flags); diff --git a/src/tfs/src/tfs.c b/src/tfs/src/tfs.c index e48a3ca3b5..fc47b07973 100644 --- a/src/tfs/src/tfs.c +++ b/src/tfs/src/tfs.c @@ -175,6 +175,13 @@ void tfsInitFile(TFILE *pf, int level, int id, const char *bname) { tfsSetFileAname(pf); } +bool tfsIsSameFile(TFILE *pf1, TFILE *pf2) { + if (pf1->level != pf2->level) return false; + if (pf1->id != pf2->id) return false; + if (strncmp(pf1->rname, pf2->rname, TSDB_FILENAME_LEN) != 0) return false; + return true; +} + void tfsSetLevel(TFILE *pf, int level) { pf->level = level; diff --git a/src/tsdb/CMakeLists.txt b/src/tsdb/CMakeLists.txt index 31d52aae7d..13205e2212 100644 --- a/src/tsdb/CMakeLists.txt +++ b/src/tsdb/CMakeLists.txt @@ -3,6 +3,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) +list(REMOVE_ITEM SRC "src/tsdbFS.c") ADD_LIBRARY(tsdb ${SRC}) TARGET_LINK_LIBRARIES(tsdb tfs common tutil) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index f8bd05e880..c57315acd8 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -45,10 +45,6 @@ extern int32_t tsdbDebugFlag; #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) // ================= OTHERS -#define TSDB_MAX_TABLE_SCHEMAS 16 -#define TSDB_FILE_HEAD_SIZE 512 -#define TSDB_FILE_DELIMITER 0xF00AFA0F -#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF #define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax))) @@ -58,6 +54,8 @@ extern int32_t tsdbDebugFlag; // Definitions // ================= tsdbMeta.c +#define TSDB_MAX_TABLE_SCHEMAS 16 + typedef struct STable { STableId tableId; ETableType type; @@ -295,102 +293,211 @@ static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { return dataRowTKey(row); } -// ================= tsdbFile.c -extern const char* tsdbFileSuffix[]; +// ================= tsdbFS.c +#define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F +#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF -// minFid <= midFid <= maxFid +enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX }; + +// For meta file typedef struct { - int minFid; // >= minFid && < midFid, at level 2 - int midFid; // >= midFid && < maxFid, at level 1 - int maxFid; // >= maxFid, at level 0 -} SFidGroup; + int64_t size; + int64_t tombSize; + int64_t nRecords; + int64_t nDels; + uint32_t magic; +} SMFInfo; -typedef enum { - TSDB_FILE_TYPE_HEAD = 0, - TSDB_FILE_TYPE_DATA, - TSDB_FILE_TYPE_LAST, - TSDB_FILE_TYPE_STAT, - TSDB_FILE_TYPE_NHEAD, - TSDB_FILE_TYPE_NDATA, - TSDB_FILE_TYPE_NLAST, - TSDB_FILE_TYPE_NSTAT -} TSDB_FILE_TYPE; - -#ifndef TDINTERNAL -#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1) -#else -#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1) -#endif +typedef struct { + SMFInfo info; + TFILE f; + int fd; +} SMFile; +// For .head/.data/.last file typedef struct { uint32_t magic; uint32_t len; uint32_t totalBlocks; uint32_t totalSubBlocks; uint32_t offset; - uint64_t size; // total size of the file - uint64_t tombSize; // unused file size -} STsdbFileInfo; + uint64_t size; + uint64_t tombSize; +} SDFInfo; typedef struct { - TFILE file; - STsdbFileInfo info; - int fd; -} SFile; + SDFInfo info; + TFILE f; + int fd; +} SDFile; typedef struct { - int fileId; - int state; // 0 for health, 1 for problem - SFile files[TSDB_FILE_TYPE_MAX]; -} SFileGroup; + int id; + int state; + SDFile files[TSDB_FILE_MAX]; +} SDFileSet; + +/* Statistic information of the TSDB file system. + */ +typedef struct { + int64_t fsversion; // file system version, related to program + int64_t version; + int64_t totalPoints; + int64_t totalStorage; +} STsdbFSMeta; typedef struct { - pthread_rwlock_t fhlock; - - int maxFGroups; - int nFGroups; - SFileGroup* pFGroup; -} STsdbFileH; + int64_t version; + STsdbFSMeta meta; + SMFile mf; // meta file + SArray * df; // data file array +} SFSSnapshot; typedef struct { - int direction; - STsdbFileH* pFileH; - int fileId; - int index; -} SFileGroupIter; + pthread_rwlock_t lock; -#define TSDB_FILE_NAME(pFile) ((pFile)->file.aname) -#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) -#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) -#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId -#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId -#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0) -#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC -#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC + SFSSnapshot *curr; + SFSSnapshot *new; +} STsdbFS; -STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); -void tsdbFreeFileH(STsdbFileH* pFileH); -int tsdbOpenFileH(STsdbRepo* pRepo); -void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart); -SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level); -void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); -void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); -SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); -int tsdbOpenFile(SFile* pFile, int oflag); -void tsdbCloseFile(SFile* pFile); -int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); -SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); -int tsdbGetFidLevel(int fid, SFidGroup fidg); -void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup); -int tsdbUpdateFileHeader(SFile* pFile); -int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); -void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); -void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); -int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); -void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); -void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); -void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); -int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); +#define TSDB_FILE_INFO(tf) (&((tf)->info)) +#define TSDB_FILE_F(tf) (&((tf)->f))) +#define TSDB_FILE_FD(tf) ((tf)->fd) + +int tsdbOpenFS(STsdbRepo *pRepo); +void tsdbCloseFS(STsdbRepo *pRepo); +int tsdbFSNewTxn(STsdbRepo *pRepo); +int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError); +int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile); +int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet); +void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid); +int tsdbRemoveDFileSet(SDFileSet *pSet); + +static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { + int code = pthread_rwlock_rdlock(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) { + int code = pthread_rwlock_wrlock(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { + int code = pthread_rwlock_unlock(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + + +// ================= tsdbFile.c +// extern const char* tsdbFileSuffix[]; + +// minFid <= midFid <= maxFid +// typedef struct { +// int minFid; // >= minFid && < midFid, at level 2 +// int midFid; // >= midFid && < maxFid, at level 1 +// int maxFid; // >= maxFid, at level 0 +// } SFidGroup; + +// typedef enum { +// TSDB_FILE_TYPE_HEAD = 0, +// TSDB_FILE_TYPE_DATA, +// TSDB_FILE_TYPE_LAST, +// TSDB_FILE_TYPE_STAT, +// TSDB_FILE_TYPE_NHEAD, +// TSDB_FILE_TYPE_NDATA, +// TSDB_FILE_TYPE_NLAST, +// TSDB_FILE_TYPE_NSTAT +// } TSDB_FILE_TYPE; + +// #ifndef TDINTERNAL +// #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1) +// #else +// #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1) +// #endif + +// typedef struct { +// uint32_t magic; +// uint32_t len; +// uint32_t totalBlocks; +// uint32_t totalSubBlocks; +// uint32_t offset; +// uint64_t size; // total size of the file +// uint64_t tombSize; // unused file size +// } STsdbFileInfo; + +// typedef struct { +// TFILE file; +// STsdbFileInfo info; +// int fd; +// } SFile; + +// typedef struct { +// int fileId; +// int state; // 0 for health, 1 for problem +// SFile files[TSDB_FILE_TYPE_MAX]; +// } SFileGroup; + +// typedef struct { +// pthread_rwlock_t fhlock; + +// int maxFGroups; +// int nFGroups; +// SFileGroup* pFGroup; +// } STsdbFileH; + +// typedef struct { +// int direction; +// STsdbFileH* pFileH; +// int fileId; +// int index; +// } SFileGroupIter; + +// #define TSDB_FILE_NAME(pFile) ((pFile)->file.aname) +// #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) +// #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) +// #define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId +// #define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId +// #define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0) +// #define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC +// #define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC + +// STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); +// void tsdbFreeFileH(STsdbFileH* pFileH); +// int tsdbOpenFileH(STsdbRepo* pRepo); +// void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart); +// SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level); +// void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); +// void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); +// SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); +// int tsdbOpenFile(SFile* pFile, int oflag); +// void tsdbCloseFile(SFile* pFile); +// int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); +// SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); +// int tsdbGetFidLevel(int fid, SFidGroup fidg); +// void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup); +// int tsdbUpdateFileHeader(SFile* pFile); +// int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); +// void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); +// void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); +// int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); +// void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); +// void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); +// void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); +// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ================= tsdbMain.c typedef struct { @@ -416,7 +523,7 @@ struct STsdbRepo { STsdbBufPool* pPool; SMemTable* mem; SMemTable* imem; - STsdbFileH* tsdbFileH; + STsdbFS* fs; sem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 686e171cd4..fe60cbf40d 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -147,8 +147,8 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { goto _err; } - // TODO - // tsdbUpdateMFile(pRepo, NULL) + // TODO: update meta file + tsdbUpdateMFile(pRepo, NULL); return 0; @@ -162,13 +162,17 @@ static int tsdbStartCommit(STsdbRepo *pRepo) { tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); - // TODO + if (tsdbFSNewTxn(pRepo) < 0) return -1; pRepo->code = TSDB_CODE_SUCCESS; return 0; } static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { + if (tsdbFSEndTxn(pRepo, eno != TSDB_CODE_SUCCESS) < 0) { + eno = terrno; + } + tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c new file mode 100644 index 0000000000..86204f40f9 --- /dev/null +++ b/src/tsdb/src/tsdbFS.c @@ -0,0 +1,469 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include + +#include "tsdbMain.h" + +#define REPO_FS(r) ((r)->fs) +#define TSDB_MAX_DFILES(keep, days) ((keep) / (days) + 3) + +int tsdbOpenFS(STsdbRepo *pRepo) { + ASSERT(REPO_FS == NULL); + + STsdbCfg *pCfg = TSDB_CFG(pRepo); + + // Create fs object + REPO_FS(pRepo) = tsdbNewFS(pCfg->keep, pCfg->daysPerFile); + if (REPO_FS(pRepo) == NULL) { + tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + // Load TSDB file system from disk + if (tsdbOpenFSImpl(pRepo) < 0) { + tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbCloseFS(pRepo); + return -1; + } + + return 0; +} + +void tsdbCloseFS(STsdbRepo *pRepo) { + REPO_FS(pRepo) = tsdbFreeFS(REPO_FS(pRepo)); + return 0; +} + +// Start a new FS transaction +int tsdbFSNewTxn(STsdbRepo *pRepo) { + STsdbFS *pFs = REPO_FS(pRepo); + + if (tsdbCopySnapshot(pFs->curr, pFs->new) < 0) { + return -1; + } + + pFs->new->version++; + + return 0; +} + +// End an existing FS transaction +int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError) { + STsdbFS *pFs = REPO_FS(pRepo); + + if (hasError) { // roll back files + + } else { // apply file change + if (tsdbSaveFSSnapshot(-1, pFs->new) < 0) { + // TODO + } + + // rename(); + + // apply all file changes + + } + + return 0; +} + +int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) { + STsdbFS *pFs = REPO_FS(pRepo); + pFs->new->mf = *pMFile; + return 0; +} + +int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { + SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new; + SDFileSet * pOldSet; + + pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE); + if (pOldSet == NULL) { + if (taosArrayPush(pSnapshot->df, pSet) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } else { + int index = TARRAY_ELEM_IDX(dfArray, ptr); + + if (pOldSet->id == pSet->id) { + taosArraySet(pSnapshot->df, index, pSet); + } else if (pOldSet->id > pSet->id) { + if (taosArrayInsert(pSnapshot->df, index, pSet) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } else { + ASSERT(0); + } + } + + return 0; +} + +void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) { + SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new; + while (taosArrayGetSize(pSnapshot->df) > 0) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0); + if (pSet->id < mfid) { + taosArrayRemove(pSnapshot->df, 0); + } + } +} + +static int tsdbSaveFSSnapshot(int fd, SFSSnapshot *pSnapshot) { + // TODO + return 0; +} + +static int tsdbLoadFSSnapshot(SFSSnapshot *pSnapshot) { + // TODO + return 0; +} + +static int tsdbOpenFSImpl(STsdbRepo *pRepo) { + char manifest[TSDB_FILENAME_LEN] = "\0"; + + // TODO: use API here + sprintf(manifest, "%s/manifest", pRepo->rootDir); + + if (access(manifest, F_OK) == 0) { + // manifest file exist, just load + // TODO + } else { + // manifest file not exists, scan all the files and construct + // TODO + } + + return 0; +} + +static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { + int tlen = 0; + + tlen += taosEncodeVariantI64(buf, pInfo->size); + tlen += taosEncodeVariantI64(buf, pInfo->tombSize); + tlen += taosEncodeVariantI64(buf, pInfo->nRecords); + tlen += taosEncodeVariantI64(buf, pInfo->nDels); + tlen += taosEncodeFixedU32(buf, pInfo->magic); + + return tlen; +} + +static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + + return buf; +} + +static int tsdbEncodeMFile(void **buf, SMFile *pMFile) { + int tlen = 0; + + tlen += tsdbEncodeMFInfo(buf, &(pMFile->info)); + tlen += tfsEncodeFile(buf, &(pMFile->f)); + + return tlen; +} + +static void *tsdbDecodeMFile(void *buf, SMFile *pMFile) { + buf = tsdbDecodeMFInfo(buf, &(pMFile->info)); + buf = tfsDecodeFile(buf, &(pMFile->f)); + + return buf; +} + +static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { + int tlen = 0; + + tlen += taosEncodeFixedU32(buf, pInfo->magic); + tlen += taosEncodeFixedU32(buf, pInfo->len); + tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); + tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); + tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU64(buf, pInfo->size); + tlen += taosEncodeFixedU64(buf, pInfo->tombSize); + + return tlen; +} + +static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + buf = taosDecodeFixedU32(buf, &(pInfo->len)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); + + return buf; +} + +static int tsdbEncodeDFile(void **buf, SDFile *pDFile) { + int tlen = 0; + + tlen += tsdbEncodeDFInfo(buf, &(pDFile->info)); + tlen += tfsEncodeFile(buf, &(pDFile->f)); + + return tlen; +} + +static void *tsdbDecodeDFile(void *buf, SDFile *pDFile) { + buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); + buf = tfsDecodeFile(buf, &(pDFile->f)); + + return buf; +} + +static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { + int tlen = 0; + + tlen += taosEncodeVariantI64(buf, pMeta->fsversion); + tlen += taosEncodeVariantI64(buf, pMeta->version); + tlen += taosEncodeVariantI64(buf, pMeta->totalPoints); + tlen += taosEncodeVariantI64(buf, pMeta->totalStorage); + + return tlen; +} + +static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { + buf = taosDecodeVariantI64(buf, &(pMeta->fsversion)); + buf = taosDecodeVariantI64(buf, &(pMeta->version)); + buf = taosDecodeVariantI64(buf, &(pMeta->totalPoints)); + buf = taosDecodeVariantI64(buf, &(pMeta->totalStorage)); + + return buf; +} + +static int tsdbEncodeFSSnapshot(void **buf, SFSSnapshot *pSnapshot) { + int tlen = 0; + int64_t size = 0; + + // Encode meta file + tlen += tsdbEncodeMFile(buf, &(pSnapshot->mf)); + + // Encode data files + size = taosArrayGetSize(pSnapshot->df); + tlen += taosEncodeVariantI64(buf, size); + for (size_t index = 0; index < size; index++) { + SDFile *pFile = taosArrayGet(pSnapshot->df, index); + + tlen += tsdbEncodeDFInfo(buf, &pFile); + } + + + return tlen; +} + +static void *tsdbDecodeFSSnapshot(void *buf, SFSSnapshot *pSnapshot) { + int64_t size = 0; + SDFile df; + + // Decode meta file + buf = tsdbDecodeMFile(buf, &(pSnapshot->mf)); + + // Decode data files + buf = taosDecodeVariantI64(buf, &size); + for (size_t index = 0; index < size; index++) { + buf = tsdbDecodeDFInfo(buf, &df); + taosArrayPush(pSnapshot->df, (void *)(&df)); + } + + return buf; +} + +static SFSSnapshot *tsdbNewSnapshot(int32_t nfiles) { + SFSSnapshot *pSnapshot; + + pSnapshot = (SFSSnapshot *)calloc(1, sizeof(pSnapshot)); + if (pSnapshot == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + pSnapshot->df = taosArrayInit(nfiles, sizeof(SDFileSet)); + if (pSnapshot->df == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + free(pSnapshot); + return NULL; + } + + return pSnapshot; +} + +static SFSSnapshot *tsdbFreeSnapshot(SFSSnapshot *pSnapshot) { + if (pSnapshot) { + taosArrayDestroy(pSnapshot->df); + free(pSnapshot); + } + + return NULL; +} + +static STsdbFS *tsdbNewFS(int32_t keep, int32_t days) { + STsdbFS *pFs; + int code; + int32_t nfiles; + + pFs = (STsdbFS *)calloc(1, sizeof(*pFs)); + if (pFs == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + code = pthread_rwlock_init(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + free(pFs); + return NULL; + } + + nfiles = TSDB_MAX_DFILES(keep, days); + if (((pFs->curr = tsdbNewSnapshot(nfiles)) == NULL) || ((pFs->new = tsdbNewSnapshot(nfiles)) == NULL)) { + tsdbFreeFS(pFs); + return NULL; + } + + return pFs; +} + +static STsdbFS *tsdbFreeFS(STsdbFS *pFs) { + if (pFs) { + pFs->new = tsdbFreeSnapshot(pFs->new); + pFs->curr = tsdbFreeSnapshot(pFs->curr); + pthread_rwlock_destroy(&(pFs->lock)); + free(pFs); + } + + return NULL; +} + +static int tsdbCopySnapshot(SFSSnapshot *src, SFSSnapshot *dst) { + dst->meta = src->meta; + dst->mf = src->meta; + taosArrayCopy(dst->df, src->df); + return 0; +} + +static int tsdbCompFSetId(const void *key1, const void *key2) { + int id = *(int *)key1; + SDFileSet *pSet = (SDFileSet *)key2; + + if (id < pSet->id) { + return -1; + } else if (id == pSet->id) { + return 0; + } else { + return 1; + } +} + +static SDFileSet *tsdbSearchDFileSet(SFSSnapshot *pSnapshot, int fid, int flags) { + void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags); + return (ptr == NULL) ? NULL : ((SDFileSet *)ptr); +} + +static int tsdbMakeFSChange(STsdbRepo *pRepo) { + tsdbMakeFSMFileChange(pRepo); + tsdbMakeFSDFileChange(pRepo); + return 0; +} + +static int tsdbMakeFSMFileChange(STsdbRepo *pRepo) { + STsdbFS *pFs = REPO_FS(pRepo); + SMFile * pDstMFile = &(pFs->curr->mf); + SMFile * pSrcMFile = &(pFs->new->mf); + + if (tfsIsSameFile(&(pDstMFile->f), &(pSrcMFile->f))) { // the same file + if (pDstMFile->info != pSrcMFile->info) { + if (pDstMFile->info.size > pDstMFile->info.size) { + // Commit succeed, do nothing + } else if (pDstMFile->info.size < pDstMFile->info.size) { + // Commit failed, back + // TODO + } else { + ASSERT(0); + } + } + } else { + tfsremove(&(pSrcMFile->f)); + } + + return 0; +} + +static int tsdbMakeFSDFileChange(STsdbRepo *pRepo) { + STsdbFS * pFs = REPO_FS(pRepo); + int cidx = 0; + int nidx = 0; + SDFileSet *pCSet = NULL; + SDFileSet *pNSet = NULL; + + if (cidx < taosArrayGetSize(pFs->curr->df)) { + pCSet = taosArrayGet(pFs->curr->df, cidx); + } else { + pCSet = NULL; + } + + if (nidx < taosArrayGetSize(pFs->new->df)) { + pNSet = taosArrayGet(pFs->new->df, nidx); + } else { + pNSet = NULL; + } + + while (true) { + if (pCSet == NULL && pNSet == NULL) break; + + if (pCSet == NULL || (pNSet != NULL && pCSet->id > pNSet->id)) { + tsdbRemoveDFileSet(pNSet); + + nidx++; + if (nidx < taosArrayGetSize(pFs->new->df)) { + pNSet = taosArrayGet(pFs->new->df, nidx); + } else { + pNSet = NULL; + } + } else if (pNSet == NULL || (pCSet != NULL && pCSet->id < pNSet->id)) { + cidx++; + if (cidx < taosArrayGetSize(pFs->curr->df)) { + pCSet = taosArrayGet(pFs->curr->df, cidx); + } else { + pCSet = NULL; + } + } else { + // TODO: apply dfileset change + nidx++; + if (nidx < taosArrayGetSize(pFs->new->df)) { + pNSet = taosArrayGet(pFs->new->df, nidx); + } else { + pNSet = NULL; + } + + cidx++; + if (cidx < taosArrayGetSize(pFs->curr->df)) { + pCSet = taosArrayGet(pFs->curr->df, cidx); + } else { + pCSet = NULL; + } + } + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 7d02bd9ea4..c6c537c17c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -109,7 +109,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { goto _err; } - if (tsdbOpenFileH(pRepo) < 0) { + if (tsdbOpenFS(pRepo) < 0) { tsdbError("vgId:%d failed to open file handle since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; }