sync refactor

This commit is contained in:
Minghao Li 2022-03-16 21:22:56 +08:00
parent 5ab55c6554
commit bc60b00d75
5 changed files with 22 additions and 16 deletions

View File

@ -39,7 +39,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret; return ret;
} }

View File

@ -54,8 +54,9 @@ static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = (SSyncEnv *)param; SSyncEnv *pSyncEnv = (SSyncEnv *)param;
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
++(pSyncEnv->envTickTimerCounter); ++(pSyncEnv->envTickTimerCounter);
sTrace( sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
"syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " ", envTickTimerCounter:%" PRIu64
", "
"envTickTimerMS:%d, tmrId:%p", "envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId); pSyncEnv->envTickTimerMS, tmrId);
@ -63,8 +64,9 @@ static void syncEnvTick(void *param, void *tmrId) {
// do something, tick ... // do something, tick ...
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
} else { } else {
sTrace( sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
"syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " ", envTickTimerCounter:%" PRIu64
", "
"envTickTimerMS:%d, tmrId:%p", "envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId); pSyncEnv->envTickTimerMS, tmrId);

View File

@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io);
static void *syncIOConsumerFunc(void *param); static void * syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param; SSyncIO * io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg * pRpcMsg, rpcMsg;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {

View File

@ -41,7 +41,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret; return ret;
} }

View File

@ -116,7 +116,9 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
while (1) { while (1) {
sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock,
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
taosMsleep(1000); taosMsleep(1000);
} }