enh: send compact msg from mnode to vnode
This commit is contained in:
parent
f0dc9a82e6
commit
7a3079e79a
|
@ -1233,7 +1233,7 @@ int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq
|
|||
typedef struct {
|
||||
int64_t dbUid;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t reserved[8];
|
||||
int64_t compactStartTime;
|
||||
} SCompactVnodeReq;
|
||||
|
||||
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||
|
|
|
@ -216,6 +216,7 @@ static const SSysDbTableSchema vgroupsSchema[] = {
|
|||
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
||||
{.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema smaSchema[] = {
|
||||
|
|
|
@ -3989,9 +3989,7 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *
|
|||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||
for (int32_t i = 0; i < 8; ++i) {
|
||||
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI64(&encoder, pReq->compactStartTime) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -4006,9 +4004,7 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
|
|||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||
for (int32_t i = 0; i < 8; ++i) {
|
||||
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI64(&decoder, &pReq->compactStartTime) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
|
|
@ -327,6 +327,7 @@ typedef struct {
|
|||
SDbCfg cfg;
|
||||
SRWLatch lock;
|
||||
int64_t stateTs;
|
||||
int64_t compactStartTime;
|
||||
} SDbObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -43,6 +43,7 @@ int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVg
|
|||
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId, bool force);
|
||||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||
SArray *pArray);
|
||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs);
|
||||
|
||||
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
#include "systable.h"
|
||||
|
||||
#define DB_VER_NUMBER 1
|
||||
#define DB_RESERVE_SIZE 54
|
||||
#define DB_RESERVE_SIZE 46
|
||||
|
||||
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
||||
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -128,6 +128,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
|||
SDB_SET_INT16(pRaw, dataPos, pDb->cfg.hashPrefix, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pDb->cfg.hashSuffix, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.tsdbPageSize, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pDb->compactStartTime, _OVER)
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
@ -217,6 +218,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.hashPrefix, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.hashSuffix, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.tsdbPageSize, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pDb->compactStartTime, _OVER)
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||
taosInitRWLatch(&pDb->lock);
|
||||
|
@ -275,6 +277,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
|||
pOld->cfg.replications = pNew->cfg.replications;
|
||||
pOld->cfg.sstTrigger = pNew->cfg.sstTrigger;
|
||||
pOld->cfg.tsdbPageSize = pNew->cfg.tsdbPageSize;
|
||||
pOld->compactStartTime = pNew->compactStartTime;
|
||||
taosWUnLockLatch(&pOld->lock);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1388,7 +1391,63 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCompactDb(SMnode *pMnode, SDbObj *pDb) { return 0; }
|
||||
static int32_t mndSetCompactDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
|
||||
SDbObj dbObj = {0};
|
||||
memcpy(&dbObj, pDb, sizeof(SDbObj));
|
||||
dbObj.compactStartTime = compactTs;
|
||||
|
||||
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCompactDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (mndVgroupInDb(pVgroup, pDb->uid)) {
|
||||
if (mndBuildCompactVgroupAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
||||
int64_t compactTs = taosGetTimestampMs();
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "compact-db");
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mInfo("trans:%d, used to compact db:%s", pTrans->id, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (mndSetCompactDbCommitLogs(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
|
||||
if (mndSetCompactDbRedoActions(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
@ -1412,10 +1471,11 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCompactDb(pMnode, pDb);
|
||||
code = mndCompactDb(pMnode, pReq, pDb);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
if (code != 0) {
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr());
|
||||
}
|
||||
|
||||
|
|
|
@ -729,6 +729,13 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (pDb->compactStartTime <= 0) {
|
||||
colDataAppendNULL(pColInfo, rows);
|
||||
} else {
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pDb->compactStartTime, false);
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
@ -2066,3 +2073,59 @@ _OVER:
|
|||
}
|
||||
|
||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
||||
|
||||
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen,
|
||||
int64_t compactTs) {
|
||||
SCompactVnodeReq compactReq = {0};
|
||||
compactReq.dbUid = pDb->uid;
|
||||
compactReq.compactStartTime = compactTs;
|
||||
tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
||||
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
|
||||
int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
|
||||
if (contLen < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
contLen += sizeof(SMsgHead);
|
||||
|
||||
void *pReq = taosMemoryMalloc(contLen);
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SMsgHead *pHead = pReq;
|
||||
pHead->contLen = htonl(contLen);
|
||||
pHead->vgId = htonl(pVgroup->vgId);
|
||||
|
||||
tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq);
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||
int64_t compactTs) {
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_COMPACT;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs) {
|
||||
if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) return -1;
|
||||
return 0;
|
||||
}
|
|
@ -30,6 +30,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
|
|||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
@ -308,8 +309,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
vnodeBegin(pVnode);
|
||||
goto _exit;
|
||||
case TDMT_VND_COMPACT:
|
||||
vnodeAsyncCompact(pVnode);
|
||||
vnodeBegin(pVnode);
|
||||
vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp);
|
||||
goto _exit;
|
||||
default:
|
||||
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
|
||||
|
@ -1266,3 +1266,20 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SCompactVnodeReq req = {0};
|
||||
if (tDeserializeSCompactVnodeReq(pReq, len, &req) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
vInfo("vgId:%d, compact msg will be processed, db:%s dbUid:%" PRId64 " compactStartTime:%" PRId64, TD_VID(pVnode),
|
||||
req.db, req.dbUid, req.compactStartTime);
|
||||
|
||||
#if 0
|
||||
vnodeAsyncCompact(pVnode);
|
||||
vnodeBegin(pVnode);
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue