commit
ea7709a303
|
@ -13,17 +13,16 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "mndCompact.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndCompactDetail.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "tmisce.h"
|
||||
#include "audit.h"
|
||||
#include "mndCompactDetail.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tmisce.h"
|
||||
#include "tmsgcb.h"
|
||||
|
||||
#define MND_COMPACT_VER_NUMBER 1
|
||||
#define MND_COMPACT_ID_LEN 11
|
||||
|
@ -50,12 +49,9 @@ int32_t mndInitCompact(SMnode *pMnode) {
|
|||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
void mndCleanupCompact(SMnode *pMnode) {
|
||||
mDebug("mnd compact cleanup");
|
||||
}
|
||||
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
|
||||
|
||||
void tFreeCompactObj(SCompactObj *pCompact) {
|
||||
}
|
||||
void tFreeCompactObj(SCompactObj *pCompact) {}
|
||||
|
||||
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
|
||||
SEncoder encoder = {0};
|
||||
|
@ -127,7 +123,6 @@ SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, OVER);
|
||||
|
||||
|
||||
OVER:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
|
@ -210,8 +205,8 @@ int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
|
|||
}
|
||||
|
||||
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
|
||||
mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p",
|
||||
pOldCompact->compactId, pOldCompact, pNewCompact);
|
||||
mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
|
||||
pNewCompact);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -262,7 +257,8 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock,
|
|||
|
||||
if (strlen(pShow->db) > 0) {
|
||||
sep = strchr(pShow->db, '.');
|
||||
if (sep && ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
|
||||
if (sep &&
|
||||
((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
|
||||
sep++;
|
||||
} else {
|
||||
pDb = mndAcquireDb(pMnode, pShow->db);
|
||||
|
@ -307,8 +303,8 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock,
|
|||
}
|
||||
|
||||
// kill compact
|
||||
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen,
|
||||
int32_t compactId, int32_t dnodeid) {
|
||||
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
|
||||
int32_t dnodeid) {
|
||||
SVKillCompactReq req = {0};
|
||||
req.compactId = compactId;
|
||||
req.vgId = pVgroup->vgId;
|
||||
|
@ -337,8 +333,8 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC
|
|||
return pReq;
|
||||
}
|
||||
|
||||
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup,
|
||||
int32_t compactId, int32_t dnodeid) {
|
||||
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
|
||||
int32_t dnodeid) {
|
||||
STransAction action = {0};
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
|
||||
|
@ -471,7 +467,8 @@ _OVER:
|
|||
}
|
||||
|
||||
// update progress
|
||||
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) {
|
||||
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
|
||||
SQueryCompactProgressRsp *rsp) {
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SCompactDetailObj *pDetail = NULL;
|
||||
|
@ -499,21 +496,21 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
|
|||
code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d",
|
||||
code, pReq->pCont, pReq->contLen);
|
||||
mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
|
||||
pReq->contLen);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
|
||||
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
|
||||
mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
|
||||
req.vgId, req.dnodeId, req.numberFileset, req.finished);
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
||||
code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
|
||||
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
|
||||
mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
|
||||
req.vgId, req.dnodeId, req.numberFileset, req.finished);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -562,8 +559,7 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact){
|
|||
|
||||
tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS,
|
||||
.contLen = contLen};
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
|
||||
|
||||
// rpcMsg.pCont = rpcMallocCont(contLen);
|
||||
// if (rpcMsg.pCont == NULL) {
|
||||
|
@ -575,11 +571,10 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact){
|
|||
rpcMsg.pCont = pHead;
|
||||
|
||||
char detail[1024] = {0};
|
||||
int32_t len = snprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS),
|
||||
epSet.numOfEps, epSet.inUse);
|
||||
int32_t len = snprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
|
||||
TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
len += snprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn,
|
||||
epSet.eps[i].port);
|
||||
len += snprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
||||
}
|
||||
|
||||
mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
|
||||
|
@ -600,7 +595,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pDetail->compactId == compactId) {
|
||||
mDebug("compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
|
||||
mDebug(
|
||||
"compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
|
||||
"newNumberFileset:%d, newFinished:%d",
|
||||
pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
|
||||
pDetail->newNumberFileset, pDetail->newFinished);
|
||||
|
@ -613,6 +609,18 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
}
|
||||
|
||||
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
|
||||
if (pCompact == NULL) return 0;
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname);
|
||||
if (pDb == NULL) {
|
||||
needSave = true;
|
||||
mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, pCompact->dbname);
|
||||
} else {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pDb = NULL;
|
||||
}
|
||||
|
||||
if (!needSave) {
|
||||
mDebug("compact:%" PRId32 ", no need to save", compactId);
|
||||
return 0;
|
||||
|
@ -625,8 +633,6 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
}
|
||||
mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
|
||||
|
||||
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
|
||||
|
||||
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
|
||||
|
||||
pIter = NULL;
|
||||
|
@ -636,7 +642,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pDetail->compactId == compactId) {
|
||||
mInfo("compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
|
||||
mInfo(
|
||||
"compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
|
||||
"newNumberFileset:%d, newFinished:%d",
|
||||
pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
|
||||
pDetail->newNumberFileset, pDetail->newFinished);
|
||||
|
@ -657,6 +664,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
}
|
||||
|
||||
bool allFinished = true;
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
SCompactDetailObj *pDetail = NULL;
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
|
||||
|
@ -671,8 +679,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
break;
|
||||
}
|
||||
if (pDetail->numberFileset != -1 && pDetail->finished != -1 &&
|
||||
pDetail->numberFileset != pDetail->finished) {
|
||||
if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
|
||||
allFinished = false;
|
||||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
break;
|
||||
|
@ -682,8 +689,18 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
}
|
||||
|
||||
pDb = mndAcquireDb(pMnode, pCompact->dbname);
|
||||
if (pDb == NULL) {
|
||||
allFinished = true;
|
||||
mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, pCompact->dbname);
|
||||
} else {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pDb = NULL;
|
||||
}
|
||||
|
||||
if (allFinished) {
|
||||
mInfo("compact:%d, all finished", pCompact->compactId);
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
SCompactDetailObj *pDetail = NULL;
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
|
||||
|
@ -692,11 +709,13 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
if (pDetail->compactId == pCompact->compactId) {
|
||||
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
|
||||
mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
|
||||
terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
|
@ -709,6 +728,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
|||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
mInfo("compact:%d, add drop compact action", pCompact->compactId);
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
|
|
Loading…
Reference in New Issue