Feat(sync):Add restore progress to the "show vnodes" command, and add the applied index to the "show vgroups" command.

This commit is contained in:
xiao-77 2025-02-10 15:58:25 +08:00
parent 054ca189bb
commit fbe65197d6
10 changed files with 71 additions and 18 deletions

View File

@ -1355,10 +1355,10 @@ typedef struct {
int8_t encryptAlgorithm; int8_t encryptAlgorithm;
char dnodeListStr[TSDB_DNODE_LIST_LEN]; char dnodeListStr[TSDB_DNODE_LIST_LEN];
// 1. add auto-compact parameters // 1. add auto-compact parameters
int32_t compactInterval; // minutes int32_t compactInterval; // minutes
int32_t compactStartTime; // minutes int32_t compactStartTime; // minutes
int32_t compactEndTime; // minutes int32_t compactEndTime; // minutes
int8_t compactTimeOffset; // hour int8_t compactTimeOffset; // hour
} SCreateDbReq; } SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq); int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
@ -1777,6 +1777,8 @@ typedef struct {
int64_t numOfBatchInsertSuccessReqs; int64_t numOfBatchInsertSuccessReqs;
int32_t numOfCachedTables; int32_t numOfCachedTables;
int32_t learnerProgress; // use one reservered int32_t learnerProgress; // use one reservered
int64_t syncAppliedIndex;
int64_t syncCommitIndex;
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
@ -3937,8 +3939,8 @@ typedef struct {
int8_t igExists; int8_t igExists;
int8_t intervalUnit; int8_t intervalUnit;
int8_t slidingUnit; int8_t slidingUnit;
int8_t timezone; // int8_t is not enough, timezone is unit of second int8_t timezone; // int8_t is not enough, timezone is unit of second
int32_t dstVgId; // for stream int32_t dstVgId; // for stream
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;

View File

@ -293,6 +293,7 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg);
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm); int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm);
SSyncState syncGetState(int64_t rid); SSyncState syncGetState(int64_t rid);
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex);
int32_t syncGetArbToken(int64_t rid, char* outToken); int32_t syncGetArbToken(int64_t rid, char* outToken);
int32_t syncGetAssignedLogSynced(int64_t rid); int32_t syncGetAssignedLogSynced(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);

View File

@ -328,8 +328,8 @@ typedef enum ELogicConditionType {
#define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_GROUP_MEMBER_NUM 2
#define TSDB_ARB_TOKEN_SIZE 32 #define TSDB_ARB_TOKEN_SIZE 32
#define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 512 #define TSDB_TRANS_ERROR_LEN 512
#define TSDB_TRANS_OBJTYPE_LEN 40 #define TSDB_TRANS_OBJTYPE_LEN 40
#define TSDB_TRANS_RESULT_LEN 100 #define TSDB_TRANS_RESULT_LEN 100
@ -363,6 +363,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_REPLICA 5 #define TSDB_MAX_REPLICA 5
#define TSDB_MAX_LEARNER_REPLICA 10 #define TSDB_MAX_LEARNER_REPLICA 10
#define TSDB_SYNC_RESOTRE_lEN 8
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256 #define TSDB_SYNC_LOG_BUFFER_RETENTION 256
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5) #define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
@ -678,7 +679,7 @@ typedef enum {
TSDB_VERSION_END, TSDB_VERSION_END,
} EVersionType; } EVersionType;
#define MIN_RESERVE_MEM_SIZE 1024 // MB #define MIN_RESERVE_MEM_SIZE 1024 // MB
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -14,8 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h"
#undef TD_MSG_NUMBER_ #undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_ #undef TD_MSG_DICT_
@ -1432,6 +1432,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pload->learnerProgress)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pload->learnerProgress));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->roleTimeMs)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->roleTimeMs));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->startTimeMs)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->startTimeMs));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncAppliedIndex));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncCommitIndex));
} }
// mnode loads // mnode loads
@ -1542,6 +1544,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vload.learnerProgress)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vload.learnerProgress));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.roleTimeMs)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.roleTimeMs));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.startTimeMs)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.startTimeMs));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.syncAppliedIndex));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.syncCommitIndex));
if (taosArrayPush(pReq->pVloads, &vload) == NULL) { if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
TAOS_CHECK_EXIT(terrno); TAOS_CHECK_EXIT(terrno);
} }

View File

@ -290,12 +290,16 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v1_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v1_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v1_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v2_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v2_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v2_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v3_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v3_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v3_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v3_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v3_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v4_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "cacheelements", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "cacheelements", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
@ -366,7 +370,7 @@ static const SSysDbTableSchema vnodesSchema[] = {
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true}, {.name = "restored", .bytes = TSDB_SYNC_RESOTRE_lEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
}; };
static const SSysDbTableSchema userUserPrivilegesSchema[] = { static const SSysDbTableSchema userUserPrivilegesSchema[] = {

View File

@ -136,7 +136,7 @@ typedef enum {
typedef enum { typedef enum {
TRN_KILL_MODE_SKIP = 0, TRN_KILL_MODE_SKIP = 0,
TRN_KILL_MODE_INTERUPT = 1, TRN_KILL_MODE_INTERUPT = 1,
//TRN_KILL_MODE_ROLLBACK = 2, // TRN_KILL_MODE_ROLLBACK = 2,
} ETrnKillMode; } ETrnKillMode;
typedef enum { typedef enum {
@ -476,6 +476,8 @@ typedef struct {
int32_t dnodeId; int32_t dnodeId;
ESyncState syncState; ESyncState syncState;
int64_t syncTerm; int64_t syncTerm;
int64_t syncAppliedIndex;
int64_t syncCommitIndex;
bool syncRestore; bool syncRestore;
bool syncCanRead; bool syncCanRead;
int64_t roleTimeMs; int64_t roleTimeMs;

View File

@ -540,6 +540,8 @@ static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVloa
bool roleChanged = pGid->syncState != pVload->syncState || bool roleChanged = pGid->syncState != pVload->syncState ||
(pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) || (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
pGid->roleTimeMs != pVload->roleTimeMs; pGid->roleTimeMs != pVload->roleTimeMs;
pGid->syncAppliedIndex = pVload->syncAppliedIndex;
pGid->syncCommitIndex = pVload->syncCommitIndex;
if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead || if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
pGid->startTimeMs != pVload->startTimeMs) { pGid->startTimeMs != pVload->startTimeMs) {
mInfo( mInfo(

View File

@ -244,6 +244,8 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pNewGid->syncState = pOldGid->syncState; pNewGid->syncState = pOldGid->syncState;
pNewGid->syncRestore = pOldGid->syncRestore; pNewGid->syncRestore = pOldGid->syncRestore;
pNewGid->syncCanRead = pOldGid->syncCanRead; pNewGid->syncCanRead = pOldGid->syncCanRead;
pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
} }
} }
} }
@ -1065,6 +1067,13 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
return code; return code;
} }
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].syncAppliedIndex, false);
if (code != 0) {
mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code));
return code;
}
bool exist = false; bool exist = false;
bool online = false; bool online = false;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
@ -1126,6 +1135,8 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
} }
} }
@ -1232,6 +1243,14 @@ int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
return vnodeMemory; return vnodeMemory;
} }
double calculatePercentage(double part, double total) {
if (total == 0) {
return 0.0;
}
double percentage = (part / total) * 100;
return round(percentage * 100) / 100;
}
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -1313,12 +1332,20 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
} }
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false); char restoreBuf[20] = {0};
if (pGid->syncRestore) {
sprintf(restoreBuf, "true");
} else {
double percentage = calculatePercentage(pGid->syncAppliedIndex, pGid->syncCommitIndex);
sprintf(restoreBuf, "%.2f", percentage);
}
STR_TO_VARSTR(buf, restoreBuf);
code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
if (code != 0) { if (code != 0) {
mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code)); mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
return code; return code;
} }
numOfRows++; numOfRows++;
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
} }
@ -2771,7 +2798,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
pVgroup->vnodeGid[0].dnodeId); pVgroup->vnodeGid[0].dnodeId);
// add second // add second
if (pNewVgroup->replica == 1){ if (pNewVgroup->replica == 1) {
TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
} }
@ -2792,8 +2819,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
// add third // add third
if (pNewVgroup->replica == 2){ if (pNewVgroup->replica == 2) {
TAOS_CHECK_RETURN (mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
} }
pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
@ -2823,7 +2850,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2)); TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true)); TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
TAOS_CHECK_RETURN( TAOS_CHECK_RETURN(
mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId)); mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
} else if (pNewDb->cfg.replications == 2) { } else if (pNewDb->cfg.replications == 2) {
mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,

View File

@ -499,6 +499,8 @@ _exit:
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
SSyncState state = syncGetState(pVnode->sync); SSyncState state = syncGetState(pVnode->sync);
pLoad->syncAppliedIndex = pVnode->state.applied;
syncGetCommitIndex(pVnode->sync, &pLoad->syncCommitIndex);
pLoad->vgId = TD_VID(pVnode); pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = state.state; pLoad->syncState = state.state;

View File

@ -679,6 +679,14 @@ SSyncState syncGetState(int64_t rid) {
return state; return state;
} }
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode != NULL) {
*syncCommitIndex = pSyncNode->commitIndex;
syncNodeRelease(pSyncNode);
}
}
int32_t syncGetArbToken(int64_t rid, char* outToken) { int32_t syncGetArbToken(int64_t rid, char* outToken) {
int32_t code = 0; int32_t code = 0;
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);