chore: rsma fs apply change

This commit is contained in:
kailixu 2022-12-25 23:59:48 +08:00
parent 8efce718b0
commit dfa7e0f59a
1 changed files with 34 additions and 15 deletions

View File

@ -243,7 +243,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
}
if (q1->version < q2->version) {
return -1;
return -2;
} else if (q1->version > q2->version) {
return 1;
}
@ -254,6 +254,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
@ -273,22 +274,42 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) {
idx = taosArrayGetSize(pFSOld->aQTaskInf);
pQTaskFNew->nRef = 1;
} else {
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx);
int32_t c = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c == 0) {
SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF);
if (c1 == 0) {
// utilize the item in pFSOld->qQTaskInf, instead of pFSNew
continue;
} else if (c < 0) {
} else if (c1 < 0) {
// NOTHING TODO
} else {
ASSERT(0);
continue;
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
// remove previous version
while (--idx >= 0) {
SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx);
int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew);
if (c2 == 0) {
code = TSDB_CODE_RSMA_FS_UPDATE;
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c2 != -2) {
break;
}
nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1);
if (nRef <= 0) {
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version,
tfsGetPrimaryPath(pVnode->pTfs), fname);
(void)taosRemoveFile(fname);
taosArrayRemove(pFSOld->aQTaskInf, idx);
}
}
}
@ -532,7 +553,7 @@ int32_t tdRSmaFSCommit(SSma *pSma) {
TSDB_CHECK_CODE(code, lino, _exit);
}
// Load the new FS
// load the new FS
code = tdRSmaFSCreate(&fs, 1);
TSDB_CHECK_CODE(code, lino, _exit);
@ -558,13 +579,11 @@ int32_t tdRSmaFSFinishCommit(SSma *pSma) {
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
taosWLockLatch(RSMA_FS_LOCK(pStat));
if ((code = tdRSmaFSCommit(pSma)) < 0) {
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
TSDB_CHECK_CODE(code, lino, _exit);
}
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
@ -723,7 +742,7 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
for (int32_t i = 0; i < size; ++i) {
SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i);
nRef = atomic_fetch_add_32(&qTaskF->nRef, 1);
if(nRef <= 0) {
if (nRef <= 0) {
code = TSDB_CODE_RSMA_FS_REF;
TSDB_CHECK_CODE(code, lino, _exit);
}