Feat(sync):Use remaining time to replace progress.
This commit is contained in:
parent
fbe65197d6
commit
b62d8d37b1
|
@ -363,7 +363,7 @@ typedef enum ELogicConditionType {
|
|||
|
||||
#define TSDB_MAX_REPLICA 5
|
||||
#define TSDB_MAX_LEARNER_REPLICA 10
|
||||
#define TSDB_SYNC_RESOTRE_lEN 8
|
||||
#define TSDB_SYNC_RESOTRE_lEN 20
|
||||
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
|
||||
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
|
||||
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
|
||||
|
|
|
@ -370,7 +370,8 @@ static const SSysDbTableSchema vnodesSchema[] = {
|
|||
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "restored", .bytes = TSDB_SYNC_RESOTRE_lEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
|
||||
{.name = "restored_finish", .bytes = TSDB_SYNC_RESOTRE_lEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema userUserPrivilegesSchema[] = {
|
||||
|
|
|
@ -477,6 +477,8 @@ typedef struct {
|
|||
ESyncState syncState;
|
||||
int64_t syncTerm;
|
||||
int64_t syncAppliedIndex;
|
||||
int64_t lastSyncAppliedIndexUpdateTime;
|
||||
double appliedRate;
|
||||
int64_t syncCommitIndex;
|
||||
bool syncRestore;
|
||||
bool syncCanRead;
|
||||
|
|
|
@ -535,11 +535,35 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
|
|||
return DND_REASON_ONLINE;
|
||||
}
|
||||
|
||||
double calcAppliedRate(int64_t currentCount, int64_t lastCount, int64_t currentTimeMs, int64_t lastTimeMs) {
|
||||
if ((currentTimeMs <= lastTimeMs) || (currentCount <= lastCount)) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
int64_t deltaCount = currentCount - lastCount;
|
||||
int64_t deltaMs = currentTimeMs - lastTimeMs;
|
||||
double rate = (double)deltaCount / (double)deltaMs;
|
||||
return rate;
|
||||
}
|
||||
|
||||
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
|
||||
bool stateChanged = false;
|
||||
bool roleChanged = pGid->syncState != pVload->syncState ||
|
||||
(pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
|
||||
pGid->roleTimeMs != pVload->roleTimeMs;
|
||||
|
||||
if (!pVload->syncRestore) {
|
||||
if (pGid->lastSyncAppliedIndexUpdateTime == 0) {
|
||||
pGid->lastSyncAppliedIndexUpdateTime = taosGetTimestampMs();
|
||||
} else if (pGid->syncAppliedIndex != pVload->syncAppliedIndex) {
|
||||
int64_t currentTimeMs = taosGetTimestampMs();
|
||||
pGid->appliedRate = calcAppliedRate(pVload->syncAppliedIndex, pGid->syncAppliedIndex, currentTimeMs,
|
||||
pGid->lastSyncAppliedIndexUpdateTime);
|
||||
|
||||
pGid->lastSyncAppliedIndexUpdateTime = currentTimeMs;
|
||||
}
|
||||
}
|
||||
|
||||
pGid->syncAppliedIndex = pVload->syncAppliedIndex;
|
||||
pGid->syncCommitIndex = pVload->syncCommitIndex;
|
||||
if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
|
||||
|
|
|
@ -1243,12 +1243,19 @@ int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
|
|||
return vnodeMemory;
|
||||
}
|
||||
|
||||
double calculatePercentage(double part, double total) {
|
||||
if (total == 0) {
|
||||
return 0.0;
|
||||
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
|
||||
if (rate == 0) {
|
||||
snprintf(restoreStr, restoreStrSize, "0:0:0");
|
||||
return;
|
||||
}
|
||||
double percentage = (part / total) * 100;
|
||||
return round(percentage * 100) / 100;
|
||||
|
||||
int64_t costTime = applyCount / rate;
|
||||
int64_t totalSeconds = costTime / 1000;
|
||||
int64_t hours = totalSeconds / 3600;
|
||||
totalSeconds %= 3600;
|
||||
int64_t minutes = totalSeconds / 60;
|
||||
int64_t seconds = totalSeconds % 60;
|
||||
snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
|
@ -1332,20 +1339,25 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
char restoreBuf[20] = {0};
|
||||
if (pGid->syncRestore) {
|
||||
sprintf(restoreBuf, "true");
|
||||
} else {
|
||||
double percentage = calculatePercentage(pGid->syncAppliedIndex, pGid->syncCommitIndex);
|
||||
sprintf(restoreBuf, "%.2f", percentage);
|
||||
}
|
||||
|
||||
STR_TO_VARSTR(buf, restoreBuf);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
|
||||
if (code != 0) {
|
||||
mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
char restoreStr[20] = {0};
|
||||
if (!pGid->syncRestore) {
|
||||
calculateRstoreFinishTime(pGid->appliedRate, pGid->syncCommitIndex - pGid->syncAppliedIndex, restoreStr,
|
||||
sizeof(restoreStr));
|
||||
}
|
||||
STR_TO_VARSTR(buf, restoreStr);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&buf, false);
|
||||
if (code != 0) {
|
||||
mError("vgId:%d, failed to set syncRestore finish time, since %s", pVgroup->vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue