add syncReplicateTest.cpp

This commit is contained in:
Minghao Li 2022-04-21 15:24:50 +08:00
parent a6b949831a
commit 49cf252495
1 changed files with 16 additions and 17 deletions

View File

@ -30,7 +30,7 @@ void init() {
void cleanup() { walCleanUp(); } void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
@ -42,7 +42,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else { } else {
sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index);
} }
@ -117,11 +117,9 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
int64_t rid = syncOpen(&syncInfo); int64_t rid = syncOpen(&syncInfo);
assert(rid > 0); assert(rid > 0);
syncStart(rid);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
@ -131,22 +129,22 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode; gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
syncNodeStart(pSyncNode);
return rid; return rid;
} }
void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum \n", exe); } void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum \n", exe); }
SRpcMsg *createRpcMsg(int i, int count, int myIndex) { SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg *pMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999; pMsg->msgType = 9999;
pMsg->contLen = 128; pMsg->contLen = 256;
pMsg->pCont = taosMemoryMalloc(pMsg->contLen); pMsg->pCont = rpcMallocCont(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs());
return pMsg; return pMsg;
} }
@ -180,17 +178,16 @@ int main(int argc, char** argv) {
assert(rid > 0); assert(rid > 0);
syncStart(rid); syncStart(rid);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
//--------------------------- //---------------------------
int32_t alreadySend = 0; int32_t alreadySend = 0;
while (1) { while (1) {
char* s = syncNode2SimpleStr(pSyncNode); char* s = syncNode2SimpleStr(pSyncNode);
sTrace("%s", s);
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg *pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
sTrace("%s value%d write not leader", s, alreadySend); sTrace("%s value%d write not leader", s, alreadySend);
@ -200,8 +197,10 @@ int main(int argc, char** argv) {
} }
alreadySend++; alreadySend++;
//rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
taosMemoryFree(pRpcMsg); taosMemoryFree(pRpcMsg);
} else {
sTrace("%s", s);
} }
taosMsleep(1000); taosMsleep(1000);