[TD-4352]compact tsdb meta data implementation
This commit is contained in:
parent
5f5a802bb9
commit
b50154343d
|
@ -38,7 +38,7 @@
|
||||||
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
||||||
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
||||||
|
|
||||||
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
|
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META, TSDB_FILE_META_TMP} TSDB_FILE_T;
|
||||||
|
|
||||||
// =============== SMFile
|
// =============== SMFile
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -56,7 +56,8 @@ typedef struct {
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
} SMFile;
|
} SMFile;
|
||||||
|
|
||||||
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver, bool tmp);
|
||||||
|
void tsdbRenameOrDeleleTempMetaFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver, int code);
|
||||||
void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile);
|
void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile);
|
||||||
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
||||||
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
||||||
|
|
|
@ -55,8 +55,9 @@ typedef struct {
|
||||||
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
||||||
|
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||||
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen);
|
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool updateMeta);
|
||||||
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
|
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
|
||||||
|
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
|
||||||
static int tsdbCommitTSData(STsdbRepo *pRepo);
|
static int tsdbCommitTSData(STsdbRepo *pRepo);
|
||||||
static void tsdbStartCommit(STsdbRepo *pRepo);
|
static void tsdbStartCommit(STsdbRepo *pRepo);
|
||||||
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
|
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
|
||||||
|
@ -283,7 +284,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
// Create a new meta file
|
// Create a new meta file
|
||||||
did.level = TFS_PRIMARY_LEVEL;
|
did.level = TFS_PRIMARY_LEVEL;
|
||||||
did.id = TFS_PRIMARY_ID;
|
did.id = TFS_PRIMARY_ID;
|
||||||
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
|
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), false);
|
||||||
|
|
||||||
if (tsdbCreateMFile(&mf, true) < 0) {
|
if (tsdbCreateMFile(&mf, true) < 0) {
|
||||||
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
@ -305,7 +306,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
pAct = (SActObj *)pNode->data;
|
pAct = (SActObj *)pNode->data;
|
||||||
if (pAct->act == TSDB_UPDATE_META) {
|
if (pAct->act == TSDB_UPDATE_META) {
|
||||||
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
|
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
|
||||||
if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
|
if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, true) < 0) {
|
||||||
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
tsdbCloseMFile(&mf);
|
tsdbCloseMFile(&mf);
|
||||||
|
@ -338,6 +339,10 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
tsdbCloseMFile(&mf);
|
tsdbCloseMFile(&mf);
|
||||||
tsdbUpdateMFile(pfs, &mf);
|
tsdbUpdateMFile(pfs, &mf);
|
||||||
|
|
||||||
|
if (tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) {
|
||||||
|
tsdbError("compact meta file error");
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,7 +380,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
|
||||||
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
|
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) {
|
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool updateMeta) {
|
||||||
char buf[64] = "\0";
|
char buf[64] = "\0";
|
||||||
void * pBuf = buf;
|
void * pBuf = buf;
|
||||||
SKVRecord rInfo;
|
SKVRecord rInfo;
|
||||||
|
@ -401,6 +406,11 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
|
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
|
||||||
|
if (!updateMeta) {
|
||||||
|
pMFile->info.nRecords++;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid));
|
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid));
|
||||||
if (pRecord != NULL) {
|
if (pRecord != NULL) {
|
||||||
pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
|
pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
|
||||||
|
@ -442,6 +452,95 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
|
||||||
|
float delPercent = pMFile->info.nDels * 1.0 / pMFile->info.nRecords;
|
||||||
|
float tombPercent = pMFile->info.tombSize * 1.0 / pMFile->info.size;
|
||||||
|
|
||||||
|
if (delPercent < 0.33 && tombPercent < 0.33) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbInfo("begin compact tsdb meta file, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64,
|
||||||
|
pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size);
|
||||||
|
|
||||||
|
SMFile mf;
|
||||||
|
SDiskID did;
|
||||||
|
|
||||||
|
// first create tmp meta file
|
||||||
|
did.level = TFS_PRIMARY_LEVEL;
|
||||||
|
did.id = TFS_PRIMARY_ID;
|
||||||
|
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), true);
|
||||||
|
|
||||||
|
if (tsdbCreateMFile(&mf, true) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
|
||||||
|
|
||||||
|
// second iterator metaCache
|
||||||
|
int code = -1;
|
||||||
|
int64_t maxBufSize = 1024;
|
||||||
|
SKVRecord *pRecord;
|
||||||
|
void *pBuf = NULL;
|
||||||
|
|
||||||
|
pBuf = malloc((size_t)maxBufSize);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRecord = taosHashIterate(pfs->metaCache, NULL);
|
||||||
|
while (pRecord) {
|
||||||
|
if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
|
||||||
|
tstrerror(terrno));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (pRecord->size > maxBufSize) {
|
||||||
|
maxBufSize = pRecord->size;
|
||||||
|
void* tmp = realloc(pBuf, maxBufSize);
|
||||||
|
if (tmp == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pBuf = tmp;
|
||||||
|
}
|
||||||
|
int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size);
|
||||||
|
if (nread < 0) {
|
||||||
|
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
|
||||||
|
tstrerror(terrno));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nread < pRecord->size) {
|
||||||
|
tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d",
|
||||||
|
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, pRecord->size, false) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid,
|
||||||
|
tstrerror(terrno));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRecord = taosHashIterate(pfs->metaCache, pRecord);
|
||||||
|
}
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
TSDB_FILE_FSYNC(&mf);
|
||||||
|
tsdbCloseMFile(&mf);
|
||||||
|
|
||||||
|
tsdbRenameOrDeleleTempMetaFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), code);
|
||||||
|
if (code == 0) {
|
||||||
|
tsdbUpdateMFile(pfs, &mf);
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pBuf);
|
||||||
|
tsdbInfo("end compact tsdb meta file, code:%d", code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// =================== Commit Time-Series Data
|
// =================== Commit Time-Series Data
|
||||||
static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
||||||
SMemTable *pMem = pRepo->imem;
|
SMemTable *pMem = pRepo->imem;
|
||||||
|
|
|
@ -272,7 +272,7 @@ static int tsdbCreateMeta(STsdbRepo *pRepo) {
|
||||||
// Create a new meta file
|
// Create a new meta file
|
||||||
did.level = TFS_PRIMARY_LEVEL;
|
did.level = TFS_PRIMARY_LEVEL;
|
||||||
did.id = TFS_PRIMARY_ID;
|
did.id = TFS_PRIMARY_ID;
|
||||||
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
|
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), false);
|
||||||
|
|
||||||
if (tsdbCreateMFile(&mf, true) < 0) {
|
if (tsdbCreateMFile(&mf, true) < 0) {
|
||||||
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
|
|
@ -16,11 +16,12 @@
|
||||||
#include "tsdbint.h"
|
#include "tsdbint.h"
|
||||||
|
|
||||||
static const char *TSDB_FNAME_SUFFIX[] = {
|
static const char *TSDB_FNAME_SUFFIX[] = {
|
||||||
"head", // TSDB_FILE_HEAD
|
"head", // TSDB_FILE_HEAD
|
||||||
"data", // TSDB_FILE_DATA
|
"data", // TSDB_FILE_DATA
|
||||||
"last", // TSDB_FILE_LAST
|
"last", // TSDB_FILE_LAST
|
||||||
"", // TSDB_FILE_MAX
|
"", // TSDB_FILE_MAX
|
||||||
"meta" // TSDB_FILE_META
|
"meta" // TSDB_FILE_META
|
||||||
|
"meta.tmp" // TSDB_FILE_META_TMP
|
||||||
};
|
};
|
||||||
|
|
||||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
|
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
|
||||||
|
@ -30,7 +31,7 @@ static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
|
||||||
static int tsdbRollBackDFile(SDFile *pDFile);
|
static int tsdbRollBackDFile(SDFile *pDFile);
|
||||||
|
|
||||||
// ============== SMFile
|
// ============== SMFile
|
||||||
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
|
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver, bool tmp) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK);
|
TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK);
|
||||||
|
@ -38,10 +39,26 @@ void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
|
||||||
memset(&(pMFile->info), 0, sizeof(pMFile->info));
|
memset(&(pMFile->info), 0, sizeof(pMFile->info));
|
||||||
pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
|
|
||||||
tsdbGetFilename(vid, 0, ver, TSDB_FILE_META, fname);
|
tsdbGetFilename(vid, 0, ver, tmp ? TSDB_FILE_META_TMP : TSDB_FILE_META, fname);
|
||||||
tfsInitFile(TSDB_FILE_F(pMFile), did.level, did.id, fname);
|
tfsInitFile(TSDB_FILE_F(pMFile), did.level, did.id, fname);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsdbRenameOrDeleleTempMetaFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver, int code) {
|
||||||
|
char mfname[TSDB_FILENAME_LEN] = {'\0'};
|
||||||
|
char tfname[TSDB_FILENAME_LEN] = {'\0'};
|
||||||
|
|
||||||
|
tsdbGetFilename(vid, 0, ver, TSDB_FILE_META_TMP, tfname);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
remove(tfname);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbGetFilename(vid, 0, ver, TSDB_FILE_META, mfname);
|
||||||
|
|
||||||
|
(void)taosRename(tfname, mfname);
|
||||||
|
}
|
||||||
|
|
||||||
void tsdbInitMFileEx(SMFile *pMFile, const SMFile *pOMFile) {
|
void tsdbInitMFileEx(SMFile *pMFile, const SMFile *pOMFile) {
|
||||||
*pMFile = *pOMFile;
|
*pMFile = *pOMFile;
|
||||||
TSDB_FILE_SET_CLOSED(pMFile);
|
TSDB_FILE_SET_CLOSED(pMFile);
|
||||||
|
|
|
@ -209,7 +209,7 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
|
||||||
// Recv from remote
|
// Recv from remote
|
||||||
SMFile mf;
|
SMFile mf;
|
||||||
SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
|
SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
|
||||||
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
|
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)), false);
|
||||||
if (tsdbCreateMFile(&mf, false) < 0) {
|
if (tsdbCreateMFile(&mf, false) < 0) {
|
||||||
tsdbError("vgId:%d, failed to create file while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d, failed to create file while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue