From 7129645023bb798bb617f00a0215c8598997239c Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 21 Mar 2022 16:28:50 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncInt.h | 1 - source/libs/sync/src/syncMain.c | 52 +++++++++++++++++++++++---- source/libs/sync/test/syncRefTest.cpp | 4 +-- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8e36424f19..ee009d6428 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -142,7 +142,6 @@ typedef struct SSyncNode { SRaftId leaderCache; // life cycle - int32_t refCount; int64_t rid; // tla+ server vars diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 829526512b..4b3283fc53 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -31,6 +31,7 @@ #include "syncTimeout.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "tref.h" static int32_t tsNodeRefId = -1; @@ -44,31 +45,57 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); + +// life cycle +static void syncFreeNode(void* param); // --------------------------------- int32_t syncInit() { - int32_t ret = syncEnvStart(); + int32_t ret; + tsNodeRefId = taosOpenRef(200, syncFreeNode); + if (tsNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + ret = -1; + } else { + ret = syncEnvStart(); + } + return ret; } void syncCleanUp() { int32_t ret = syncEnvStop(); assert(ret == 0); + + if (tsNodeRefId != -1) { + taosCloseRef(tsNodeRefId); + tsNodeRefId = -1; + } } int64_t syncStart(const SSyncInfo* pSyncInfo) { - int32_t ret = 0; SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); assert(pSyncNode != NULL); - // todo : return ref id - return ret; + pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); + if (pSyncNode->rid < 0) { + syncFreeNode(pSyncNode); + return -1; + } + + return pSyncNode->rid; } void syncStop(int64_t rid) { - // todo : get pointer from rid - SSyncNode* pSyncNode = NULL; + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return; + } syncNodeClose(pSyncNode); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + taosRemoveRef(tsNodeRefId, rid); } int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { @@ -155,7 +182,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum); pSyncNode->leaderCache = EMPTY_RAFT_ID; - // init life cycle + // init life cycle outside // TLA+ Spec // InitHistoryVars == /\ elections = {} @@ -444,6 +471,10 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); + // life cycle + snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid); + cJSON_AddStringToObject(pRoot, "rid", u64buf); + // tla+ server vars cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); @@ -813,3 +844,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntryDestory(pEntry); return ret; } + +static void syncFreeNode(void* param) { + SSyncNode* pNode = param; + syncNodePrint2((char*)"==syncFreeNode==", pNode); + + free(pNode); +} \ No newline at end of file diff --git a/source/libs/sync/test/syncRefTest.cpp b/source/libs/sync/test/syncRefTest.cpp index 8e6061a8ae..e5606af14f 100644 --- a/source/libs/sync/test/syncRefTest.cpp +++ b/source/libs/sync/test/syncRefTest.cpp @@ -82,7 +82,7 @@ void stop(int64_t rid) { } void *func(void *param) { - int64_t rid = (int64_t)param; + int64_t rid = (int64_t)param; int32_t ms = taosRand() % 10000; taosMsleep(ms); @@ -115,7 +115,7 @@ int main() { for (int i = 0; i < 20; ++i) { pthread_t tid; - pthread_create(&tid, NULL, func, (void*)rid); + pthread_create(&tid, NULL, func, (void *)rid); } int32_t ms = taosRand() % 10000;