diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 856870caba..3e247e5d79 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -163,6 +163,7 @@ typedef struct SSyncNode { bool changing; int64_t startTime; + int64_t leaderTime; int64_t lastReplicateTime; } SSyncNode; diff --git a/source/libs/sync/inc/syncRespMgr.h b/source/libs/sync/inc/syncRespMgr.h index e37c0bb625..28978af77e 100644 --- a/source/libs/sync/inc/syncRespMgr.h +++ b/source/libs/sync/inc/syncRespMgr.h @@ -32,9 +32,9 @@ typedef struct SRespStub { } SRespStub; typedef struct SSyncRespMgr { - SHashObj * pRespHash; + SHashObj *pRespHash; int64_t ttl; - void * data; + void *data; TdThreadMutex mutex; uint64_t seqNum; } SSyncRespMgr; @@ -46,7 +46,8 @@ int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index); int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub); int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub); void syncRespClean(SSyncRespMgr *pObj); -void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl); +void syncRespCleanRsp(SSyncRespMgr *pObj); +void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5f3ff3015c..9f9fdf4844 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1100,6 +1100,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; + pSyncNode->leaderTime = timeNow; pSyncNode->lastReplicateTime = timeNow; syncNodeEventLog(pSyncNode, "sync open"); @@ -2015,6 +2016,8 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { } } +void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } + void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -2028,6 +2031,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // reset elect timer syncNodeResetElectTimer(pSyncNode); + // send rsp to client + syncNodeLeaderChangeRsp(pSyncNode); + // call back if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) { pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm); @@ -2068,6 +2074,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { + pSyncNode->leaderTime = taosGetTimestampMs(); + // reset restoreFinish pSyncNode->restoreFinish = false; @@ -2954,8 +2962,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, } ths->restoreFinish = true; + int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime; + char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "restore finish, index:%" PRId64, pEntry->index); + snprintf(eventLog, sizeof(eventLog), "restore finish, index:%ld, elapsed:%ld ms, ", pEntry->index, + restoreDelay); syncNodeEventLog(ths, eventLog); } } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index da04b19e17..d7ed864180 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -108,13 +108,19 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu return 0; // get none object } -void syncRespClean(SSyncRespMgr *pObj) { +void syncRespCleanRsp(SSyncRespMgr *pObj) { taosThreadMutexLock(&(pObj->mutex)); - syncRespCleanByTTL(pObj, pObj->ttl); + syncRespCleanByTTL(pObj, -1, true); taosThreadMutexUnlock(&(pObj->mutex)); } -void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { +void syncRespClean(SSyncRespMgr *pObj) { + taosThreadMutexLock(&(pObj->mutex)); + syncRespCleanByTTL(pObj, pObj->ttl, false); + taosThreadMutexUnlock(&(pObj->mutex)); +} + +void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL); int cnt = 0; int sum = 0; @@ -126,12 +132,12 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { while (pStub) { size_t len; - void * key = taosHashGetKey(pStub, &len); + void *key = taosHashGetKey(pStub, &len); uint64_t *pSeqNum = (uint64_t *)key; sum++; int64_t nowMS = taosGetTimestampMs(); - if (nowMS - pStub->createTime > ttl) { + if (nowMS - pStub->createTime > ttl || -1 == ttl) { taosArrayPush(delIndexArray, pSeqNum); cnt++; @@ -148,7 +154,14 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { pStub->rpcMsg.pCont = NULL; pStub->rpcMsg.contLen = 0; - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta); + + // TODO: and make rpcMsg body, call commit cb + // pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta); + + pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER; + if (pStub->rpcMsg.info.handle != NULL) { + tmsgSendRsp(&(pStub->rpcMsg)); + } } pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);