update progress

This commit is contained in:
dmchen 2023-11-23 07:36:33 +00:00
parent d4d55ab395
commit bbbdec5db7
3 changed files with 129 additions and 76 deletions

View File

@ -690,7 +690,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
@ -1139,7 +1139,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsCompactPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32;

View File

@ -745,6 +745,8 @@ typedef struct {
int32_t numberFileset;
int32_t finished;
int64_t startTime;
int32_t newNumberFileset;
int32_t newFinished;
}SCompactDetailObj;
typedef struct {

View File

@ -428,87 +428,20 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa
}
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-compact-progress");
if (pTrans == NULL) {
mError("compact:%" PRId32 ", failed to create since %s" , compactId, terrstr());
return -1;
}
mInfo("trans:%d, used to update compact progress:%" PRId32, pTrans->id, compactId);
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
void *pIter = NULL;
void* pIter = NULL;
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->compactId == pCompact->compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
pDetail->numberFileset = rsp->numberFileset;
pDetail->finished = rsp->finished;
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
pDetail->newNumberFileset = rsp->numberFileset;
pDetail->newFinished = rsp->finished;
}
sdbRelease(pMnode->pSdb, pDetail);
}
bool allFinished = true;
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->numberFileset != pDetail->finished) {
allFinished = false;
sdbRelease(pMnode->pSdb, pDetail);
break;
}
sdbRelease(pMnode->pSdb, pDetail);
}
if(allFinished){
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->compactId == pCompact->compactId) {
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
}
sdbRelease(pMnode->pSdb, pDetail);
}
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
@ -556,7 +489,8 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
return -1;
}
mInfo("numberFileset:%d, finished:%d", req.numberFileset, req.finished);
mInfo("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
@ -595,8 +529,6 @@ void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){
continue;
}
mInfo("tSerializeSQueryCompactProgressReq contLen:%d", contLen);
contLen += sizeof(SMsgHead);
SMsgHead *pHead = rpcMallocCont(contLen);
@ -660,6 +592,124 @@ void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){
}
}
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
bool needSave = false;
void* pIter = NULL;
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->compactId == compactId) {
if(pDetail->numberFileset < pDetail->newNumberFileset || pDetail->finished < pDetail->newFinished)
needSave = true;
}
sdbRelease(pMnode->pSdb, pDetail);
}
if(!needSave) {
mInfo("compact:%" PRId32 ", no need to save" , compactId);
return 0;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-compact-progress");
if (pTrans == NULL) {
mError("trans:%" PRId32 ", failed to create since %s" , pTrans->id, terrstr());
return -1;
}
mInfo("trans:%d, used to update compact progress:%" PRId32, pTrans->id, compactId);
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
pIter = NULL;
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->compactId == compactId) {
mInfo("trans:%d, check compact progress:%d, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
"newNumberFileset:%d, newFinished:%d",
pTrans->id, pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
pDetail->newNumberFileset, pDetail->newFinished);
pDetail->numberFileset = pDetail->newNumberFileset;
pDetail->finished = pDetail->newFinished;
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
}
sdbRelease(pMnode->pSdb, pDetail);
}
bool allFinished = true;
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
mInfo("trans:%d, check compact finished:%d, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
pTrans->id, pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
if(pDetail->numberFileset == 0 && pDetail->finished == 0){
allFinished = false;
sdbRelease(pMnode->pSdb, pDetail);
break;
}
if (pDetail->numberFileset != 0 && pDetail->finished != 0 &&
pDetail->numberFileset != pDetail->finished) {
allFinished = false;
sdbRelease(pMnode->pSdb, pDetail);
break;
}
sdbRelease(pMnode->pSdb, pDetail);
}
if(allFinished){
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->compactId == pCompact->compactId) {
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
}
sdbRelease(pMnode->pSdb, pDetail);
}
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
void mndCompactPullup(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
@ -681,6 +731,7 @@ void mndCompactPullup(SMnode *pMnode) {
SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
if (pCompact != NULL) {
mndCompactUpdate(pMnode, pCompact);
mndSaveCompactProgress(pMnode, pCompact->compactId);
}
mndReleaseCompact(pMnode, pCompact);
}