diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 82eaa2359e..cedaf223c5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1355,10 +1355,10 @@ typedef struct { int8_t encryptAlgorithm; char dnodeListStr[TSDB_DNODE_LIST_LEN]; // 1. add auto-compact parameters - int32_t compactInterval; // minutes - int32_t compactStartTime; // minutes - int32_t compactEndTime; // minutes - int8_t compactTimeOffset; // hour + int32_t compactInterval; // minutes + int32_t compactStartTime; // minutes + int32_t compactEndTime; // minutes + int8_t compactTimeOffset; // hour } SCreateDbReq; int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq); @@ -1777,6 +1777,8 @@ typedef struct { int64_t numOfBatchInsertSuccessReqs; int32_t numOfCachedTables; int32_t learnerProgress; // use one reservered + int64_t syncAppliedIndex; + int64_t syncCommitIndex; } SVnodeLoad; typedef struct { @@ -3937,8 +3939,8 @@ typedef struct { int8_t igExists; int8_t intervalUnit; int8_t slidingUnit; - int8_t timezone; // int8_t is not enough, timezone is unit of second - int32_t dstVgId; // for stream + int8_t timezone; // int8_t is not enough, timezone is unit of second + int32_t dstVgId; // for stream int64_t interval; int64_t offset; int64_t sliding; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f1f907ce37..a02634298c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -293,6 +293,7 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg); int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm); SSyncState syncGetState(int64_t rid); +void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex); int32_t syncGetArbToken(int64_t rid, char* outToken); int32_t syncGetAssignedLogSynced(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/include/util/tdef.h b/include/util/tdef.h index f08697b0d4..532592e7ff 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -328,8 +328,8 @@ typedef enum ELogicConditionType { #define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_TOKEN_SIZE 32 -#define TSDB_TRANS_STAGE_LEN 12 -#define TSDB_TRANS_TYPE_LEN 16 +#define TSDB_TRANS_STAGE_LEN 12 +#define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_ERROR_LEN 512 #define TSDB_TRANS_OBJTYPE_LEN 40 #define TSDB_TRANS_RESULT_LEN 100 @@ -363,6 +363,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_REPLICA 5 #define TSDB_MAX_LEARNER_REPLICA 10 +#define TSDB_SYNC_RESOTRE_lEN 8 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_RETENTION 256 #define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5) @@ -678,7 +679,7 @@ typedef enum { TSDB_VERSION_END, } EVersionType; -#define MIN_RESERVE_MEM_SIZE 1024 // MB +#define MIN_RESERVE_MEM_SIZE 1024 // MB #ifdef __cplusplus } diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 7a51669d46..ba8d136a66 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -14,8 +14,8 @@ */ #define _DEFAULT_SOURCE -#include "tmsg.h" #include "tglobal.h" +#include "tmsg.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ @@ -1432,6 +1432,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tEncodeI32(&encoder, pload->learnerProgress)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->roleTimeMs)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->startTimeMs)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncAppliedIndex)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncCommitIndex)); } // mnode loads @@ -1542,6 +1544,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vload.learnerProgress)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.roleTimeMs)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.startTimeMs)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.syncAppliedIndex)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &vload.syncCommitIndex)); if (taosArrayPush(pReq->pVloads, &vload) == NULL) { TAOS_CHECK_EXIT(terrno); } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 1f018606a8..8223908663 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -290,12 +290,16 @@ static const SSysDbTableSchema vgroupsSchema[] = { {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "v1_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "v1_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "v2_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "v2_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v3_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "v3_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "v3_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "v4_applied", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "cacheelements", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, @@ -366,7 +370,7 @@ static const SSysDbTableSchema vnodesSchema[] = { {.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, - {.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true}, + {.name = "restored", .bytes = TSDB_SYNC_RESOTRE_lEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, }; static const SSysDbTableSchema userUserPrivilegesSchema[] = { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9bed10ce99..78809a2d58 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -136,7 +136,7 @@ typedef enum { typedef enum { TRN_KILL_MODE_SKIP = 0, TRN_KILL_MODE_INTERUPT = 1, - //TRN_KILL_MODE_ROLLBACK = 2, + // TRN_KILL_MODE_ROLLBACK = 2, } ETrnKillMode; typedef enum { @@ -476,6 +476,8 @@ typedef struct { int32_t dnodeId; ESyncState syncState; int64_t syncTerm; + int64_t syncAppliedIndex; + int64_t syncCommitIndex; bool syncRestore; bool syncCanRead; int64_t roleTimeMs; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ca119191eb..57b82fbd17 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -540,6 +540,8 @@ static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVloa bool roleChanged = pGid->syncState != pVload->syncState || (pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) || pGid->roleTimeMs != pVload->roleTimeMs; + pGid->syncAppliedIndex = pVload->syncAppliedIndex; + pGid->syncCommitIndex = pVload->syncCommitIndex; if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead || pGid->startTimeMs != pVload->startTimeMs) { mInfo( diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e20afb7201..209961a3b8 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -244,6 +244,8 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pNewGid->syncState = pOldGid->syncState; pNewGid->syncRestore = pOldGid->syncRestore; pNewGid->syncCanRead = pOldGid->syncCanRead; + pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex; + pNewGid->syncCommitIndex = pOldGid->syncCommitIndex; } } } @@ -1065,6 +1067,13 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p return code; } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].syncAppliedIndex, false); + if (code != 0) { + mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code)); + return code; + } + bool exist = false; bool online = false; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId); @@ -1126,6 +1135,8 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p colDataSetNULL(pColInfo, numOfRows); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetNULL(pColInfo, numOfRows); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, numOfRows); } } @@ -1232,6 +1243,14 @@ int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) { return vnodeMemory; } +double calculatePercentage(double part, double total) { + if (total == 0) { + return 0.0; + } + double percentage = (part / total) * 100; + return round(percentage * 100) / 100; +} + static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1313,12 +1332,20 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false); + char restoreBuf[20] = {0}; + if (pGid->syncRestore) { + sprintf(restoreBuf, "true"); + } else { + double percentage = calculatePercentage(pGid->syncAppliedIndex, pGid->syncCommitIndex); + sprintf(restoreBuf, "%.2f", percentage); + } + + STR_TO_VARSTR(buf, restoreBuf); + code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); if (code != 0) { mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code)); return code; } - numOfRows++; sdbRelease(pSdb, pDnode); } @@ -2771,7 +2798,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb pVgroup->vnodeGid[0].dnodeId); // add second - if (pNewVgroup->replica == 1){ + if (pNewVgroup->replica == 1) { TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); } @@ -2792,8 +2819,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); // add third - if (pNewVgroup->replica == 2){ - TAOS_CHECK_RETURN (mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); + if (pNewVgroup->replica == 2) { + TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); } pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; @@ -2823,7 +2850,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2)); TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true)); TAOS_CHECK_RETURN( - mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId)); + mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId)); TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); } else if (pNewDb->cfg.replications == 2) { mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 2b07de916c..dda2e467c1 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -499,6 +499,8 @@ _exit: int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { SSyncState state = syncGetState(pVnode->sync); + pLoad->syncAppliedIndex = pVnode->state.applied; + syncGetCommitIndex(pVnode->sync, &pLoad->syncCommitIndex); pLoad->vgId = TD_VID(pVnode); pLoad->syncState = state.state; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0933fd48c7..78ae197079 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -679,6 +679,14 @@ SSyncState syncGetState(int64_t rid) { return state; } +void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode != NULL) { + *syncCommitIndex = pSyncNode->commitIndex; + syncNodeRelease(pSyncNode); + } +} + int32_t syncGetArbToken(int64_t rid, char* outToken) { int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid);