Merge branch 'feature/sync2-merge' of https://github.com/taosdata/TDengine into feature/sync2-merge
This commit is contained in:
commit
92584d133f
|
@ -248,6 +248,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
||||||
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
|
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
|
||||||
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
|
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
|
||||||
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
|
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
|
||||||
|
char* syncNodePeerState2Str(const SSyncNode* pSyncNode);
|
||||||
|
|
||||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||||
void syncNodeRelease(SSyncNode* pNode);
|
void syncNodeRelease(SSyncNode* pNode);
|
||||||
|
|
|
@ -170,6 +170,8 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
||||||
|
return pSyncNode->quorum;
|
||||||
|
|
||||||
int32_t quorum = 1; // self
|
int32_t quorum = 1; // self
|
||||||
|
|
||||||
int64_t timeNow = taosGetTimestampMs();
|
int64_t timeNow = taosGetTimestampMs();
|
||||||
|
|
|
@ -322,6 +322,38 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
|
||||||
return minMatchIndex;
|
return minMatchIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
|
||||||
|
int32_t len = 128;
|
||||||
|
int32_t useLen = 0;
|
||||||
|
int32_t leftLen = len - useLen;
|
||||||
|
char* pStr = taosMemoryMalloc(len);
|
||||||
|
memset(pStr, 0, len);
|
||||||
|
|
||||||
|
char* p = pStr;
|
||||||
|
int32_t use = snprintf(p, leftLen, "{");
|
||||||
|
useLen += use;
|
||||||
|
leftLen -= use;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||||
|
SPeerState* pState = syncNodeGetPeerState((SSyncNode*)pSyncNode, &(pSyncNode->replicasId[i]));
|
||||||
|
ASSERT(pState != NULL);
|
||||||
|
|
||||||
|
p = pStr + useLen;
|
||||||
|
use = snprintf(p, leftLen, "%d:%ld,%ld, ", i, pState->lastSendIndex, pState->lastSendTime);
|
||||||
|
useLen += use;
|
||||||
|
leftLen -= use;
|
||||||
|
}
|
||||||
|
|
||||||
|
p = pStr + useLen;
|
||||||
|
use = snprintf(p, leftLen, "}");
|
||||||
|
useLen += use;
|
||||||
|
leftLen -= use;
|
||||||
|
|
||||||
|
// sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr);
|
||||||
|
|
||||||
|
return pStr;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -1825,8 +1857,6 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
int32_t userStrLen = strlen(str);
|
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
@ -1845,6 +1875,9 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
printStr = pCfgStr;
|
printStr = pCfgStr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* peerStateStr = syncNodePeerState2Str(pSyncNode);
|
||||||
|
int32_t userStrLen = strlen(str) + strlen(peerStateStr);
|
||||||
|
|
||||||
if (userStrLen < 256) {
|
if (userStrLen < 256) {
|
||||||
char logBuf[256 + 256];
|
char logBuf[256 + 256];
|
||||||
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
|
||||||
|
@ -1854,13 +1887,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
", sby:%d, "
|
", sby:%d, "
|
||||||
"stgy:%d, bch:%d, "
|
"stgy:%d, bch:%d, "
|
||||||
"r-num:%d, "
|
"r-num:%d, "
|
||||||
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
|
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
||||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||||
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
|
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||||
} else {
|
} else {
|
||||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||||
}
|
}
|
||||||
|
@ -1878,19 +1911,20 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
", sby:%d, "
|
", sby:%d, "
|
||||||
"stgy:%d, bch:%d, "
|
"stgy:%d, bch:%d, "
|
||||||
"r-num:%d, "
|
"r-num:%d, "
|
||||||
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
|
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
||||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
|
||||||
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
|
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||||
} else {
|
} else {
|
||||||
snprintf(s, len, "%s", str);
|
snprintf(s, len, "%s", str);
|
||||||
}
|
}
|
||||||
// sDebug("%s", s);
|
// sDebug("%s", s);
|
||||||
// sInfo("%s", s);
|
// sInfo("%s", s);
|
||||||
sTrace("%s", s);
|
sTrace("%s", s);
|
||||||
|
taosMemoryFree(peerStateStr);
|
||||||
taosMemoryFree(s);
|
taosMemoryFree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3441,7 +3475,9 @@ SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
|
||||||
|
|
||||||
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
|
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
|
||||||
SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
|
SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
|
||||||
ASSERT(pState != NULL);
|
if (pState == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
SyncIndex sendIndex = pMsg->prevLogIndex + 1;
|
SyncIndex sendIndex = pMsg->prevLogIndex + 1;
|
||||||
int64_t tsNow = taosGetTimestampMs();
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
|
|
|
@ -165,8 +165,10 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
||||||
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
|
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
|
||||||
ASSERT(pState != NULL);
|
ASSERT(pState != NULL);
|
||||||
|
|
||||||
pState->lastSendIndex = pMsg->prevLogIndex + 1;
|
if (pMsg->dataLen > 0) {
|
||||||
pState->lastSendTime = taosGetTimestampMs();
|
pState->lastSendIndex = pMsg->prevLogIndex + 1;
|
||||||
|
pState->lastSendTime = taosGetTimestampMs();
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue