Merge pull request #3421 from taosdata/hotfix/retention_problem
fix TD-1362
This commit is contained in:
commit
ca493df61d
|
@ -26,11 +26,13 @@
|
||||||
|
|
||||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||||
|
|
||||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
||||||
static void tsdbDestroyFile(SFile *pFile);
|
static void tsdbDestroyFile(SFile *pFile);
|
||||||
static int compFGroup(const void *arg1, const void *arg2);
|
static int compFGroup(const void *arg1, const void *arg2);
|
||||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||||
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo);
|
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo);
|
||||||
|
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep);
|
||||||
|
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
|
||||||
|
|
||||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||||
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
||||||
|
@ -79,9 +81,11 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||||
int vid = 0;
|
int vid = 0;
|
||||||
regex_t regex1, regex2;
|
regex_t regex1, regex2;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
|
||||||
SFileGroup fileGroup = {0};
|
SFileGroup fileGroup = {0};
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
|
STsdbCfg * pCfg = &(pRepo->config);
|
||||||
|
|
||||||
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
|
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
|
||||||
if (tDataDir == NULL) {
|
if (tDataDir == NULL) {
|
||||||
|
@ -108,6 +112,8 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
|
||||||
|
|
||||||
struct dirent *dp = NULL;
|
struct dirent *dp = NULL;
|
||||||
while ((dp = readdir(dir)) != NULL) {
|
while ((dp = readdir(dir)) != NULL) {
|
||||||
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
|
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
|
||||||
|
@ -120,6 +126,14 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fid < mfid) {
|
||||||
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
tsdbGetDataFileName(pRepo, fid, type, fname);
|
||||||
|
(void)remove(fname);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
|
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
|
||||||
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
||||||
fileGroup.fileId = fid;
|
fileGroup.fileId = fid;
|
||||||
|
@ -179,8 +193,18 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
|
STsdbCfg * pCfg = &(pRepo->config);
|
||||||
|
|
||||||
if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL;
|
if (pFileH->nFGroups >= pFileH->maxFGroups) {
|
||||||
|
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
|
||||||
|
if (pFileH->pFGroup[0].fileId < mfid) {
|
||||||
|
pthread_rwlock_wrlock(&pFileH->fhlock);
|
||||||
|
tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[0]));
|
||||||
|
pthread_rwlock_unlock(&pFileH->fhlock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pFileH->nFGroups < pFileH->maxFGroups);
|
||||||
|
|
||||||
SFileGroup fGroup;
|
SFileGroup fGroup;
|
||||||
SFileGroup *pFGroup = &fGroup;
|
SFileGroup *pFGroup = &fGroup;
|
||||||
|
@ -342,8 +366,7 @@ void tsdbFitRetention(STsdbRepo *pRepo) {
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
SFileGroup *pGroup = pFileH->pFGroup;
|
SFileGroup *pGroup = pFileH->pFGroup;
|
||||||
|
|
||||||
int mfid = (int)(TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) -
|
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
|
||||||
TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile));
|
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
|
@ -533,3 +556,11 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) {
|
||||||
|
return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
|
||||||
|
return TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision);
|
||||||
|
}
|
Loading…
Reference in New Issue