diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index e206567572..08bdac277c 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -32,7 +32,6 @@ int32_t mndInitCompact(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact); mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq); mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp); - mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer); SSdbTable table = { @@ -53,14 +52,6 @@ void mndCleanupCompact(SMnode *pMnode) { } void tFreeCompactObj(SCompactObj *pCompact) { - //int32_t size = taosArrayGetSize(pCompact->compactDetail); - - //for (int32_t i = 0; i < size; ++i) { - // SCompactDetailObj *detail = taosArrayGet(pCompact->compactDetail, i); - // taosMemoryFree(detail); - //} - - //taosArrayDestroy(pCompact->compactDetail); } int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) { @@ -159,7 +150,7 @@ SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) { if (sver != MND_COMPACT_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("view read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER); + mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER); goto OVER; } @@ -219,21 +210,26 @@ int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact, pNewCompact); - //TSWAP(pOldCompact->compactDetail, pNewCompact->compactDetail); - return 0; } -int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SDbObj *pDb, SCompactDbRsp *rsp){ - //char uuid[40]; - //int32_t code = taosGetSystemUUID(uuid, 40); - //if (code != 0) { - // strcpy(uuid, "tdengine3.0"); - // mError("failed to get name from system, set to default val %s", uuid); - //} +SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) { + SSdb *pSdb = pMnode->pSdb; + SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId); + if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_SUCCESS; + } + return pCompact; +} - pCompact->compactId = tGenIdPI32(); //mndGenerateUid(uuid, TSDB_CLUSTER_ID_LEN); - //pCompact->compactId = (pCompact->compactId >= 0 ? pCompact->compactId : -pCompact->compactId); +void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pCompact); +} + +//compact db +int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SDbObj *pDb, SCompactDbRsp *rsp){ + pCompact->compactId = tGenIdPI32(); strcpy(pCompact->dbname, pDb->name); @@ -252,6 +248,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompac return 0; } +//retrieve compact int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows){ SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -306,25 +303,13 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, return numOfRows; } -SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) { - SSdb *pSdb = pMnode->pSdb; - SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId); - if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { - terrno = TSDB_CODE_SUCCESS; - } - return pCompact; -} - -void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) { - SSdb *pSdb = pMnode->pSdb; - sdbRelease(pSdb, pCompact); -} - -static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId) { +//kill compact +static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, + int32_t compactId, int32_t dnodeid) { SVKillCompactReq req = {0}; req.compactId = compactId; req.vgId = pVgroup->vgId; - //req.dnodeId = pVgroup->; + req.dnodeId = dnodeid; mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId); int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req); @@ -349,7 +334,8 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC return pReq; } -static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId, int32_t dnodeid) { +static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, + int32_t compactId, int32_t dnodeid) { STransAction action = {0}; SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid); @@ -358,7 +344,7 @@ static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *p mndReleaseDnode(pMnode, pDnode); int32_t contLen = 0; - void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId); + void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid); if (pReq == NULL) return -1; action.pCont = pReq; @@ -433,24 +419,6 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa return 0; } -static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) { - void* pIter = NULL; - while (1) { - SCompactDetailObj *pDetail = NULL; - pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); - if (pIter == NULL) break; - - if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) { - pDetail->newNumberFileset = rsp->numberFileset; - pDetail->newFinished = rsp->finished; - } - - sdbRelease(pMnode->pSdb, pDetail); - } - - return 0; -} - int32_t mndProcessKillCompactReq(SRpcMsg *pReq){ SKillCompactReq killCompactReq = {0}; if (tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq) != 0) { @@ -487,13 +455,32 @@ _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr()); } - + tFreeSKillCompactReq(&killCompactReq); sdbRelease(pMnode->pSdb, pCompact); return code; } +//update progress +static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) { + void* pIter = NULL; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) { + pDetail->newNumberFileset = rsp->numberFileset; + pDetail->newFinished = rsp->finished; + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + return 0; +} + int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ SQueryCompactProgressRsp req = {0}; if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) { @@ -513,6 +500,7 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ return 0; } +//timer void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){ void *pIter = NULL; @@ -718,8 +706,6 @@ void mndCompactPullup(SMnode *pMnode) { sdbRelease(pSdb, pCompact); } - //taosArraySort(pArray, (__compar_fn_t)mndCompareTransId); - for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { int32_t *pCompactId = taosArrayGet(pArray, i); SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);