enh(sync): add syncStartStandBy

This commit is contained in:
Minghao Li 2022-05-14 18:12:53 +08:00
parent 1cfa16fcd8
commit a2d43fb96f
8 changed files with 82 additions and 60 deletions

View File

@ -272,7 +272,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode); cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig); void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig);
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);

View File

@ -62,7 +62,6 @@ bool syncUtilUserPreCommit(tmsg_t msgType);
bool syncUtilUserCommit(tmsg_t msgType); bool syncUtilUserCommit(tmsg_t msgType);
bool syncUtilUserRollback(tmsg_t msgType); bool syncUtilUserRollback(tmsg_t msgType);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -15,11 +15,11 @@
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "syncRaftCfg.h"
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) == // HandleAppendEntriesRequest(i, j, m) ==
@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL); assert(pRollBackEntry != NULL);
//if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) { if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index; cbMeta.index = pAppendEntry->index;
@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index; cbMeta.index = pAppendEntry->index;
@ -324,7 +324,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
//if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
@ -338,7 +338,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// config change // config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg; SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0); ASSERT(ret == 0);
syncNodeUpdateConfig(ths, &newSyncCfg); syncNodeUpdateConfig(ths, &newSyncCfg);

View File

@ -16,10 +16,10 @@
#include "syncCommit.h" #include "syncCommit.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncRaftCfg.h"
// \* Leader i advances its commitIndex. // \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses, // \* This is done as a separate step from handling AppendEntries responses,
@ -102,7 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
//if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
@ -114,12 +114,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} }
// config change // config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
SSyncCfg newSyncCfg; SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0); ASSERT(ret == 0);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg); syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);

View File

@ -20,6 +20,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "os.h"
SSyncIO *gSyncIO = NULL; SSyncIO *gSyncIO = NULL;
@ -198,6 +199,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
{ {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
snprintf(rpcInit.localFqdn, sizeof(rpcInit.localFqdn), "%s", "127.0.0.1");
rpcInit.localPort = io->myAddr.eps[0].port; rpcInit.localPort = io->myAddr.eps[0].port;
rpcInit.label = "SYNC-IO-SERVER"; rpcInit.label = "SYNC-IO-SERVER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;

View File

@ -126,7 +126,7 @@ void syncStop(int64_t rid) {
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0; int32_t ret = 0;
char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
rpcMsg.noResp = 1; rpcMsg.noResp = 1;
@ -198,7 +198,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
(pEpSet->numOfEps)++; (pEpSet->numOfEps)++;
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} }
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
@ -879,7 +878,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s; return s;
} }
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
pSyncNode->pRaftCfg->cfg = *newConfig; pSyncNode->pRaftCfg->cfg = *newConfig;
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
ASSERT(ret == 0); ASSERT(ret == 0);
@ -1266,7 +1265,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
@ -1288,7 +1287,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;

View File

@ -58,14 +58,15 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char *errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno); const char* linuxErrMsg = strerror(errno);
sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0); ASSERT(0);
} }
//assert(code == 0); // assert(code == 0);
walFsync(pWal, true); walFsync(pWal, true);
return code; return code;
@ -77,16 +78,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
int32_t code = walReadWithHandle(pWalHandle, index); int32_t code = walReadWithHandle(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char *errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno); const char* linuxErrMsg = strerror(errno);
sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0); ASSERT(0);
} }
//assert(walReadWithHandle(pWalHandle, index) == 0); // assert(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
assert(pEntry != NULL); assert(pEntry != NULL);
@ -112,14 +114,15 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
//assert(walRollback(pWal, fromIndex) == 0); // assert(walRollback(pWal, fromIndex) == 0);
int32_t code = walRollback(pWal, fromIndex); int32_t code = walRollback(pWal, fromIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char *errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno); const char* linuxErrMsg = strerror(errno);
sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0); ASSERT(0);
} }
return 0; // to avoid compiler error return 0; // to avoid compiler error
@ -145,13 +148,13 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
//assert(walCommit(pWal, index) == 0); // assert(walCommit(pWal, index) == 0);
int32_t code = walCommit(pWal, index); int32_t code = walCommit(pWal, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char *errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t linuxErr = errno; int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno); const char* linuxErrMsg = strerror(errno);
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
ASSERT(0); ASSERT(0);
} }

View File

@ -5,6 +5,7 @@
#include "syncInt.h" #include "syncInt.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "wal.h" #include "wal.h"
#include "os.h"
void logTest() { void logTest() {
sTrace("--- sync log test: trace"); sTrace("--- sync log test: trace");
@ -26,6 +27,8 @@ void init() {
code = syncInit(); code = syncInit();
assert(code == 0); assert(code == 0);
sprintf(tsTempDir, "%s", ".");
} }
void cleanup() { walCleanUp(); } void cleanup() { walCleanUp(); }
@ -94,7 +97,7 @@ SWal* createWal(char* path, int32_t vgId) {
return pWal; return pWal;
} }
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) {
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.rpcClient = gSyncIO->clientRpc;
@ -106,13 +109,22 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
syncInfo.pWal = pWal; syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg; SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) { if (isStandBy) {
pCfg->nodeInfo[i].nodePort = gPorts[i]; pCfg->myIndex = 0;
taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn); pCfg->replicaNum = 1;
// snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); pCfg->nodeInfo[0].nodePort = gPorts[myIndex];
taosGetFqdn(pCfg->nodeInfo[myIndex].nodeFqdn);
} else {
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = gPorts[i];
taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn);
// snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
}
} }
int64_t rid = syncOpen(&syncInfo); int64_t rid = syncOpen(&syncInfo);
@ -136,7 +148,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
return rid; return rid;
} }
void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum \n", exe); } void usage(char* exe) { printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy \n", exe); }
SRpcMsg* createRpcMsg(int i, int count, int myIndex) { SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
@ -151,14 +163,16 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
if (argc != 5) { if (argc != 6) {
usage(argv[0]); usage(argv[0]);
exit(-1); exit(-1);
} }
int32_t replicaNum = atoi(argv[1]); int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]); int32_t myIndex = atoi(argv[2]);
int32_t lastApplyIndex = atoi(argv[3]); int32_t lastApplyIndex = atoi(argv[3]);
int32_t writeRecordNum = atoi(argv[4]); int32_t writeRecordNum = atoi(argv[4]);
bool isStandBy = atoi(argv[5]);
gSnapshotLastApplyIndex = lastApplyIndex; gSnapshotLastApplyIndex = lastApplyIndex;
assert(replicaNum >= 1 && replicaNum <= 5); assert(replicaNum >= 1 && replicaNum <= 5);
@ -174,9 +188,14 @@ int main(int argc, char** argv) {
snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex); snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex);
SWal* pWal = createWal(walPath, gVgId); SWal* pWal = createWal(walPath, gVgId);
int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir); int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy);
assert(rid > 0); assert(rid > 0);
syncStart(rid);
if (isStandBy) {
syncStartStandBy(rid);
} else {
syncStart(rid);
}
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);