diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index dcb5eadfab..9ebf1253cb 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -38,7 +38,7 @@ #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) -typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; +typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META, TSDB_FILE_META_TMP} TSDB_FILE_T; // =============== SMFile typedef struct { @@ -56,7 +56,8 @@ typedef struct { uint8_t state; } SMFile; -void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver); +void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver, bool tmp); +void tsdbRenameOrDeleleTempMetaFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver, int code); void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 82cc6f07f7..6c42311abc 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -55,8 +55,9 @@ typedef struct { #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static int tsdbCommitMeta(STsdbRepo *pRepo); -static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen); +static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool updateMeta); static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); +static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); static int tsdbCommitTSData(STsdbRepo *pRepo); static void tsdbStartCommit(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo, int eno); @@ -283,7 +284,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { // Create a new meta file did.level = TFS_PRIMARY_LEVEL; did.id = TFS_PRIMARY_ID; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), false); if (tsdbCreateMFile(&mf, true) < 0) { tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -305,7 +306,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { pAct = (SActObj *)pNode->data; if (pAct->act == TSDB_UPDATE_META) { pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); - if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { + if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, true) < 0) { tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, tstrerror(terrno)); tsdbCloseMFile(&mf); @@ -338,6 +339,10 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { tsdbCloseMFile(&mf); tsdbUpdateMFile(pfs, &mf); + if (tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) { + tsdbError("compact meta file error"); + } + return 0; } @@ -375,7 +380,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { pRtn->minFid, pRtn->midFid, pRtn->maxFid); } -static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) { +static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool updateMeta) { char buf[64] = "\0"; void * pBuf = buf; SKVRecord rInfo; @@ -401,6 +406,11 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void } tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); + if (!updateMeta) { + pMFile->info.nRecords++; + return 0; + } + SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid)); if (pRecord != NULL) { pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); @@ -442,6 +452,95 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { return 0; } +static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { + float delPercent = pMFile->info.nDels * 1.0 / pMFile->info.nRecords; + float tombPercent = pMFile->info.tombSize * 1.0 / pMFile->info.size; + + if (delPercent < 0.33 && tombPercent < 0.33) { + return 0; + } + + tsdbInfo("begin compact tsdb meta file, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64, + pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); + + SMFile mf; + SDiskID did; + + // first create tmp meta file + did.level = TFS_PRIMARY_LEVEL; + did.id = TFS_PRIMARY_ID; + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), true); + + if (tsdbCreateMFile(&mf, true) < 0) { + tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); + + // second iterator metaCache + int code = -1; + int64_t maxBufSize = 1024; + SKVRecord *pRecord; + void *pBuf = NULL; + + pBuf = malloc((size_t)maxBufSize); + if (pBuf == NULL) { + goto _err; + } + + pRecord = taosHashIterate(pfs->metaCache, NULL); + while (pRecord) { + if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { + tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), + tstrerror(terrno)); + break; + } + if (pRecord->size > maxBufSize) { + maxBufSize = pRecord->size; + void* tmp = realloc(pBuf, maxBufSize); + if (tmp == NULL) { + break; + } + pBuf = tmp; + } + int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); + if (nread < 0) { + tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), + tstrerror(terrno)); + break; + } + + if (nread < pRecord->size) { + tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", + REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); + break; + } + + if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, pRecord->size, false) < 0) { + tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, + tstrerror(terrno)); + break; + } + + pRecord = taosHashIterate(pfs->metaCache, pRecord); + } + code = 0; + +_err: + TSDB_FILE_FSYNC(&mf); + tsdbCloseMFile(&mf); + + tsdbRenameOrDeleleTempMetaFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), code); + if (code == 0) { + tsdbUpdateMFile(pfs, &mf); + } + + tfree(pBuf); + tsdbInfo("end compact tsdb meta file, code:%d", code); + return code; +} + // =================== Commit Time-Series Data static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 54372ae8c2..35e9998323 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -272,7 +272,7 @@ static int tsdbCreateMeta(STsdbRepo *pRepo) { // Create a new meta file did.level = TFS_PRIMARY_LEVEL; did.id = TFS_PRIMARY_ID; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), false); if (tsdbCreateMFile(&mf, true) < 0) { tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 50fa393e9f..b1ff79bfef 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -16,11 +16,12 @@ #include "tsdbint.h" static const char *TSDB_FNAME_SUFFIX[] = { - "head", // TSDB_FILE_HEAD - "data", // TSDB_FILE_DATA - "last", // TSDB_FILE_LAST - "", // TSDB_FILE_MAX - "meta" // TSDB_FILE_META + "head", // TSDB_FILE_HEAD + "data", // TSDB_FILE_DATA + "last", // TSDB_FILE_LAST + "", // TSDB_FILE_MAX + "meta" // TSDB_FILE_META + "meta.tmp" // TSDB_FILE_META_TMP }; static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); @@ -30,7 +31,7 @@ static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); static int tsdbRollBackDFile(SDFile *pDFile); // ============== SMFile -void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { +void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver, bool tmp) { char fname[TSDB_FILENAME_LEN]; TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK); @@ -38,10 +39,26 @@ void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { memset(&(pMFile->info), 0, sizeof(pMFile->info)); pMFile->info.magic = TSDB_FILE_INIT_MAGIC; - tsdbGetFilename(vid, 0, ver, TSDB_FILE_META, fname); + tsdbGetFilename(vid, 0, ver, tmp ? TSDB_FILE_META_TMP : TSDB_FILE_META, fname); tfsInitFile(TSDB_FILE_F(pMFile), did.level, did.id, fname); } +void tsdbRenameOrDeleleTempMetaFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver, int code) { + char mfname[TSDB_FILENAME_LEN] = {'\0'}; + char tfname[TSDB_FILENAME_LEN] = {'\0'}; + + tsdbGetFilename(vid, 0, ver, TSDB_FILE_META_TMP, tfname); + + if (code != 0) { + remove(tfname); + return; + } + + tsdbGetFilename(vid, 0, ver, TSDB_FILE_META, mfname); + + (void)taosRename(tfname, mfname); +} + void tsdbInitMFileEx(SMFile *pMFile, const SMFile *pOMFile) { *pMFile = *pOMFile; TSDB_FILE_SET_CLOSED(pMFile); diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index edcb84d091..f06943bc37 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -209,7 +209,7 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { // Recv from remote SMFile mf; SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), false); if (tsdbCreateMFile(&mf, false) < 0) { tsdbError("vgId:%d, failed to create file while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1;