diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 9fbea03265..c3679636e6 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -29,6 +29,7 @@ extern "C" { #include "ttimer.h" #define TIMER_MAX_MS 0x7FFFFFFF +#define ENV_TICK_TIMER_MS 1000 #define PING_TIMER_MS 1000 #define ELECT_TIMER_MS_MIN 150 #define ELECT_TIMER_MS_MAX 300 @@ -38,17 +39,28 @@ extern "C" { #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) typedef struct SSyncEnv { - tmr_h pEnvTickTimer; + // tick timer + tmr_h pEnvTickTimer; + int32_t envTickTimerMS; + uint64_t envTickTimerLogicClock; + uint64_t envTickTimerLogicClockUser; + TAOS_TMR_CALLBACK FpEnvTickTimer; // Timer Fp + uint64_t envTickTimerCounter; + + // timer manager tmr_h pTimerManager; - char name[128]; + + // other resources shared by SyncNodes + // ... + } SSyncEnv; extern SSyncEnv* gSyncEnv; int32_t syncEnvStart(); int32_t syncEnvStop(); -tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); -void syncEnvStopTimer(tmr_h* pTimer); +int32_t syncEnvStartTimer(); +int32_t syncEnvStopTimer(); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index cb38b3f6f8..2830d2d4aa 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -19,19 +19,18 @@ SSyncEnv *gSyncEnv = NULL; // local function ----------------- -static void syncEnvTick(void *param, void *tmrId); -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); -static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param); -static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer); +static SSyncEnv *doSyncEnvStart(); +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv); +static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv); +static void syncEnvTick(void *param, void *tmrId); // -------------------------------- int32_t syncEnvStart() { - int32_t ret; + int32_t ret = 0; taosSeedRand(taosGetTimestampSec()); - gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); + gSyncEnv = doSyncEnvStart(gSyncEnv); assert(gSyncEnv != NULL); - ret = doSyncEnvStart(gSyncEnv); return ret; } @@ -40,31 +39,46 @@ int32_t syncEnvStop() { return ret; } -tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param) { - return doSyncEnvStartTimer(gSyncEnv, fp, mseconds, param); +int32_t syncEnvStartTimer() { + int32_t ret = doSyncEnvStartTimer(gSyncEnv); + return ret; } -void syncEnvStopTimer(tmr_h *pTimer) { doSyncEnvStopTimer(gSyncEnv, pTimer); } +int32_t syncEnvStopTimer() { + int32_t ret = doSyncEnvStopTimer(gSyncEnv); + return ret; +} // local function ----------------- static void syncEnvTick(void *param, void *tmrId) { SSyncEnv *pSyncEnv = (SSyncEnv *)param; - sTrace("syncEnvTick ... name:%s ", pSyncEnv->name); + if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { + ++(pSyncEnv->envTickTimerCounter); + sTrace( + "syncEnvTick ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " + "envTickTimerMS:%d", + pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, + pSyncEnv->envTickTimerMS); - pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); + // do something, tick ... + taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); + } } -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { - snprintf(pSyncEnv->name, sizeof(pSyncEnv->name), "SyncEnv_%p", pSyncEnv); +static SSyncEnv *doSyncEnvStart() { + SSyncEnv *pSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); + assert(pSyncEnv != NULL); + memset(pSyncEnv, 0, sizeof(pSyncEnv)); + + pSyncEnv->envTickTimerCounter = 0; + pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS; + pSyncEnv->FpEnvTickTimer = syncEnvTick; + atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0); + atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0); // start tmr thread pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); - - // pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); - - sTrace("SyncEnv start ok, name:%s", pSyncEnv->name); - - return 0; + return pSyncEnv; } static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { @@ -72,8 +86,17 @@ static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } -static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param) { - return taosTmrStart(fp, mseconds, pSyncEnv, pSyncEnv->pTimerManager); +static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { + int32_t ret = 0; + pSyncEnv->pEnvTickTimer = + taosTmrStart(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager); + atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); + return ret; } -static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {} +static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { + atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); + taosTmrStop(pSyncEnv->pEnvTickTimer); + pSyncEnv->pEnvTickTimer = NULL; + return 0; +} diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 101c0efe9a..a7a819e046 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -14,15 +14,6 @@ void logTest() { sFatal("--- sync log test: fatal"); } -void *pTimer = NULL; -void *pTimerMgr = NULL; -int g = 300; - -static void timerFp(void *param, void *tmrId) { - printf("param:%p, tmrId:%p, pTimer:%p, pTimerMgr:%p \n", param, tmrId, pTimer, pTimerMgr); - taosTmrReset(timerFp, 1000, param, pTimerMgr, &pTimer); -} - int main() { // taosInitLog((char*)"syncEnvTest.log", 100000, 10); tsAsyncLog = 0; @@ -34,13 +25,20 @@ int main() { ret = syncEnvStart(); assert(ret == 0); - // timer - pTimerMgr = taosTmrInit(1000, 50, 10000, "SYNC-ENV-TEST"); - taosTmrStart(timerFp, 1000, &g, pTimerMgr); + for (int i = 0; i < 5; ++i) { + ret = syncEnvStartTimer(); + assert(ret == 0); - while (1) { - taosMsleep(1000); + taosMsleep(5000); + + ret = syncEnvStopTimer(); + assert(ret == 0); + + taosMsleep(5000); } + ret = syncEnvStop(); + assert(ret == 0); + return 0; } diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp index 0331a29f22..61edbd3012 100644 --- a/source/libs/sync/test/syncRpcMsgTest.cpp +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -100,7 +100,7 @@ SyncAppendEntriesReply *createSyncAppendEntriesReply() { void test1() { SyncTimeout *pMsg = createSyncTimeout(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncTimeout2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test1", &rpcMsg); syncTimeoutDestroy(pMsg); @@ -108,7 +108,7 @@ void test1() { void test2() { SyncPing *pMsg = createSyncPing(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPing2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test2", &rpcMsg); syncPingDestroy(pMsg); @@ -116,7 +116,7 @@ void test2() { void test3() { SyncPingReply *pMsg = createSyncPingReply(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test3", &rpcMsg); syncPingReplyDestroy(pMsg); @@ -132,7 +132,7 @@ void test4() { void test5() { SyncRequestVoteReply *pMsg = createSyncRequestVoteReply(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test5", &rpcMsg); syncRequestVoteReplyDestroy(pMsg); @@ -140,7 +140,7 @@ void test5() { void test6() { SyncAppendEntries *pMsg = createSyncAppendEntries(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncAppendEntries2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test6", &rpcMsg); syncAppendEntriesDestroy(pMsg); @@ -148,7 +148,7 @@ void test6() { void test7() { SyncAppendEntriesReply *pMsg = createSyncAppendEntriesReply(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test7", &rpcMsg); syncAppendEntriesReplyDestroy(pMsg); @@ -156,7 +156,7 @@ void test7() { void test8() { SyncClientRequest *pMsg = createSyncClientRequest(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pMsg, &rpcMsg); syncRpcMsgPrint2((char *)"test8", &rpcMsg); syncClientRequestDestroy(pMsg);