diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7591cf2edc..bb8ed19186 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -690,7 +690,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "transPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != + if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != @@ -1139,7 +1139,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; - tsCompactPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; + tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2219202ed1..0107160f43 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -745,6 +745,8 @@ typedef struct { int32_t numberFileset; int32_t finished; int64_t startTime; + int32_t newNumberFileset; + int32_t newFinished; }SCompactDetailObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 5553d66f62..7ad86920ff 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -428,87 +428,20 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa } static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-compact-progress"); - if (pTrans == NULL) { - mError("compact:%" PRId32 ", failed to create since %s" , compactId, terrstr()); - return -1; - } - mInfo("trans:%d, used to update compact progress:%" PRId32, pTrans->id, compactId); - - SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); - - void *pIter = NULL; + void* pIter = NULL; while (1) { SCompactDetailObj *pDetail = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); if (pIter == NULL) break; - if (pDetail->compactId == pCompact->compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) { - pDetail->numberFileset = rsp->numberFileset; - pDetail->finished = rsp->finished; - - SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) { + pDetail->newNumberFileset = rsp->numberFileset; + pDetail->newFinished = rsp->finished; } sdbRelease(pMnode->pSdb, pDetail); } - bool allFinished = true; - while (1) { - SCompactDetailObj *pDetail = NULL; - pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); - if (pIter == NULL) break; - - if (pDetail->numberFileset != pDetail->finished) { - allFinished = false; - sdbRelease(pMnode->pSdb, pDetail); - break; - } - - sdbRelease(pMnode->pSdb, pDetail); - } - - if(allFinished){ - while (1) { - SCompactDetailObj *pDetail = NULL; - pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); - if (pIter == NULL) break; - - if (pDetail->compactId == pCompact->compactId) { - SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - } - - sdbRelease(pMnode->pSdb, pDetail); - } - - SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - mndTransDrop(pTrans); return 0; } @@ -556,7 +489,8 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ return -1; } - mInfo("numberFileset:%d, finished:%d", req.numberFileset, req.finished); + mInfo("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", + req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished); SMnode *pMnode = pReq->info.node; int32_t code = -1; @@ -595,8 +529,6 @@ void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){ continue; } - mInfo("tSerializeSQueryCompactProgressReq contLen:%d", contLen); - contLen += sizeof(SMsgHead); SMsgHead *pHead = rpcMallocCont(contLen); @@ -660,6 +592,124 @@ void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){ } } +static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { + bool needSave = false; + void* pIter = NULL; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == compactId) { + if(pDetail->numberFileset < pDetail->newNumberFileset || pDetail->finished < pDetail->newFinished) + needSave = true; + } + + sdbRelease(pMnode->pSdb, pDetail); + } + if(!needSave) { + mInfo("compact:%" PRId32 ", no need to save" , compactId); + return 0; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-compact-progress"); + if (pTrans == NULL) { + mError("trans:%" PRId32 ", failed to create since %s" , pTrans->id, terrstr()); + return -1; + } + mInfo("trans:%d, used to update compact progress:%" PRId32, pTrans->id, compactId); + + SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); + + pIter = NULL; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == compactId) { + mInfo("trans:%d, check compact progress:%d, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, " + "newNumberFileset:%d, newFinished:%d", + pTrans->id, pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished, + pDetail->newNumberFileset, pDetail->newFinished); + + pDetail->numberFileset = pDetail->newNumberFileset; + pDetail->finished = pDetail->newFinished; + + SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + bool allFinished = true; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + mInfo("trans:%d, check compact finished:%d, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", + pTrans->id, pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished); + + if(pDetail->numberFileset == 0 && pDetail->finished == 0){ + allFinished = false; + sdbRelease(pMnode->pSdb, pDetail); + break; + } + if (pDetail->numberFileset != 0 && pDetail->finished != 0 && + pDetail->numberFileset != pDetail->finished) { + allFinished = false; + sdbRelease(pMnode->pSdb, pDetail); + break; + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + if(allFinished){ + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == pCompact->compactId) { + SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + void mndCompactPullup(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t)); @@ -681,6 +731,7 @@ void mndCompactPullup(SMnode *pMnode) { SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId); if (pCompact != NULL) { mndCompactUpdate(pMnode, pCompact); + mndSaveCompactProgress(pMnode, pCompact->compactId); } mndReleaseCompact(pMnode, pCompact); }