chore: code optimization for tsdb commit
This commit is contained in:
parent
6536ad72a6
commit
b7253d8814
|
@ -994,14 +994,11 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||||
|
|
||||||
if(inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock);
|
if(inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
int64_t startTime = taosGetTimestampMs();
|
|
||||||
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
|
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
|
||||||
if (code) {
|
if (code) {
|
||||||
if(inTrim) taosThreadRwlockUnlock(&pTsdb->rwLock);
|
if(inTrim) taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
int64_t endTime = taosGetTimestampMs();
|
|
||||||
tsdbInfo("vgId:%d, tsdb end commit - commit1 fsSize:%d cost: %" PRIi64 " ms", TD_VID(pTsdb->pVnode), (int32_t)taosArrayGetSize(pCommitter->fs.aDFileSet), endTime - startTime);
|
|
||||||
|
|
||||||
// lock
|
// lock
|
||||||
if(!inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock);
|
if(!inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
|
@ -1012,7 +1009,6 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
tsdbInfo("vgId:%d, tsdb end commit - commit2 cost: %" PRIi64 " ms", TD_VID(pTsdb->pVnode), taosGetTimestampMs() - endTime);
|
|
||||||
|
|
||||||
pTsdb->imem = NULL;
|
pTsdb->imem = NULL;
|
||||||
|
|
||||||
|
|
|
@ -714,343 +714,6 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|
||||||
int32_t code = 0;
|
|
||||||
char tfname[TSDB_FILENAME_LEN];
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
|
||||||
|
|
||||||
snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
|
||||||
pTsdb->path, TD_DIRSEP);
|
|
||||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
|
||||||
pTsdb->path, TD_DIRSEP);
|
|
||||||
|
|
||||||
// gnrt CURRENT.t
|
|
||||||
code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// rename
|
|
||||||
code = taosRenameFile(tfname, fname);
|
|
||||||
if (code) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t nRef;
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
|
||||||
|
|
||||||
// del
|
|
||||||
if (pFSNew->pDelFile) {
|
|
||||||
SDelFile *pDelFile = pTsdb->fs.pDelFile;
|
|
||||||
|
|
||||||
if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) {
|
|
||||||
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
|
||||||
if (pTsdb->fs.pDelFile == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
*pTsdb->fs.pDelFile = *pFSNew->pDelFile;
|
|
||||||
pTsdb->fs.pDelFile->nRef = 1;
|
|
||||||
|
|
||||||
if (pDelFile) {
|
|
||||||
nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbDelFileName(pTsdb, pDelFile, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pDelFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(pTsdb->fs.pDelFile == NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
// data
|
|
||||||
int32_t iOld = 0;
|
|
||||||
int32_t iNew = 0;
|
|
||||||
while (true) {
|
|
||||||
int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
|
|
||||||
int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet);
|
|
||||||
SDFileSet fSet;
|
|
||||||
int8_t sameDisk;
|
|
||||||
|
|
||||||
if (iOld >= nOld && iNew >= nNew) break;
|
|
||||||
|
|
||||||
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
|
|
||||||
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL;
|
|
||||||
|
|
||||||
if (pSetOld && pSetNew) {
|
|
||||||
if (pSetOld->fid == pSetNew->fid) {
|
|
||||||
goto _merge_old_and_new;
|
|
||||||
} else if (pSetOld->fid < pSetNew->fid) {
|
|
||||||
goto _remove_old;
|
|
||||||
} else {
|
|
||||||
goto _add_new;
|
|
||||||
}
|
|
||||||
} else if (pSetOld) {
|
|
||||||
goto _remove_old;
|
|
||||||
} else {
|
|
||||||
goto _add_new;
|
|
||||||
}
|
|
||||||
|
|
||||||
_merge_old_and_new:
|
|
||||||
sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));
|
|
||||||
|
|
||||||
// head
|
|
||||||
fSet.pHeadF = pSetOld->pHeadF;
|
|
||||||
if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) {
|
|
||||||
pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
|
||||||
if (pSetOld->pHeadF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->pHeadF = *pSetNew->pHeadF;
|
|
||||||
pSetOld->pHeadF->nRef = 1;
|
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(fSet.pHeadF);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size);
|
|
||||||
ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset);
|
|
||||||
}
|
|
||||||
|
|
||||||
// data
|
|
||||||
fSet.pDataF = pSetOld->pDataF;
|
|
||||||
if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) {
|
|
||||||
pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
|
||||||
if (pSetOld->pDataF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->pDataF = *pSetNew->pDataF;
|
|
||||||
pSetOld->pDataF->nRef = 1;
|
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(fSet.pDataF);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size);
|
|
||||||
pSetOld->pDataF->size = pSetNew->pDataF->size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// sma
|
|
||||||
fSet.pSmaF = pSetOld->pSmaF;
|
|
||||||
if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) {
|
|
||||||
pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
|
||||||
if (pSetOld->pSmaF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->pSmaF = *pSetNew->pSmaF;
|
|
||||||
pSetOld->pSmaF->nRef = 1;
|
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(fSet.pSmaF);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size);
|
|
||||||
pSetOld->pSmaF->size = pSetNew->pSmaF->size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// stt
|
|
||||||
if (sameDisk) {
|
|
||||||
if (pSetNew->nSttF > pSetOld->nSttF) {
|
|
||||||
ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1);
|
|
||||||
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
|
|
||||||
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
|
|
||||||
pSetOld->nSttF++;
|
|
||||||
} else if (pSetNew->nSttF < pSetOld->nSttF) {
|
|
||||||
ASSERT(pSetNew->nSttF == 1);
|
|
||||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
|
||||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
|
||||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSttFile);
|
|
||||||
}
|
|
||||||
pSetOld->aSttF[iStt] = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSetOld->nSttF = 1;
|
|
||||||
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (pSetOld->aSttF[0] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
|
|
||||||
pSetOld->aSttF[0]->nRef = 1;
|
|
||||||
} else {
|
|
||||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
|
||||||
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
|
|
||||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
|
||||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSttFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (pSetOld->aSttF[iStt] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
|
||||||
pSetOld->aSttF[iStt]->nRef = 1;
|
|
||||||
} else {
|
|
||||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
|
|
||||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
|
|
||||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
|
||||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
|
||||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSttFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (pSetOld->aSttF[iStt] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
|
||||||
pSetOld->aSttF[iStt]->nRef = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!sameDisk) {
|
|
||||||
pSetOld->diskId = pSetNew->diskId;
|
|
||||||
}
|
|
||||||
|
|
||||||
iOld++;
|
|
||||||
iNew++;
|
|
||||||
continue;
|
|
||||||
|
|
||||||
_remove_old:
|
|
||||||
nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSetOld->pHeadF);
|
|
||||||
}
|
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSetOld->pDataF);
|
|
||||||
}
|
|
||||||
|
|
||||||
nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSetOld->pSmaF);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
|
||||||
nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
|
|
||||||
if (nRef == 0) {
|
|
||||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
|
|
||||||
taosRemoveFile(fname);
|
|
||||||
taosMemoryFree(pSetOld->aSttF[iStt]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
|
|
||||||
continue;
|
|
||||||
|
|
||||||
_add_new:
|
|
||||||
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};
|
|
||||||
|
|
||||||
// head
|
|
||||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
|
||||||
if (fSet.pHeadF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*fSet.pHeadF = *pSetNew->pHeadF;
|
|
||||||
fSet.pHeadF->nRef = 1;
|
|
||||||
|
|
||||||
// data
|
|
||||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
|
||||||
if (fSet.pDataF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*fSet.pDataF = *pSetNew->pDataF;
|
|
||||||
fSet.pDataF->nRef = 1;
|
|
||||||
|
|
||||||
// sma
|
|
||||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
|
||||||
if (fSet.pSmaF == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*fSet.pSmaF = *pSetNew->pSmaF;
|
|
||||||
fSet.pSmaF->nRef = 1;
|
|
||||||
|
|
||||||
// stt
|
|
||||||
ASSERT(pSetNew->nSttF == 1);
|
|
||||||
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
|
||||||
if (fSet.aSttF[0] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
*fSet.aSttF[0] = *pSetNew->aSttF[0];
|
|
||||||
fSet.aSttF[0]->nRef = 1;
|
|
||||||
|
|
||||||
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
iOld++;
|
|
||||||
iNew++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 1
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Update or delete DFileSet in pFS according to DFileSet (fid <= maxFid) in pFSNew.
|
* @brief Update or delete DFileSet in pFS according to DFileSet (fid <= maxFid) in pFSNew.
|
||||||
|
@ -1488,8 +1151,6 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t nRef;
|
int32_t nRef;
|
||||||
|
|
Loading…
Reference in New Issue