Merge pull request #10796 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
This commit is contained in:
commit
d2366f6320
|
@ -163,15 +163,15 @@ typedef struct SyncClientRequest {
|
|||
} SyncClientRequest;
|
||||
|
||||
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); // step 1
|
||||
void syncClientRequestDestroy(SyncClientRequest* pMsg);
|
||||
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
|
||||
char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len);
|
||||
SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len);
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
|
||||
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
|
||||
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); // step 3
|
||||
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
|
||||
char* syncClientRequest2Str(const SyncClientRequest* pMsg);
|
||||
|
||||
|
|
|
@ -40,12 +40,13 @@ typedef struct SSyncRaftEntry {
|
|||
} SSyncRaftEntry;
|
||||
|
||||
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
|
||||
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
|
||||
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
|
||||
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
|
||||
|
||||
// for debug ----------------------
|
||||
void syncEntryPrint(const SSyncRaftEntry* pObj);
|
||||
|
|
|
@ -101,7 +101,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
assert(pMsg->dataLen >= 0);
|
||||
|
||||
SyncTerm localPreLogTerm = 0;
|
||||
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
|
||||
assert(pEntry != NULL);
|
||||
localPreLogTerm = pEntry->term;
|
||||
|
@ -174,7 +174,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = true;
|
||||
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||
|
||||
if (pMsg->dataLen > 0) {
|
||||
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||
} else {
|
||||
pReply->matchIndex = pMsg->prevLogIndex;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
|
|
@ -39,7 +39,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
|||
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "syncIndexMgr.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncRaftLog.h"
|
||||
|
||||
// \* Leader i advances its commitIndex.
|
||||
// \* This is done as a separate step from handling AppendEntries responses,
|
||||
|
@ -42,4 +43,25 @@
|
|||
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
|
||||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
|
||||
|
||||
// update commit index
|
||||
|
||||
if (pSyncNode->pFsm != NULL) {
|
||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
SyncIndex endIndex = SYNC_INDEX_INVALID;
|
||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -54,20 +54,22 @@ static void syncEnvTick(void *param, void *tmrId) {
|
|||
SSyncEnv *pSyncEnv = (SSyncEnv *)param;
|
||||
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
|
||||
++(pSyncEnv->envTickTimerCounter);
|
||||
sTrace(
|
||||
"syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
|
||||
", envTickTimerCounter:%" PRIu64
|
||||
", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
|
||||
// do something, tick ...
|
||||
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
|
||||
} else {
|
||||
sTrace(
|
||||
"syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
|
||||
", envTickTimerCounter:%" PRIu64
|
||||
", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
|
|||
static int32_t syncIOStartInternal(SSyncIO *io);
|
||||
static int32_t syncIOStopInternal(SSyncIO *io);
|
||||
|
||||
static void *syncIOConsumerFunc(void *param);
|
||||
static void * syncIOConsumerFunc(void *param);
|
||||
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
|
@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
|
|||
}
|
||||
|
||||
static void *syncIOConsumerFunc(void *param) {
|
||||
SSyncIO *io = param;
|
||||
SSyncIO * io = param;
|
||||
STaosQall *qall;
|
||||
SRpcMsg *pRpcMsg, rpcMsg;
|
||||
SRpcMsg * pRpcMsg, rpcMsg;
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
|
|
|
@ -731,14 +731,44 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
|||
int32_t ret = 0;
|
||||
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
|
||||
|
||||
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
|
||||
SyncTerm term = ths->pRaftStore->currentTerm;
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
|
||||
|
||||
// only myself, maybe commit
|
||||
syncNodeMaybeAdvanceCommitIndex(ths);
|
||||
|
||||
// start replicate right now!
|
||||
syncNodeReplicate(ths);
|
||||
syncEntryDestory(pEntry);
|
||||
|
||||
// pre commit
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
|
||||
} else {
|
||||
// ths->pFsm->FpCommitCb(-1)
|
||||
// pre commit
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
|||
free(s);
|
||||
|
||||
} else {
|
||||
pRoot = syncRpcUnknownMsg2Json();
|
||||
pRoot = cJSON_CreateObject();
|
||||
char* s;
|
||||
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
|
||||
cJSON_AddStringToObject(pRoot, "pCont", s);
|
||||
|
@ -608,6 +608,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
|
|||
return pMsg;
|
||||
}
|
||||
|
||||
// step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) {
|
||||
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen);
|
||||
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
|
||||
|
@ -652,6 +653,7 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len)
|
|||
return pMsg;
|
||||
}
|
||||
|
||||
// step 2. SyncClientRequest => RpcMsg, to queue
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
|
@ -664,6 +666,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg
|
|||
syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
// step 3. RpcMsg => SyncClientRequest, from queue
|
||||
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
return pMsg;
|
||||
|
|
|
@ -26,6 +26,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
|
|||
return pEntry;
|
||||
}
|
||||
|
||||
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
|
||||
assert(pEntry != NULL);
|
||||
|
@ -48,6 +49,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 5. SSyncRaftEntry => bin, to raft log
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
|
||||
char* buf = malloc(pEntry->bytes);
|
||||
assert(buf != NULL);
|
||||
|
@ -58,6 +60,7 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
// step 6. bin => SSyncRaftEntry, from raft log
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SSyncRaftEntry* pEntry = malloc(bytes);
|
||||
|
@ -106,6 +109,15 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
|
|||
return serialized;
|
||||
}
|
||||
|
||||
// step 7. SSyncRaftEntry => original SRpcMsg, commit to user, delete seqNum, isWeak, term, index
|
||||
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pEntry->originalRpcType;
|
||||
pRpcMsg->contLen = pEntry->dataLen;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncEntryPrint(const SSyncRaftEntry* pObj) {
|
||||
char* serialized = syncEntry2Str(pObj);
|
||||
|
|
|
@ -73,7 +73,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
|||
SyncAppendEntries* pMsg = NULL;
|
||||
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
|
||||
if (pEntry != NULL) {
|
||||
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
|
||||
pMsg = syncAppendEntriesBuild(pEntry->bytes);
|
||||
assert(pMsg != NULL);
|
||||
|
||||
// add pEntry into msg
|
||||
uint32_t len;
|
||||
|
@ -86,9 +87,11 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
|||
|
||||
} else {
|
||||
// maybe overflow, send empty record
|
||||
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0);
|
||||
pMsg = syncAppendEntriesBuild(0);
|
||||
assert(pMsg != NULL);
|
||||
}
|
||||
|
||||
assert(pMsg != NULL);
|
||||
pMsg->srcId = pSyncNode->myRaftId;
|
||||
pMsg->destId = *pDestId;
|
||||
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
|
|
@ -41,7 +41,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
|||
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ add_executable(syncRpcMsgTest "")
|
|||
add_executable(syncPingTimerTest2 "")
|
||||
add_executable(syncPingSelfTest "")
|
||||
add_executable(syncElectTest "")
|
||||
add_executable(syncEncodeTest "")
|
||||
|
||||
|
||||
target_sources(syncTest
|
||||
|
@ -150,6 +151,10 @@ target_sources(syncElectTest
|
|||
PRIVATE
|
||||
"syncElectTest.cpp"
|
||||
)
|
||||
target_sources(syncEncodeTest
|
||||
PRIVATE
|
||||
"syncEncodeTest.cpp"
|
||||
)
|
||||
|
||||
|
||||
target_include_directories(syncTest
|
||||
|
@ -307,6 +312,11 @@ target_include_directories(syncElectTest
|
|||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncEncodeTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries(syncTest
|
||||
|
@ -429,6 +439,10 @@ target_link_libraries(syncElectTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncEncodeTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
|
||||
|
||||
enable_testing()
|
||||
|
|
|
@ -116,7 +116,9 @@ int main(int argc, char** argv) {
|
|||
|
||||
//---------------------------
|
||||
while (1) {
|
||||
sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state));
|
||||
sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
|
||||
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock,
|
||||
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,236 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncEnv.h"
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncRaftEntry.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "syncRaftLog.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||
int32_t replicaNum = 1;
|
||||
int32_t myIndex = 0;
|
||||
|
||||
SRaftId ids[TSDB_MAX_REPLICA];
|
||||
SSyncInfo syncInfo;
|
||||
SSyncFSM* pFsm;
|
||||
SWal* pWal;
|
||||
SSyncNode* pSyncNode;
|
||||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
||||
int code = walInit();
|
||||
assert(code == 0);
|
||||
SWalCfg walCfg;
|
||||
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||
walCfg.vgId = syncInfo.vgId;
|
||||
walCfg.fsyncPeriod = 1000;
|
||||
walCfg.retentionPeriod = 1000;
|
||||
walCfg.rollPeriod = 1000;
|
||||
walCfg.retentionSize = 1000;
|
||||
walCfg.segSize = 1000;
|
||||
walCfg.level = TAOS_WAL_FSYNC;
|
||||
pWal = walOpen("./wal_test", &walCfg);
|
||||
assert(pWal != NULL);
|
||||
|
||||
syncInfo.pWal = pWal;
|
||||
|
||||
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||
pCfg->myIndex = myIndex;
|
||||
pCfg->replicaNum = replicaNum;
|
||||
|
||||
for (int i = 0; i < replicaNum; ++i) {
|
||||
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
}
|
||||
|
||||
pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
}
|
||||
|
||||
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||
|
||||
void logStoreTest() {
|
||||
logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore);
|
||||
|
||||
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
|
||||
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
int32_t dataLen = 10;
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
|
||||
assert(pEntry != NULL);
|
||||
pEntry->msgType = 1;
|
||||
pEntry->originalRpcType = 2;
|
||||
pEntry->seqNum = 3;
|
||||
pEntry->isWeak = true;
|
||||
pEntry->term = 100;
|
||||
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
||||
snprintf(pEntry->data, dataLen, "value%d", i);
|
||||
|
||||
// syncEntryPrint2((char*)"write entry:", pEntry);
|
||||
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
|
||||
syncEntryDestory(pEntry);
|
||||
|
||||
if (i == 0) {
|
||||
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
|
||||
}
|
||||
}
|
||||
logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore);
|
||||
|
||||
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
|
||||
logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore);
|
||||
}
|
||||
|
||||
void initRaftId(SSyncNode* pSyncNode) {
|
||||
for (int i = 0; i < replicaNum; ++i) {
|
||||
ids[i] = pSyncNode->replicasId[i];
|
||||
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||
printf("raftId[%d] : %s\n", i, s);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg *step0() {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
memset(pMsg, 0, sizeof(SRpcMsg));
|
||||
pMsg->msgType = 9999;
|
||||
pMsg->contLen = 32;
|
||||
pMsg->pCont = malloc(pMsg->contLen);
|
||||
snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world");
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SRpcMsg *step2(const SyncClientRequest *pMsg) {
|
||||
SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
syncClientRequest2RpcMsg(pMsg, pRetMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SyncClientRequest *step3(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SSyncRaftEntry *step4(const SyncClientRequest *pMsg) {
|
||||
SSyncRaftEntry *pRetMsg = syncEntryBuild2((SyncClientRequest *)pMsg, 100, 0);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
char *step5(const SSyncRaftEntry *pMsg, uint32_t *len) {
|
||||
char *pRetMsg = syncEntrySerialize(pMsg, len);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SSyncRaftEntry *step6(const char *pMsg, uint32_t len) {
|
||||
SSyncRaftEntry *pRetMsg = syncEntryDeserialize(pMsg, len);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SRpcMsg *step7(const SSyncRaftEntry *pMsg) {
|
||||
SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
syncEntry2OriginalRpc(pMsg, pRetMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143 + 64;
|
||||
void logTest();
|
||||
|
||||
myIndex = 0;
|
||||
if (argc >= 2) {
|
||||
myIndex = atoi(argv[1]);
|
||||
}
|
||||
|
||||
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
|
||||
assert(ret == 0);
|
||||
|
||||
ret = syncEnvStart();
|
||||
assert(ret == 0);
|
||||
|
||||
taosRemoveDir("./wal_test");
|
||||
|
||||
// step0
|
||||
SRpcMsg *pMsg0 = step0();
|
||||
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
|
||||
|
||||
// step1
|
||||
SyncClientRequest *pMsg1 = step1(pMsg0);
|
||||
syncClientRequestPrint2((char *)"==step1==", pMsg1);
|
||||
|
||||
// step2
|
||||
SRpcMsg *pMsg2 = step2(pMsg1);
|
||||
syncRpcMsgPrint2((char *)"==step2==", pMsg2);
|
||||
|
||||
// step3
|
||||
SyncClientRequest *pMsg3 = step3(pMsg2);
|
||||
syncClientRequestPrint2((char *)"==step3==", pMsg3);
|
||||
|
||||
// step4
|
||||
SSyncRaftEntry *pMsg4 = step4(pMsg3);
|
||||
syncEntryPrint2((char *)"==step4==", pMsg4);
|
||||
|
||||
// log, relog
|
||||
SSyncNode* pSyncNode = syncNodeInit();
|
||||
assert(pSyncNode != NULL);
|
||||
SSyncRaftEntry *pEntry = pMsg4;
|
||||
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
|
||||
SSyncRaftEntry *pEntry2 = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, pEntry->index);
|
||||
syncEntryPrint2((char *)"==pEntry2==", pEntry2);
|
||||
|
||||
// step5
|
||||
uint32_t len;
|
||||
char * pMsg5 = step5(pMsg4, &len);
|
||||
char * s = syncUtilprintBin(pMsg5, len);
|
||||
printf("==step5== [%s] \n", s);
|
||||
free(s);
|
||||
|
||||
// step6
|
||||
SSyncRaftEntry *pMsg6 = step6(pMsg5, len);
|
||||
syncEntryPrint2((char *)"==step6==", pMsg6);
|
||||
|
||||
// step7
|
||||
SRpcMsg *pMsg7 = step7(pMsg6);
|
||||
syncRpcMsgPrint2((char *)"==step7==", pMsg7);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue