Merge pull request #10872 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
This commit is contained in:
Li Minghao 2022-03-21 17:22:18 +08:00 committed by GitHub
commit 990a6049bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 224 additions and 20 deletions

View File

@ -157,7 +157,8 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo); int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);

View File

@ -142,7 +142,6 @@ typedef struct SSyncNode {
SRaftId leaderCache; SRaftId leaderCache;
// life cycle // life cycle
int32_t refCount;
int64_t rid; int64_t rid;
// tla+ server vars // tla+ server vars

View File

@ -31,6 +31,7 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tref.h"
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
@ -44,31 +45,57 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
// life cycle
static void syncFreeNode(void* param);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
int32_t ret = syncEnvStart(); int32_t ret;
tsNodeRefId = taosOpenRef(200, syncFreeNode);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
ret = -1;
} else {
ret = syncEnvStart();
}
return ret; return ret;
} }
void syncCleanUp() { void syncCleanUp() {
int32_t ret = syncEnvStop(); int32_t ret = syncEnvStop();
assert(ret == 0); assert(ret == 0);
if (tsNodeRefId != -1) {
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
} }
int64_t syncStart(const SSyncInfo* pSyncInfo) { int64_t syncStart(const SSyncInfo* pSyncInfo) {
int32_t ret = 0;
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
// todo : return ref id pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
return ret; if (pSyncNode->rid < 0) {
syncFreeNode(pSyncNode);
return -1;
}
return pSyncNode->rid;
} }
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
// todo : get pointer from rid SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
SSyncNode* pSyncNode = NULL; if (pSyncNode == NULL) {
return;
}
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid);
} }
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
@ -76,11 +103,16 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
return ret; return ret;
} }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
// todo : get pointer from rid // todo : get pointer from rid
SSyncNode* pSyncNode = NULL; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
@ -93,6 +125,13 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
ret = -1; // todo : need define err code !! ret = -1; // todo : need define err code !!
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret; return ret;
} }
@ -155,7 +194,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID; pSyncNode->leaderCache = EMPTY_RAFT_ID;
// init life cycle // init life cycle outside
// TLA+ Spec // TLA+ Spec
// InitHistoryVars == /\ elections = {} // InitHistoryVars == /\ elections = {}
@ -444,6 +483,10 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);
// life cycle
snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
cJSON_AddStringToObject(pRoot, "rid", u64buf);
// tla+ server vars // tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
@ -813,3 +856,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
return ret; return ret;
} }
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
syncNodePrint2((char*)"==syncFreeNode==", pNode);
free(pNode);
}

View File

@ -31,6 +31,7 @@ add_executable(syncElectTest "")
add_executable(syncEncodeTest "") add_executable(syncEncodeTest "")
add_executable(syncWriteTest "") add_executable(syncWriteTest "")
add_executable(syncReplicateTest "") add_executable(syncReplicateTest "")
add_executable(syncRefTest "")
target_sources(syncTest target_sources(syncTest
@ -165,6 +166,10 @@ target_sources(syncReplicateTest
PRIVATE PRIVATE
"syncReplicateTest.cpp" "syncReplicateTest.cpp"
) )
target_sources(syncRefTest
PRIVATE
"syncRefTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
@ -337,6 +342,11 @@ target_include_directories(syncReplicateTest
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncRefTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
@ -471,6 +481,10 @@ target_link_libraries(syncReplicateTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncRefTest
sync
gtest_main
)
enable_testing() enable_testing()

View File

@ -116,9 +116,10 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
while (1) { while (1) {
sTrace("elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace(
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, "elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
} }
return 0; return 0;

View File

@ -0,0 +1,135 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "tref.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
static void syncFreeObj(void *param);
int32_t init();
void cleanup();
int64_t start();
void stop(int64_t rid);
static int32_t tsNodeRefId = -1;
int g = 100;
typedef struct SyncObj {
int64_t rid;
void * data;
char name[32];
int counter;
} SyncObj;
static void syncFreeObj(void *param) {
SyncObj *pObj = (SyncObj *)param;
printf("syncFreeObj name:%s rid:%ld \n", pObj->name, pObj->rid);
free(pObj);
}
int32_t init() {
tsNodeRefId = taosOpenRef(200, syncFreeObj);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
cleanup();
return -1;
}
return 0;
}
void cleanup() {
if (tsNodeRefId != -1) {
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
}
int64_t start() {
SyncObj *pObj = (SyncObj *)malloc(sizeof(SyncObj));
assert(pObj != NULL);
pObj->data = &g;
snprintf(pObj->name, sizeof(pObj->name), "%s", "hello");
pObj->rid = taosAddRef(tsNodeRefId, pObj);
if (pObj->rid < 0) {
syncFreeObj(pObj);
return -1;
}
printf("start name:%s rid:%ld \n", pObj->name, pObj->rid);
return pObj->rid;
}
void stop(int64_t rid) {
SyncObj *pObj = (SyncObj *)taosAcquireRef(tsNodeRefId, rid);
if (pObj == NULL) return;
printf("stop name:%s rid:%ld \n", pObj->name, pObj->rid);
pObj->data = NULL;
taosReleaseRef(tsNodeRefId, pObj->rid);
taosRemoveRef(tsNodeRefId, rid);
}
void *func(void *param) {
int64_t rid = (int64_t)param;
int32_t ms = taosRand() % 10000;
taosMsleep(ms);
SyncObj *pObj = (SyncObj *)taosAcquireRef(tsNodeRefId, rid);
if (pObj != NULL) {
printf("taosAcquireRef sleep:%d, name:%s, rid:%ld \n", ms, pObj->name, pObj->rid);
} else {
printf("taosAcquireRef sleep:%d, NULL! \n", ms);
}
taosReleaseRef(tsNodeRefId, rid);
return NULL;
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
taosSeedRand(taosGetTimestampSec());
int32_t ret;
ret = init();
assert(ret == 0);
int64_t rid = start();
assert(rid > 0);
for (int i = 0; i < 20; ++i) {
TdThread tid;
taosThreadCreate(&tid, NULL, func, (void *)rid);
}
int32_t ms = taosRand() % 10000;
taosMsleep(ms);
printf("main sleep %d, stop and clean ", ms);
stop(rid);
cleanup();
while (1) {
taosMsleep(1000);
printf("sleep 1 ... \n");
}
return 0;
}

View File

@ -172,15 +172,19 @@ int main(int argc, char **argv) {
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace(
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); "electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
} }
while (1) { while (1) {
sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace(
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); "electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
taosMsleep(1000); taosMsleep(1000);
} }