Merge pull request #15956 from taosdata/feature/3.0_mhli
refactor(sync): add leader, follower call back2
This commit is contained in:
commit
2fc72860f7
|
@ -163,6 +163,7 @@ typedef struct SSyncNode {
|
|||
bool changing;
|
||||
|
||||
int64_t startTime;
|
||||
int64_t leaderTime;
|
||||
int64_t lastReplicateTime;
|
||||
|
||||
} SSyncNode;
|
||||
|
|
|
@ -32,9 +32,9 @@ typedef struct SRespStub {
|
|||
} SRespStub;
|
||||
|
||||
typedef struct SSyncRespMgr {
|
||||
SHashObj * pRespHash;
|
||||
SHashObj *pRespHash;
|
||||
int64_t ttl;
|
||||
void * data;
|
||||
void *data;
|
||||
TdThreadMutex mutex;
|
||||
uint64_t seqNum;
|
||||
} SSyncRespMgr;
|
||||
|
@ -46,7 +46,8 @@ int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index);
|
|||
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
|
||||
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
|
||||
void syncRespClean(SSyncRespMgr *pObj);
|
||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl);
|
||||
void syncRespCleanRsp(SSyncRespMgr *pObj);
|
||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1100,6 +1100,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
|||
|
||||
int64_t timeNow = taosGetTimestampMs();
|
||||
pSyncNode->startTime = timeNow;
|
||||
pSyncNode->leaderTime = timeNow;
|
||||
pSyncNode->lastReplicateTime = timeNow;
|
||||
|
||||
syncNodeEventLog(pSyncNode, "sync open");
|
||||
|
@ -2015,6 +2016,8 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
|||
}
|
||||
}
|
||||
|
||||
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
|
||||
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
// maybe clear leader cache
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
|
@ -2028,6 +2031,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// reset elect timer
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
||||
// send rsp to client
|
||||
syncNodeLeaderChangeRsp(pSyncNode);
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
|
||||
pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
|
||||
|
@ -2068,6 +2074,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||
//
|
||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
pSyncNode->leaderTime = taosGetTimestampMs();
|
||||
|
||||
// reset restoreFinish
|
||||
pSyncNode->restoreFinish = false;
|
||||
|
||||
|
@ -2954,8 +2962,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
}
|
||||
ths->restoreFinish = true;
|
||||
|
||||
int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
|
||||
|
||||
char eventLog[128];
|
||||
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%" PRId64, pEntry->index);
|
||||
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%ld, elapsed:%ld ms, ", pEntry->index,
|
||||
restoreDelay);
|
||||
syncNodeEventLog(ths, eventLog);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -108,13 +108,19 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
|
|||
return 0; // get none object
|
||||
}
|
||||
|
||||
void syncRespClean(SSyncRespMgr *pObj) {
|
||||
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||
taosThreadMutexLock(&(pObj->mutex));
|
||||
syncRespCleanByTTL(pObj, pObj->ttl);
|
||||
syncRespCleanByTTL(pObj, -1, true);
|
||||
taosThreadMutexUnlock(&(pObj->mutex));
|
||||
}
|
||||
|
||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
|
||||
void syncRespClean(SSyncRespMgr *pObj) {
|
||||
taosThreadMutexLock(&(pObj->mutex));
|
||||
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
||||
taosThreadMutexUnlock(&(pObj->mutex));
|
||||
}
|
||||
|
||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
||||
int cnt = 0;
|
||||
int sum = 0;
|
||||
|
@ -126,12 +132,12 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
|
|||
|
||||
while (pStub) {
|
||||
size_t len;
|
||||
void * key = taosHashGetKey(pStub, &len);
|
||||
void *key = taosHashGetKey(pStub, &len);
|
||||
uint64_t *pSeqNum = (uint64_t *)key;
|
||||
sum++;
|
||||
|
||||
int64_t nowMS = taosGetTimestampMs();
|
||||
if (nowMS - pStub->createTime > ttl) {
|
||||
if (nowMS - pStub->createTime > ttl || -1 == ttl) {
|
||||
taosArrayPush(delIndexArray, pSeqNum);
|
||||
cnt++;
|
||||
|
||||
|
@ -148,7 +154,14 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
|
|||
|
||||
pStub->rpcMsg.pCont = NULL;
|
||||
pStub->rpcMsg.contLen = 0;
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
|
||||
|
||||
// TODO: and make rpcMsg body, call commit cb
|
||||
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
|
||||
|
||||
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
|
||||
if (pStub->rpcMsg.info.handle != NULL) {
|
||||
tmsgSendRsp(&(pStub->rpcMsg));
|
||||
}
|
||||
}
|
||||
|
||||
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
|
||||
|
|
Loading…
Reference in New Issue