clean code
This commit is contained in:
parent
75810bbdbb
commit
522c848489
|
@ -32,7 +32,6 @@ int32_t mndInitCompact(SMnode *pMnode) {
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
|
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
|
||||||
|
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
|
@ -53,14 +52,6 @@ void mndCleanupCompact(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeCompactObj(SCompactObj *pCompact) {
|
void tFreeCompactObj(SCompactObj *pCompact) {
|
||||||
//int32_t size = taosArrayGetSize(pCompact->compactDetail);
|
|
||||||
|
|
||||||
//for (int32_t i = 0; i < size; ++i) {
|
|
||||||
// SCompactDetailObj *detail = taosArrayGet(pCompact->compactDetail, i);
|
|
||||||
// taosMemoryFree(detail);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//taosArrayDestroy(pCompact->compactDetail);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
|
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
|
||||||
|
@ -159,7 +150,7 @@ SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
if (sver != MND_COMPACT_VER_NUMBER) {
|
if (sver != MND_COMPACT_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
mError("view read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
|
mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
|
||||||
goto OVER;
|
goto OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,21 +210,26 @@ int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj
|
||||||
mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p",
|
mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p",
|
||||||
pOldCompact->compactId, pOldCompact, pNewCompact);
|
pOldCompact->compactId, pOldCompact, pNewCompact);
|
||||||
|
|
||||||
//TSWAP(pOldCompact->compactDetail, pNewCompact->compactDetail);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SDbObj *pDb, SCompactDbRsp *rsp){
|
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
|
||||||
//char uuid[40];
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
//int32_t code = taosGetSystemUUID(uuid, 40);
|
SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
|
||||||
//if (code != 0) {
|
if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||||
// strcpy(uuid, "tdengine3.0");
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
// mError("failed to get name from system, set to default val %s", uuid);
|
}
|
||||||
//}
|
return pCompact;
|
||||||
|
}
|
||||||
|
|
||||||
pCompact->compactId = tGenIdPI32(); //mndGenerateUid(uuid, TSDB_CLUSTER_ID_LEN);
|
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
|
||||||
//pCompact->compactId = (pCompact->compactId >= 0 ? pCompact->compactId : -pCompact->compactId);
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
sdbRelease(pSdb, pCompact);
|
||||||
|
}
|
||||||
|
|
||||||
|
//compact db
|
||||||
|
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SDbObj *pDb, SCompactDbRsp *rsp){
|
||||||
|
pCompact->compactId = tGenIdPI32();
|
||||||
|
|
||||||
strcpy(pCompact->dbname, pDb->name);
|
strcpy(pCompact->dbname, pDb->name);
|
||||||
|
|
||||||
|
@ -252,6 +248,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompac
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//retrieve compact
|
||||||
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows){
|
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows){
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -306,25 +303,13 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock,
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
|
//kill compact
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen,
|
||||||
SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
|
int32_t compactId, int32_t dnodeid) {
|
||||||
if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
return pCompact;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
sdbRelease(pSdb, pCompact);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId) {
|
|
||||||
SVKillCompactReq req = {0};
|
SVKillCompactReq req = {0};
|
||||||
req.compactId = compactId;
|
req.compactId = compactId;
|
||||||
req.vgId = pVgroup->vgId;
|
req.vgId = pVgroup->vgId;
|
||||||
//req.dnodeId = pVgroup->;
|
req.dnodeId = dnodeid;
|
||||||
|
|
||||||
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
|
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
|
||||||
int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
|
int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
|
||||||
|
@ -349,7 +334,8 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId, int32_t dnodeid) {
|
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup,
|
||||||
|
int32_t compactId, int32_t dnodeid) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
|
||||||
|
@ -358,7 +344,7 @@ static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *p
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId);
|
void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -433,24 +419,6 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) {
|
|
||||||
void* pIter = NULL;
|
|
||||||
while (1) {
|
|
||||||
SCompactDetailObj *pDetail = NULL;
|
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
|
|
||||||
pDetail->newNumberFileset = rsp->numberFileset;
|
|
||||||
pDetail->newFinished = rsp->finished;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pDetail);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndProcessKillCompactReq(SRpcMsg *pReq){
|
int32_t mndProcessKillCompactReq(SRpcMsg *pReq){
|
||||||
SKillCompactReq killCompactReq = {0};
|
SKillCompactReq killCompactReq = {0};
|
||||||
if (tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq) != 0) {
|
if (tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq) != 0) {
|
||||||
|
@ -487,13 +455,32 @@ _OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
|
mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
tFreeSKillCompactReq(&killCompactReq);
|
tFreeSKillCompactReq(&killCompactReq);
|
||||||
sdbRelease(pMnode->pSdb, pCompact);
|
sdbRelease(pMnode->pSdb, pCompact);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//update progress
|
||||||
|
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) {
|
||||||
|
void* pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SCompactDetailObj *pDetail = NULL;
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
|
||||||
|
pDetail->newNumberFileset = rsp->numberFileset;
|
||||||
|
pDetail->newFinished = rsp->finished;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
|
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
|
||||||
SQueryCompactProgressRsp req = {0};
|
SQueryCompactProgressRsp req = {0};
|
||||||
if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) {
|
if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||||
|
@ -513,6 +500,7 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//timer
|
||||||
void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){
|
void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
@ -718,8 +706,6 @@ void mndCompactPullup(SMnode *pMnode) {
|
||||||
sdbRelease(pSdb, pCompact);
|
sdbRelease(pSdb, pCompact);
|
||||||
}
|
}
|
||||||
|
|
||||||
//taosArraySort(pArray, (__compar_fn_t)mndCompareTransId);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
int32_t *pCompactId = taosArrayGet(pArray, i);
|
int32_t *pCompactId = taosArrayGet(pArray, i);
|
||||||
SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
|
SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
|
||||||
|
|
Loading…
Reference in New Issue