From 5b7261d63fab335351123b0c6b025f28aa48c9fb Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 29 Oct 2021 17:09:25 +0800 Subject: [PATCH] [TD-10645][raft]add sync node timer --- source/libs/sync/inc/raft.h | 1 + source/libs/sync/inc/raft_message.h | 2 ++ source/libs/sync/inc/syncInt.h | 5 +++++ source/libs/sync/src/raft.c | 8 ++++--- source/libs/sync/src/raft_message.c | 5 +++++ source/libs/sync/src/sync.c | 33 ++++++++++++++++++++++++++++- 6 files changed, 50 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 0df46db3fc..f81040658e 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -29,5 +29,6 @@ typedef struct SSyncRaft { int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg); +int32_t syncRaftTick(SSyncRaft* pRaft); #endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index cb0552500a..faf14840c9 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -73,4 +73,6 @@ static FORCE_INLINE bool syncIsInternalMsg(const RaftMessage* pMsg) { return pMsg->msgType == RAFT_MSG_INTERNAL_PROP; } +void syncFreeMessage(const RaftMessage* pMsg); + #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index c1c3ed17a8..81cb686781 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -30,8 +30,10 @@ typedef struct SSyncWorker { struct SSyncNode { pthread_mutex_t mutex; + int32_t refCount; SyncGroupId vgId; SSyncRaft raft; + void* syncTimer; }; typedef struct SSyncManager { @@ -46,6 +48,9 @@ typedef struct SSyncManager { // vgroup hash table SHashObj* vgroupTable; + // timer manager + void* syncTimerManager; + } SSyncManager; extern SSyncManager* gSyncManager; diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 109b08902a..23442803c4 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -67,8 +67,10 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) { - if (!syncIsInternalMsg(pMsg)) { - free(pMsg); - } + syncFreeMessage(pMsg); + return 0; +} + +int32_t syncRaftTick(SSyncRaft* pRaft) { return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_message.c b/source/libs/sync/src/raft_message.c index d35efce9db..912314daf2 100644 --- a/source/libs/sync/src/raft_message.c +++ b/source/libs/sync/src/raft_message.c @@ -15,3 +15,8 @@ #include "raft_message.h" +void syncFreeMessage(const RaftMessage* pMsg) { + if (!syncIsInternalMsg(pMsg)) { + free((RaftMessage*)pMsg); + } +} \ No newline at end of file diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index e627cf8bc1..a9df02f818 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -14,12 +14,16 @@ */ #include "syncInt.h" +#include "ttimer.h" SSyncManager* gSyncManager = NULL; +#define SYNC_TICK_TIMER 50 + static int syncOpenWorkerPool(SSyncManager* syncManager); static int syncCloseWorkerPool(SSyncManager* syncManager); static void *syncWorkerMain(void *argv); +static void syncNodeTick(void *param, void *tmrId); int32_t syncInit() { if (gSyncManager != NULL) { @@ -33,6 +37,14 @@ int32_t syncInit() { } pthread_mutex_init(&gSyncManager->mutex, NULL); + + // init sync timer manager + gSyncManager->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + if (gSyncManager->syncTimerManager == NULL) { + syncCleanUp(); + return -1; + } + // init worker pool if (syncOpenWorkerPool(gSyncManager) != 0) { syncCleanUp(); @@ -56,6 +68,7 @@ void syncCleanUp() { if (gSyncManager->vgroupTable) { taosHashCleanup(gSyncManager->vgroupTable); } + taosTmrCleanUp(gSyncManager->syncTimerManager); syncCloseWorkerPool(gSyncManager); pthread_mutex_unlock(&gSyncManager->mutex); pthread_mutex_destroy(&gSyncManager->mutex); @@ -80,6 +93,8 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; } + pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)pInfo->vgId, gSyncManager->syncTimerManager); + // start raft pNode->raft.pNode = pNode; if (syncRaftStart(&pNode->raft, pInfo) != 0) { @@ -106,7 +121,8 @@ void syncStop(const SSyncNode* pNode) { return; } assert(*ppNode == pNode); - + taosTmrStop(pNode->syncTimer); + taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); pthread_mutex_unlock(&gSyncManager->mutex); @@ -158,4 +174,19 @@ static void *syncWorkerMain(void *argv) { setThreadName("syncWorker"); return NULL; +} + +static void syncNodeTick(void *param, void *tmrId) { + SyncGroupId vgId = (SyncGroupId)param; + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId)); + if (ppNode == NULL) { + return; + } + SSyncNode *pNode = *ppNode; + + pthread_mutex_lock(&pNode->mutex); + syncRaftTick(&pNode->raft); + pthread_mutex_unlock(&pNode->mutex); + + pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)pNode->vgId, gSyncManager->syncTimerManager); } \ No newline at end of file