diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 3c60a24d3d..a315e1ccb3 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -243,7 +243,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { } if (q1->version < q2->version) { - return -1; + return -2; } else if (q1->version > q2->version) { return 1; } @@ -254,6 +254,7 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) { int32_t code = 0; int32_t lino = 0; + int32_t nRef = 0; SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); @@ -273,22 +274,42 @@ static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) { idx = taosArrayGetSize(pFSOld->aQTaskInf); pQTaskFNew->nRef = 1; } else { - SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx); - int32_t c = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF); - if (c == 0) { + SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx); + int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF); + if (c1 == 0) { // utilize the item in pFSOld->qQTaskInf, instead of pFSNew continue; - } else if (c < 0) { + } else if (c1 < 0) { // NOTHING TODO } else { - ASSERT(0); - continue; + code = TSDB_CODE_RSMA_FS_UPDATE; + TSDB_CHECK_CODE(code, lino, _exit); } } if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // remove previous version + while (--idx >= 0) { + SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx); + int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew); + if (c2 == 0) { + code = TSDB_CODE_RSMA_FS_UPDATE; + TSDB_CHECK_CODE(code, lino, _exit); + } else if (c2 != -2) { + break; + } + + nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1); + if (nRef <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), fname); + (void)taosRemoveFile(fname); + taosArrayRemove(pFSOld->aQTaskInf, idx); + } } } @@ -532,7 +553,7 @@ int32_t tdRSmaFSCommit(SSma *pSma) { TSDB_CHECK_CODE(code, lino, _exit); } - // Load the new FS + // load the new FS code = tdRSmaFSCreate(&fs, 1); TSDB_CHECK_CODE(code, lino, _exit); @@ -558,13 +579,11 @@ int32_t tdRSmaFSFinishCommit(SSma *pSma) { SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); taosWLockLatch(RSMA_FS_LOCK(pStat)); - if ((code = tdRSmaFSCommit(pSma)) < 0) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - TSDB_CHECK_CODE(code, lino, _exit); - } - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); _exit: + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); if (code) { smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); } else { @@ -723,7 +742,7 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) { for (int32_t i = 0; i < size; ++i) { SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i); nRef = atomic_fetch_add_32(&qTaskF->nRef, 1); - if(nRef <= 0) { + if (nRef <= 0) { code = TSDB_CODE_RSMA_FS_REF; TSDB_CHECK_CODE(code, lino, _exit); }