From 8517faaefa9c17d4b9459fcc76f1bd639bb30182 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 22 Oct 2020 09:50:12 +0000 Subject: [PATCH] finish more code --- src/common/inc/tdisk.h | 32 +++++---- src/common/src/tdisk.c | 86 +++++++++++++----------- src/dnode/src/dnodeMain.c | 8 +-- src/tsdb/inc/tsdbMain.h | 18 +++-- src/tsdb/src/tsdbFile.c | 127 +++++++++++++++++++++++++++++++----- src/tsdb/src/tsdbMemTable.c | 11 ++-- 6 files changed, 204 insertions(+), 78 deletions(-) diff --git a/src/common/inc/tdisk.h b/src/common/inc/tdisk.h index 04f7ba71ab..9cc53fa0d2 100644 --- a/src/common/inc/tdisk.h +++ b/src/common/inc/tdisk.h @@ -38,6 +38,13 @@ typedef struct { } SDiskMeta; typedef struct { + uint64_t tsize; + uint64_t avail; // bytes +} STiersMeta; + +typedef struct { + int level; + int did; char dir[TSDB_FILENAME_LEN]; SDiskMeta dmeta; } SDisk; @@ -50,6 +57,7 @@ typedef struct { typedef struct SDnodeTier { pthread_mutex_t lock; + STiersMeta meta; int nTiers; STier tiers[TSDB_MAX_TIERS]; SHashObj * map; @@ -58,7 +66,7 @@ typedef struct SDnodeTier { extern struct SDnodeTier *tsDnodeTier; #define DNODE_PRIMARY_DISK(pDnodeTier) (pDnodeTier)->tiers[0].disks[0] -static FORCE_INLINE int dnodeLockTiers(SDnodeTier *pDnodeTier) { +static FORCE_INLINE int tdLockTiers(SDnodeTier *pDnodeTier) { int code = pthread_mutex_lock(&(pDnodeTier->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -67,7 +75,7 @@ static FORCE_INLINE int dnodeLockTiers(SDnodeTier *pDnodeTier) { return 0; } -static FORCE_INLINE int dnodeUnLockTiers(SDnodeTier *pDnodeTier) { +static FORCE_INLINE int tdUnLockTiers(SDnodeTier *pDnodeTier) { int code = pthread_mutex_unlock(&(pDnodeTier->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -76,7 +84,7 @@ static FORCE_INLINE int dnodeUnLockTiers(SDnodeTier *pDnodeTier) { return 0; } -static FORCE_INLINE SDisk *dnodeGetDisk(SDnodeTier *pDnodeTier, int level, int did) { +static FORCE_INLINE SDisk *tdGetDisk(SDnodeTier *pDnodeTier, int level, int did) { if (level < 0 || level >= pDnodeTier->nTiers) return NULL; if (did < 0 || did >= pDnodeTier->tiers[level].nDisks) return NULL; @@ -84,15 +92,15 @@ static FORCE_INLINE SDisk *dnodeGetDisk(SDnodeTier *pDnodeTier, int level, int d return pDnodeTier->tiers[level].disks[did]; } -SDnodeTier *dnodeNewTier(); -void * dnodeCloseTier(SDnodeTier *pDnodeTier); -int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks); -int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier); -int dnodeCheckTiers(SDnodeTier *pDnodeTier); -SDisk * dnodeAssignDisk(SDnodeTier *pDnodeTier, int level); -SDisk * dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName); -void dnodeIncDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock); -void dnodeDecDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock); +SDnodeTier *tdNewTier(); +void * tdCloseTier(SDnodeTier *pDnodeTier); +int tdAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks); +int tdUpdateTiersInfo(SDnodeTier *pDnodeTier); +int tdCheckTiers(SDnodeTier *pDnodeTier); +SDisk * tdAssignDisk(SDnodeTier *pDnodeTier, int level); +SDisk * tdGetDiskByName(SDnodeTier *pDnodeTier, char *dirName); +void tdIncDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock); +void tdDecDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock); #ifdef __cplusplus } diff --git a/src/common/src/tdisk.c b/src/common/src/tdisk.c index 9d678d64a1..425a66058f 100644 --- a/src/common/src/tdisk.c +++ b/src/common/src/tdisk.c @@ -20,14 +20,14 @@ #define DISK_MIN_FREE_SPACE 30 * 1024 * 1024 // disk free space less than 100M will not create new file again #define DNODE_DISK_AVAIL(pDisk) ((pDisk)->dmeta.free > DISK_MIN_FREE_SPACE) -static int dnodeFormatDir(char *idir, char *odir); -static int dnodeCheckDisk(char *dirName, int level, int primary); -static int dnodeUpdateDiskMeta(SDisk *pDisk); -static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary); +static int tdFormatDir(char *idir, char *odir); +static int tdCheckDisk(char *dirName, int level, int primary); +static int tdUpdateDiskMeta(SDisk *pDisk); +static int tdAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary); struct SDnodeTier *tsDnodeTier = NULL; -SDnodeTier *dnodeNewTier() { +SDnodeTier *tdNewTier() { SDnodeTier *pDnodeTier = (SDnodeTier *)calloc(1, sizeof(*pDnodeTier)); if (pDnodeTier == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -37,7 +37,7 @@ SDnodeTier *dnodeNewTier() { int ret = pthread_mutex_init(&(pDnodeTier->lock), NULL); if (ret != 0) { terrno = TAOS_SYSTEM_ERROR(ret); - dnodeCloseTier(pDnodeTier); + tdCloseTier(pDnodeTier); return NULL; } @@ -45,14 +45,14 @@ SDnodeTier *dnodeNewTier() { taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pDnodeTier->map == NULL) { terrno = TSDB_CODE_COM_OUT_OF_MEMORY; - dnodeCloseTier(pDnodeTier); + tdCloseTier(pDnodeTier); return NULL; } return pDnodeTier; } -void *dnodeCloseTier(SDnodeTier *pDnodeTier) { +void *tdCloseTier(SDnodeTier *pDnodeTier) { if (pDnodeTier) { if (pDnodeTier->map) { taosHashCleanup(pDnodeTier->map); @@ -75,32 +75,42 @@ void *dnodeCloseTier(SDnodeTier *pDnodeTier) { return NULL; } -int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) { +int tdAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) { ASSERT(ndisks > 0); for (int i = 0; i < ndisks; i++) { SDiskCfg *pCfg = pDiskCfgs + i; - dnodeAddDisk(pDnodeTier, pCfg->dir, pCfg->level, pCfg->primary); + tdAddDisk(pDnodeTier, pCfg->dir, pCfg->level, pCfg->primary); } - if (dnodeCheckTiers(pDnodeTier) < 0) return -1; + if (tdCheckTiers(pDnodeTier) < 0) return -1; return 0; } -int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) { +int tdUpdateTiersInfo(SDnodeTier *pDnodeTier) { + tdLockTiers(pDnodeTier); + + pDnodeTier->meta.tsize = 0; + pDnodeTier->meta.avail = 0; + for (int i = 0; i < pDnodeTier->nTiers; i++) { STier *pTier = pDnodeTier->tiers + i; for (int j = 0; j < pTier->nDisks; j++) { SDisk *pDisk = pTier->disks[j]; - if (dnodeUpdateDiskMeta(pDisk) < 0) return -1; + if (tdUpdateDiskMeta(pDisk) < 0) return -1; + + pDnodeTier->meta.tsize += pDisk->dmeta.size; + pDnodeTier->meta.avail += pDisk->dmeta.free; } } + + tdUnLockTiers(pDnodeTier); return 0; } -int dnodeCheckTiers(SDnodeTier *pDnodeTier) { +int tdCheckTiers(SDnodeTier *pDnodeTier) { ASSERT(pDnodeTier->nTiers > 0); if (DNODE_PRIMARY_DISK(pDnodeTier) == NULL) { terrno = TSDB_CODE_DND_LACK_PRIMARY_DISK; @@ -117,7 +127,7 @@ int dnodeCheckTiers(SDnodeTier *pDnodeTier) { return 0; } -SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) { +SDisk *tdAssignDisk(SDnodeTier *pDnodeTier, int level) { ASSERT(level < pDnodeTier->nTiers); STier *pTier = pDnodeTier->tiers + level; @@ -125,11 +135,11 @@ SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) { ASSERT(pTier->nDisks > 0); - dnodeLockTiers(pDnodeTier); + tdLockTiers(pDnodeTier); for (int i = 0; i < pTier->nDisks; i++) { SDisk *iDisk = pTier->disks[i]; - if (dnodeUpdateDiskMeta(iDisk) < 0) return NULL; + if (tdUpdateDiskMeta(iDisk) < 0) return NULL; if (DNODE_DISK_AVAIL(iDisk)) { if (pDisk == NULL || pDisk->dmeta.nfiles > iDisk->dmeta.nfiles) { pDisk = iDisk; @@ -139,22 +149,22 @@ SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) { if (pDisk == NULL) { terrno = TSDB_CODE_DND_NO_DISK_SPACE; - dnodeUnLockTiers(pDnodeTier); + tdUnLockTiers(pDnodeTier); return NULL; } - dnodeIncDiskFiles(pDnodeTier, pDisk, false); + tdIncDiskFiles(pDnodeTier, pDisk, false); - dnodeUnLockTiers(pDnodeTier); + tdUnLockTiers(pDnodeTier); return NULL; } -SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) { +SDisk *tdGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) { char fdirName[TSDB_FILENAME_LEN] = "\0"; SDiskID *pDiskID = NULL; - if (dnodeFormatDir(dirName, fdirName) < 0) { + if (tdFormatDir(dirName, fdirName) < 0) { return NULL; } @@ -162,34 +172,34 @@ SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) { if (ptr == NULL) return NULL; pDiskID = (SDiskID *)ptr; - return dnodeGetDisk(pDnodeTier, pDiskID->level, pDiskID->did); + return tdGetDisk(pDnodeTier, pDiskID->level, pDiskID->did); } -void dnodeIncDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock) { +void tdIncDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock) { if (lock) { - dnodeLockTiers(pDnodeTier); + tdLockTiers(pDnodeTier); } pDisk->dmeta.nfiles++; if (lock) { - dnodeUnLockTiers(pDnodeTier); + tdUnLockTiers(pDnodeTier); } } -void dnodeDecDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock) { +void tdDecDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock) { if (lock) { - dnodeLockTiers(pDnodeTier); + tdLockTiers(pDnodeTier); } pDisk->dmeta.nfiles--; if (lock) { - dnodeUnLockTiers(pDnodeTier); + tdUnLockTiers(pDnodeTier); } } -static int dnodeFormatDir(char *idir, char *odir) { +static int tdFormatDir(char *idir, char *odir) { wordexp_t wep; int code = wordexp(idir, &wep, 0); @@ -210,7 +220,7 @@ static int dnodeFormatDir(char *idir, char *odir) { return 0; } -static int dnodeCheckDisk(char *dirName, int level, int primary) { +static int tdCheckDisk(char *dirName, int level, int primary) { if (access(dirName, W_OK | R_OK | F_OK) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -230,7 +240,7 @@ static int dnodeCheckDisk(char *dirName, int level, int primary) { } } -static int dnodeUpdateDiskMeta(SDisk *pDisk) { +static int tdUpdateDiskMeta(SDisk *pDisk) { struct statvfs dstat; if (statvfs(pDisk->dir, &dstat) < 0) { uError("failed to get dir %s information since %s", pDisk->dir, strerror(errno)); @@ -244,7 +254,7 @@ static int dnodeUpdateDiskMeta(SDisk *pDisk) { return 0; } -static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary) { +static int tdAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary) { char dirName[TSDB_FILENAME_LEN] = "\0"; STier * pTier = NULL; SDiskID diskid = {0}; @@ -256,7 +266,7 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar return -1; } - if (dnodeFormatDir(dir, dirName) < 0) { + if (tdFormatDir(dir, dirName) < 0) { uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); return -1; } @@ -270,13 +280,13 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar return -1; } - if (dnodeGetDiskByName(pDnodeTier, dirName) != NULL) { + if (tdGetDiskByName(pDnodeTier, dirName) != NULL) { terrno = TSDB_CODE_DND_DISK_ALREADY_EXISTS; uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); return -1; } - if (dnodeCheckDisk(dirName, level, primary) < 0) { + if (tdCheckDisk(dirName, level, primary) < 0) { uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno)); return -1; } @@ -320,6 +330,8 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar } strncpy(pDisk->dir, dirName, TSDB_FILENAME_LEN); + pDisk->level = diskid.level; + pDisk->did = diskid.did; if (taosHashPut(pDnodeTier->map, (void *)dirName, strnlen(dirName, TSDB_FILENAME_LEN), (void *)(&diskid), sizeof(diskid)) < 0) { @@ -331,7 +343,7 @@ static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primar pTier->nDisks++; pTier->disks[diskid.did] = pDisk; - pDnodeTier->nTiers = MAX(pDnodeTier->nTiers, level); + pDnodeTier->nTiers = MAX(pDnodeTier->nTiers, level + 1); return 0; } \ No newline at end of file diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 3a02e09a48..c4cdf7bee5 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -170,13 +170,13 @@ static void dnodeCheckDataDirOpenned(char *dir) { } static int32_t dnodeInitStorage() { - tsDnodeTier = dnodeNewTier(); + tsDnodeTier = tdNewTier(); if (tsDnodeTier == NULL) { dError("failed to create new dnode tier since %s", tstrerror(terrno)); return -1; } - if (dnodeAddDisks(tsDnodeTier, tsDiskCfg, tsDiskCfgNum) < 0) { + if (tdAddDisks(tsDnodeTier, tsDiskCfg, tsDiskCfgNum) < 0) { dError("failed to add disks to dnode tier since %s", tstrerror(terrno)); return -1; } @@ -201,7 +201,7 @@ static int32_t dnodeInitStorage() { STier *pTier = tsDnodeTier->tiers + i; for (int j = 0; j < pTier->nDisks; j++) { - SDisk *pDisk = dnodeGetDisk(tsDnodeTier, i, j); + SDisk *pDisk = tdGetDisk(tsDnodeTier, i, j); tdGetVnodeRootDir(dirName, pDisk->dir); if (dnodeCreateDir(dirName) < 0) { @@ -225,7 +225,7 @@ static int32_t dnodeInitStorage() { static void dnodeCleanupStorage() { if (tsDnodeTier) { - dnodeCloseTier(tsDnodeTier); + tdCloseTier(tsDnodeTier); tsDnodeTier = NULL; } } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 65697dbcfe..1a73eae908 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -153,6 +153,14 @@ typedef struct { // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; + +// minFid <= midFid <= maxFid +typedef struct { + int minFid; // >= minFid && < midFid, at level 2 + int midFid; // >= midFid && < maxFid, at level 1 + int maxFid; // >= maxFid, at level 0 +} SFidGroup; + typedef enum { TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_DATA, @@ -189,7 +197,9 @@ typedef struct { typedef struct { int fileId; - int state; // 0 for health, 1 for problem + int state; // 0 for health, 1 for problem + int level; + int did; SFile files[TSDB_FILE_TYPE_MAX]; } SFileGroup; @@ -483,17 +493,17 @@ int tsdbOpenFile(SFile* pFile, int oflag); void tsdbCloseFile(SFile* pFile); int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type, SDisk* pDisk); SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); -void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, int mfid); +void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup); int tsdbUpdateFileHeader(SFile* pFile); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); +void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); -int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); int tsdbGetBaseDirFromFile(char* fname, char* baseDir); -int tsdbApplyRetention(STsdbRepo* pRepo); +int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 7fe90c58cd..365f4953c4 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -31,10 +31,11 @@ const char * tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", static void tsdbDestroyFile(SFile *pFile); static int compFGroup(const void *arg1, const void *arg2); static int keyFGroupCompFunc(const void *key, const void *fgroup); -static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk); static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName); static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup); +static int tsdbGetFidLevel(int fid, SFidGroup *pFidGroup); +static int tsdbCreateVnodeDataDir(char *baseDir, int vid); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -115,13 +116,15 @@ SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid) { ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL); // TODO: think about if (level == 0) is correct - SDisk *pDisk = dnodeAssignDisk(tsDnodeTier, 0); + SDisk *pDisk = tdAssignDisk(tsDnodeTier, 0); if (pDisk == NULL) { tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); return NULL; } fGroup.fileId = fid; + fGroup.level = pDisk->level; + fGroup.did = pDisk->did; for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(&(fGroup.files[type]), pRepo, fid, type, pDisk) < 0) goto _err; } @@ -136,7 +139,7 @@ _err: for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { tsdbDestroyFile(&(fGroup.files[type])); } - dnodeDecDiskFiles(tsDnodeTier, pDisk, true); + tdDecDiskFiles(tsDnodeTier, pDisk, true); return NULL; } @@ -270,13 +273,13 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { return (SFileGroup *)ptr; } -void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, int mfid) { +void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pGroup = pFileH->pFGroup; pthread_rwlock_wrlock(&(pFileH->fhlock)); - while (pFileH->nFGroups > 0 && pGroup[0].fileId < mfid) { + while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) { tsdbRemoveFileGroup(pRepo, pGroup); } @@ -339,7 +342,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { SFileGroup fileGroup = *pFGroup; tsdbGetBaseDirFromFile(fileGroup.files[0].fname, baseDir); - pDisk = dnodeGetDiskByName(tsDnodeTier, baseDir); + pDisk = tdGetDiskByName(tsDnodeTier, baseDir); ASSERT(pDisk != NULL); int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); @@ -357,7 +360,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { tsdbDestroyFile(&fileGroup.files[type]); } - pDisk->dmeta.nfiles--; + tdDecDiskFiles(tsDnodeTier, pDisk, true); } int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { @@ -415,8 +418,15 @@ _err: *size = 0; } -int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { - return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); +void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) { + TSKEY now = taosGetTimestamp(pCfg->precision); + + pFidGroup->minFid = + TSDB_KEY_FILEID(now - pCfg->keep * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); + pFidGroup->midFid = + TSDB_KEY_FILEID(now - pCfg->keep2 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); + pFidGroup->maxFid = + TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); } int tsdbGetBaseDirFromFile(char *fname, char *baseDir) { @@ -435,8 +445,54 @@ int tsdbGetBaseDirFromFile(char *fname, char *baseDir) { return 0; } -int tsdbApplyRetention(STsdbRepo *pRepo) { - // TODO +int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + SFileGroup *pGroup = NULL; + SFileGroup nFileGroup = {0}; + SFileGroup oFileGroup = {0}; + int level = 0; + + if (tsDnodeTier->nTiers == 1 || (pFidGroup->minFid == pFidGroup->midFid && pFidGroup->midFid == pFidGroup->maxFid)) { + return 0; + } + + for (int gidx = pFileH->nFGroups - 1; gidx >= 0; gidx--) { + pGroup = pFileH->pFGroup + gidx; + + level = tsdbGetFidLevel(pGroup->fileId, pFidGroup); + + if (level == pGroup->level) continue; + if (level > pGroup->level && level < tsDnodeTier->nTiers) { + SDisk *pODisk = tdGetDisk(tsDnodeTier, pGroup->level, pGroup->did); + SDisk *pDisk = tdAssignDisk(tsDnodeTier, level); + tsdbCreateVnodeDataDir(pDisk->dir, REPO_ID(pRepo)); + oFileGroup = *pGroup; + nFileGroup = *pGroup; + nFileGroup.level = level; + nFileGroup.did = pDisk->did; + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + // TODO fileGroup.files[type].fname + } + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + } + + pthread_rwlock_wrlock(&(pFileH->fhlock)); + *pGroup = nFileGroup; + pthread_rwlock_unlock(&(pFileH->fhlock)); + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + (void)remove(oFileGroup.files[type].fname); + } + + tdLockTiers(tsDnodeTier); + tdDecDiskFiles(tsDnodeTier, pODisk, false); + tdIncDiskFiles(tsDnodeTier, pDisk, false); + tdUnLockTiers(tsDnodeTier); + } + } + return 0; } @@ -466,10 +522,6 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) { } } -static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { - return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); -} - static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) { char tsdbDataDir[TSDB_FILENAME_LEN] = "\0"; char tsdbRootDir[TSDB_FILENAME_LEN] = "\0"; @@ -479,6 +531,7 @@ static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) { STsdbFileH * pFileH = pRepo->tsdbFileH; SFileGroup fgroup = {0}; STsdbCfg * pCfg = &(pRepo->config); + SFidGroup fidGroup = {0}; int mfid = 0; tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir); @@ -494,7 +547,8 @@ static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) { goto _err; } - mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); + tsdbGetFidGroup(pCfg, &fidGroup); + mfid = fidGroup.minFid; while (taosHashIterNext(pIter)) { int32_t fid = *(int32_t *)taosHashIterGet(pIter); @@ -677,4 +731,45 @@ _err: if (dir != NULL) closedir(dir); regfree(®ex); return NULL; +} + +static int tsdbGetFidLevel(int fid, SFidGroup *pFidGroup) { + if (fid >= pFidGroup->maxFid) { + return 0; + } else if (fid >= pFidGroup->midFid && fid < pFidGroup->maxFid) { + return 1; + } else { + return 2; + } +} + +static int tsdbCreateVnodeDataDir(char *baseDir, int vid) { + char dirName[TSDB_FILENAME_LEN] = "\0"; + char tsdbRootDir[TSDB_FILENAME_LEN] = "\0"; + + tdGetVnodeRootDir(baseDir, dirName); + if (taosMkDir(dirName, 0755) < 0 && errno != EEXIST) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + tdGetVnodeDir(baseDir, vid, dirName); + if (taosMkDir(dirName, 0755) < 0 && errno != EEXIST) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + tdGetTsdbRootDir(baseDir, vid, tsdbRootDir); + if (taosMkDir(tsdbRootDir, 0755) < 0 && errno != EEXIST) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + tdGetTsdbDataDir(baseDir, vid, dirName); + if (taosMkDir(dirName, 0755) < 0 && errno != EEXIST) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index e1bd306bb2..b98ab71aa1 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -471,6 +471,7 @@ static void *tsdbCommitData(void *arg) { SDataCols * pDataCols = NULL; STsdbMeta * pMeta = pRepo->tsdbMeta; SCommitIter *iters = NULL; + SFidGroup fidGroup = {0}; SRWHelper whelper = {0}; TSKEY minKey = 0, maxKey = 0; ASSERT(pRepo->commit == 1); @@ -479,9 +480,9 @@ static void *tsdbCommitData(void *arg) { tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, mfid, &minKey, &maxKey); - tsdbRemoveFilesBeyondRetention(pRepo, mfid); + tsdbGetFidGroup(pCfg, &fidGroup); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fidGroup.minFid, &minKey, &maxKey); + tsdbRemoveFilesBeyondRetention(pRepo, &fidGroup); // Create the iterator to read from cache if (pMem->numOfRows > 0) { @@ -510,7 +511,7 @@ static void *tsdbCommitData(void *arg) { // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { - if (fid < mfid) continue; + if (fid < fidGroup.minFid) continue; if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); @@ -519,7 +520,7 @@ static void *tsdbCommitData(void *arg) { } } - tsdbApplyRetention(pRepo); + tsdbApplyRetention(pRepo, &fidGroup); // Commit to update meta file if (tsdbCommitMeta(pRepo) < 0) {