diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 256b8189f8..da9ae036eb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -482,7 +482,7 @@ int tsdbOpenFile(SFile* pFile, int oflag); void tsdbCloseFile(SFile* pFile); int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); -void tsdbFitRetention(STsdbRepo* pRepo); +void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, int mfid); int tsdbUpdateFileHeader(SFile* pFile); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); @@ -490,6 +490,9 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); +int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); +int tsdbGetBaseDirFromFile(char* fname, char* baseDir); +int tsdbApplyRetention(STsdbRepo* pRepo); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index da4ddee214..be3190a622 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -33,7 +33,6 @@ static void tsdbDestroyFile(SFile *pFile); static int compFGroup(const void *arg1, const void *arg2); static int keyFGroupCompFunc(const void *key, const void *fgroup); static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); -static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk); static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName); static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup); @@ -280,13 +279,10 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { return (SFileGroup *)ptr; } -void tsdbFitRetention(STsdbRepo *pRepo) { - STsdbCfg *pCfg = &(pRepo->config); +void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, int mfid) { STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pGroup = pFileH->pFGroup; - int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); - pthread_rwlock_wrlock(&(pFileH->fhlock)); while (pFileH->nFGroups > 0 && pGroup[0].fileId < mfid) { @@ -347,8 +343,13 @@ void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { ASSERT(pFGroup != NULL); STsdbFileH *pFileH = pRepo->tsdbFileH; + SDisk * pDisk = NULL; + char baseDir[TSDB_FILENAME_LEN] = "\0"; SFileGroup fileGroup = *pFGroup; + tsdbGetBaseDirFromFile(fileGroup.files[0].fname, baseDir); + pDisk = dnodeGetDiskByName(baseDir); + ASSERT(pDisk != NULL); int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); if (nFilesLeft > 0) { @@ -364,6 +365,8 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { } tsdbDestroyFile(&fileGroup.files[type]); } + + pDisk->dmeta.nfiles--; } int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { @@ -421,6 +424,31 @@ _err: *size = 0; } +int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { + return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); +} + +int tsdbGetBaseDirFromFile(char *fname, char *baseDir) { + char *fdup = strdup(fname); + if (fdup == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + for (size_t i = 0; i < 5; i++) { + dirname(fdup); + } + + strncpy(baseDir, fdup, TSDB_FILENAME_LEN); + free(fdup); + return 0; +} + +int tsdbApplyRetention(STsdbRepo *pRepo) { + // TODO + return 0; +} + // ---------------- LOCAL FUNCTIONS ---------------- static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); } @@ -451,10 +479,6 @@ static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); } -static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { - return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); -} - static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) { char tsdbDataDir[TSDB_FILENAME_LEN] = "\0"; char tsdbRootDir[TSDB_FILENAME_LEN] = "\0"; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4cf8ddd4bd..67bd5b947b 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -32,6 +32,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); +static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY minKey); // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { @@ -471,12 +472,18 @@ static void *tsdbCommitData(void *arg) { STsdbMeta * pMeta = pRepo->tsdbMeta; SCommitIter *iters = NULL; SRWHelper whelper = {0}; + STsdbFileH * pFileH = pRepo->tsdbFileH; + TSKEY minKey = 0, maxKey = 0; ASSERT(pRepo->commit == 1); ASSERT(pMem != NULL); tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, mfid, &minKey, &maxKey); + tsdbRemoveFilesBeyondRetention(pRepo, mfid); + // Create the iterator to read from cache if (pMem->numOfRows > 0) { iters = tsdbCreateCommitIters(pRepo); @@ -500,8 +507,12 @@ static void *tsdbCommitData(void *arg) { int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + tsdbSeekCommitIter(iters, pMem->maxTables, minKey); + // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { + if (fid < mfid) continue; + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _exit; @@ -509,14 +520,14 @@ static void *tsdbCommitData(void *arg) { } } + tsdbApplyRetention(pRepo); + // Commit to update meta file if (tsdbCommitMeta(pRepo) < 0) { tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _exit; } - tsdbFitRetention(pRepo); - _exit: tdFreeDataCols(pDataCols); tsdbDestroyCommitIters(iters, pMem->maxTables); @@ -611,6 +622,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe return 0; } + if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) { + // file group not exists + } + // Create and open files for commit dataDir = tsdbGetDataDirName(pRepo->rootDir); if (dataDir == NULL) { @@ -780,4 +795,14 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { taosTFree(tData); return 0; +} + +static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) { + for (int i = 0; i < nIters; i++) { + SCommitIter *pIter = pIters + i; + if (pIter->pTable == NULL) continue; + if (pIter->pIter == NULL) continue; + + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0); + } } \ No newline at end of file