From 49cf252495b0f1b782ab73ba7b353fa34b0d689d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 21 Apr 2022 15:24:50 +0800 Subject: [PATCH] add syncReplicateTest.cpp --- source/libs/sync/test/syncReplicateTest.cpp | 33 ++++++++++----------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 14132848b4..0e94498a38 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -30,7 +30,7 @@ void init() { void cleanup() { walCleanUp(); } -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot; @@ -42,7 +42,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); } @@ -117,11 +117,9 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* int64_t rid = syncOpen(&syncInfo); assert(rid > 0); - syncStart(rid); SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; @@ -131,22 +129,22 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; gSyncIO->pSyncNode = pSyncNode; - - syncNodeStart(pSyncNode); + syncNodeRelease(pSyncNode); return rid; } void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum \n", exe); } -SRpcMsg *createRpcMsg(int i, int count, int myIndex) { - SRpcMsg *pMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); +SRpcMsg* createRpcMsg(int i, int count, int myIndex) { + SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg)); pMsg->msgType = 9999; - pMsg->contLen = 128; - pMsg->pCont = taosMemoryMalloc(pMsg->contLen); - snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); + pMsg->contLen = 256; + pMsg->pCont = rpcMallocCont(pMsg->contLen); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); return pMsg; } @@ -180,28 +178,29 @@ int main(int argc, char** argv) { assert(rid > 0); syncStart(rid); - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); //--------------------------- int32_t alreadySend = 0; while (1) { char* s = syncNode2SimpleStr(pSyncNode); - sTrace("%s", s); if (alreadySend < writeRecordNum) { - SRpcMsg *pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); - int32_t ret = syncPropose(rid, pRpcMsg, false); + SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); + int32_t ret = syncPropose(rid, pRpcMsg, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { - sTrace("%s value%d write not leader", s, alreadySend); + sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); sTrace("%s value%d write ok", s, alreadySend); } alreadySend++; - //rpcFreeCont(pRpcMsg->pCont); + rpcFreeCont(pRpcMsg->pCont); taosMemoryFree(pRpcMsg); + } else { + sTrace("%s", s); } taosMsleep(1000);