[TD-4352]<feature>fix bug:save new file content in new meta cache
This commit is contained in:
parent
2d464964e0
commit
23cbcdd0b6
|
@ -42,8 +42,9 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pthread_rwlock_t lock;
|
pthread_rwlock_t lock;
|
||||||
|
|
||||||
SFSStatus* cstatus; // current status
|
SFSStatus* cstatus; // current status
|
||||||
SHashObj* metaCache; // meta cache
|
SHashObj* metaCache; // meta cache
|
||||||
|
SHashObj* metaCacheComp; // meta cache for compact
|
||||||
bool intxn;
|
bool intxn;
|
||||||
SFSStatus* nstatus; // new status
|
SFSStatus* nstatus; // new status
|
||||||
} STsdbFS;
|
} STsdbFS;
|
||||||
|
|
|
@ -57,7 +57,7 @@ typedef struct {
|
||||||
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
||||||
|
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||||
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool updateMeta);
|
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
|
||||||
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
|
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
|
||||||
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
|
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
|
||||||
static int tsdbCommitTSData(STsdbRepo *pRepo);
|
static int tsdbCommitTSData(STsdbRepo *pRepo);
|
||||||
|
@ -328,7 +328,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
pAct = (SActObj *)pNode->data;
|
pAct = (SActObj *)pNode->data;
|
||||||
if (pAct->act == TSDB_UPDATE_META) {
|
if (pAct->act == TSDB_UPDATE_META) {
|
||||||
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
|
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
|
||||||
if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, true) < 0) {
|
if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) {
|
||||||
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
tsdbCloseMFile(&mf);
|
tsdbCloseMFile(&mf);
|
||||||
|
@ -402,7 +402,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
|
||||||
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
|
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool updateMeta) {
|
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) {
|
||||||
char buf[64] = "\0";
|
char buf[64] = "\0";
|
||||||
void * pBuf = buf;
|
void * pBuf = buf;
|
||||||
SKVRecord rInfo;
|
SKVRecord rInfo;
|
||||||
|
@ -428,18 +428,18 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
|
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
|
||||||
if (!updateMeta) {
|
|
||||||
pMFile->info.nRecords++;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid));
|
SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache;
|
||||||
|
|
||||||
|
pMFile->info.nRecords++;
|
||||||
|
|
||||||
|
SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid));
|
||||||
if (pRecord != NULL) {
|
if (pRecord != NULL) {
|
||||||
pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
|
pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
|
||||||
} else {
|
} else {
|
||||||
pMFile->info.nRecords++;
|
pMFile->info.nRecords++;
|
||||||
}
|
}
|
||||||
taosHashPut(pfs->metaCache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
|
taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -517,6 +517,13 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// init Comp
|
||||||
|
assert(pfs->metaCacheComp == NULL);
|
||||||
|
pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
|
if (pfs->metaCacheComp == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
pRecord = taosHashIterate(pfs->metaCache, NULL);
|
pRecord = taosHashIterate(pfs->metaCache, NULL);
|
||||||
while (pRecord) {
|
while (pRecord) {
|
||||||
if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
|
if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
|
||||||
|
@ -545,7 +552,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, false) < 0) {
|
if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) {
|
||||||
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid,
|
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -569,9 +576,15 @@ _err:
|
||||||
// update current meta file info
|
// update current meta file info
|
||||||
pfs->nstatus->pmf = NULL;
|
pfs->nstatus->pmf = NULL;
|
||||||
tsdbUpdateMFile(pfs, &mf);
|
tsdbUpdateMFile(pfs, &mf);
|
||||||
|
|
||||||
|
taosHashCleanup(pfs->metaCache);
|
||||||
|
pfs->metaCache = pfs->metaCacheComp;
|
||||||
|
pfs->metaCacheComp = NULL;
|
||||||
} else {
|
} else {
|
||||||
// remove meta.tmp file
|
// remove meta.tmp file
|
||||||
remove(mf.f.aname);
|
remove(mf.f.aname);
|
||||||
|
taosHashCleanup(pfs->metaCacheComp);
|
||||||
|
pfs->metaCacheComp = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pBuf);
|
tfree(pBuf);
|
||||||
|
|
|
@ -215,6 +215,7 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pfs->intxn = false;
|
pfs->intxn = false;
|
||||||
|
pfs->metaCacheComp = NULL;
|
||||||
|
|
||||||
pfs->nstatus = tsdbNewFSStatus(maxFSet);
|
pfs->nstatus = tsdbNewFSStatus(maxFSet);
|
||||||
if (pfs->nstatus == NULL) {
|
if (pfs->nstatus == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue