diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index f2c8c916c8..983c695786 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -29,6 +29,7 @@ extern "C" { typedef struct SMnode SMnode; typedef struct { + bool isStandBy; bool deploy; int8_t replica; int8_t selfIndex; diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 94d41a7416..a9e2f5a71a 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -44,12 +44,9 @@ extern "C" { } #define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t) - #define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t) - #define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) - -#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) +#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) #define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ { \ @@ -66,11 +63,8 @@ extern "C" { } #define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t) - #define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t) - #define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) - #define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) #define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ @@ -356,6 +350,14 @@ typedef struct SSdb { SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; +typedef struct SSdbIter { + TdFilePtr file; + int64_t readlen; +} SSdbIter; + +SSdbIter *sdbIterInit(SSdb *pSdb); +SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2bf678fa48..5ffcbb7a09 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -82,14 +82,29 @@ typedef struct SFsmCbMeta { SyncTerm currentTerm; } SFsmCbMeta; +typedef struct SReConfigCbMeta { + int32_t code; + SyncIndex index; + SyncTerm term; + SyncTerm currentTerm; +} SReConfigCbMeta; + typedef struct SSyncFSM { void* data; + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpRestoreFinish)(struct SSyncFSM* pFsm); + + void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len); + int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len); + + void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); + + // int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + } SSyncFSM; // abstract definition of log store in raft diff --git a/include/util/tdef.h b/include/util/tdef.h index 808fcf0152..59b72d8785 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -428,11 +428,11 @@ enum { }; #define DEFAULT_HANDLE 0 -#define MNODE_HANDLE -1 -#define QNODE_HANDLE -2 -#define SNODE_HANDLE -3 -#define VNODE_HANDLE -4 -#define BNODE_HANDLE -5 +#define MNODE_HANDLE 1 +#define QNODE_HANDLE -1 +#define SNODE_HANDLE -2 +#define VNODE_HANDLE -3 +#define BNODE_HANDLE -4 #define TSDB_CONFIG_OPTION_LEN 16 #define TSDB_CONIIG_VALUE_LEN 48 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 43113d05af..22513bc2e4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -55,9 +55,31 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; - pOption->selfIndex = pMgmt->selfIndex; - pOption->replica = pMgmt->replica; - memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + + if (pMgmt->replica > 1) { + pOption->replica = 1; + pOption->selfIndex = 0; + SReplica *pReplica = &pOption->replicas[0]; + for (int32_t i = 0; i < pMgmt->replica; ++i) { + if (pMgmt->replicas[i].id == pMgmt->pData->dnodeId) { + pReplica->id = pMgmt->replicas[i].id; + pReplica->port = pMgmt->replicas[i].port; + memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN); + } + } + pMgmt->selfIndex = pOption->selfIndex; + pOption->isStandBy = 1; + } else { + pOption->replica = pMgmt->replica; + pOption->selfIndex = -1; + memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + for (int32_t i = 0; i < pOption->replica; ++i) { + if (pOption->replicas[i].id == pMgmt->pData->dnodeId) { + pOption->selfIndex = i; + } + } + pMgmt->selfIndex = pOption->selfIndex; + } pOption->deploy = false; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 59d0c491a1..71cc2d2693 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from mnode-sync queue", pMsg); pMsg->info.node = pMgmt->pMnode; + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); + int32_t code = mndProcessSyncMsg(pMsg); dTrace("msg:%p, is freed, code:0x%x", pMsg, code); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5a1653b937..fcbb26205d 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -76,11 +76,12 @@ typedef struct { typedef struct { SWal *pWal; - int32_t errCode; - bool restored; sem_t syncSem; int64_t sync; ESyncState state; + bool isStandBy; + bool restored; + int32_t errCode; } SSyncMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 7f86eb8b32..0bb89a1d91 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -39,14 +39,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_MNODE, - .keyType = SDB_KEY_INT32, - .deployFp = (SdbDeployFp)mndCreateDefaultMnode, - .encodeFp = (SdbEncodeFp)mndMnodeActionEncode, - .decodeFp = (SdbDecodeFp)mndMnodeActionDecode, - .insertFp = (SdbInsertFp)mndMnodeActionInsert, - .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, - .deleteFp = (SdbDeleteFp)mndMnodeActionDelete}; + SSdbTable table = { + .sdbType = SDB_MNODE, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultMnode, + .encodeFp = (SdbEncodeFp)mndMnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndMnodeActionDecode, + .insertFp = (SdbInsertFp)mndMnodeActionInsert, + .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndMnodeActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index a4e6cfd5ca..0858162959 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,22 +17,26 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - SSdb *pSdb = pMnode->pSdb; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pFsm->data; + SSdbRaw *pRaw = pMsg->pCont; mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state)); - sdbWriteWithoutFree(pSdb, pRaw); - sdbSetApplyIndex(pSdb, cbMeta.index); - sdbSetApplyTerm(pSdb, cbMeta.term); + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); + tsem_post(&pMnode->syncMgmt.syncSem); } } @@ -49,15 +53,41 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { pMnode->syncMgmt.restored = true; } +void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) { + SMnode *pMnode = pFsm->data; + SSdbIter *pIter = iter; + + if (iter == NULL) { + pIter = sdbIterInit(pMnode->pSdb); + } + + return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len); +} + +int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) { + SMnode *pMnode = pFsm->data; + sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); + return 0; +} + +void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + +} + SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; + pFsm->FpCommitCb = mndSyncCommitMsg; pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; + pFsm->FpGetSnapshot = mndSyncGetSnapshot; - pFsm->FpRestoreFinish = mndRestoreFinish; - pFsm->FpRestoreSnapshot = NULL; + pFsm->FpRestoreFinishCb = mndRestoreFinish; + pFsm->FpSnapshotRead = mndSnapshotRead; + pFsm->FpSnapshotApply = mndSnapshotApply; + pFsm->FpReConfigCb = mndReConfig; + return pFsm; } @@ -90,10 +120,13 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; + mInfo("start to open mnode sync, replica:%d myIndex:%d standBy:%d", pCfg->replicaNum, pCfg->myIndex, + pMgmt->isStandBy); for (int32_t i = 0; i < pMnode->replica; ++i) { SNodeInfo *pNode = &pCfg->nodeInfo[i]; tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); pNode->nodePort = pMnode->replicas[i].port; + mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); } tsem_init(&pMgmt->syncSem, 0, 0); @@ -149,7 +182,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); - syncStart(pMgmt->sync); + if (pMgmt->isStandBy) { + syncStartStandBy(pMgmt->sync); + } else { + syncStart(pMgmt->sync); + } mDebug("sync:%" PRId64 " is started", pMgmt->sync); } @@ -161,3 +198,18 @@ bool mndIsMaster(SMnode *pMnode) { return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); } + +int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { + SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex}; + mInfo("start to alter mnode sync, replica:%d myIndex:%d standBy:%d", cfg.replicaNum, cfg.myIndex, pOption->isStandBy); + for (int32_t i = 0; i < pOption->replica; ++i) { + SNodeInfo *pNode = &cfg.nodeInfo[i]; + tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pOption->replicas[i].port; + mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); + } + + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->isStandBy = pOption->isStandBy; + return syncReconfig(pMgmt->sync, &cfg); +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 775c64ceab..d50f284b5c 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -263,6 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; pMnode->selfId = pOption->replicas[pOption->selfIndex].id; + pMnode->syncMgmt.isStandBy = pOption->isStandBy; } SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { @@ -329,12 +330,6 @@ void mndClose(SMnode *pMnode) { } } -int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { - mDebug("start to alter mnode"); - mDebug("mnode is altered"); - return 0; -} - int32_t mndStart(SMnode *pMnode) { mndSyncStart(pMnode); return mndInitTimer(pMnode); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b000c208c8..eac7f4af5d 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) { return 0; } + +SSdbIter *sdbIterInit(SSdb *pSdb) { + char datafile[PATH_MAX] = {0}; + char tmpfile[PATH_MAX] = {0}; + snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + + if (taosCopyFile(datafile, tmpfile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); + return NULL; + } + + SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); + if (pIter == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pIter->file = taosOpenFile(tmpfile, TD_FILE_READ); + if (pIter->file == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to read snapshot file:%s since %s", tmpfile, terrstr()); + taosMemoryFree(pIter); + return NULL; + } + + mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter); + return pIter; +} + +SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) { + const int32_t maxlen = 100; + + char *pBuf = taosMemoryCalloc(1, maxlen); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen); + if (readlen == 0) { + mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen); + taosMemoryFree(pBuf); + taosCloseFile(&pIter->file); + taosMemoryFree(pIter); + pIter = NULL; + } else if (readlen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen); + taosMemoryFree(pBuf); + taosCloseFile(&pIter->file); + taosMemoryFree(pIter); + pIter = NULL; + } else { + pIter->readlen += readlen; + mTrace("read snapshot, readlen:%" PRId64, pIter->readlen); + *ppBuf = pBuf; + *buflen = readlen; + } + + return pIter; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 882ee912cd..d8f3110a16 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; - pFsm->FpRestoreFinish = NULL; + pFsm->FpRestoreFinishCb = NULL; + pFsm->FpSnapshotRead = NULL; + pFsm->FpSnapshotApply = NULL; + pFsm->FpReConfigCb = NULL; + return pFsm; } \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 9246041b81..69549d2a7e 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -148,8 +148,8 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - bool restoreFinish; - //sem_t restoreSem; + bool restoreFinish; + // sem_t restoreSem; SSnapshot* pSnapshot; } SSyncNode; diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index 5bc240e921..716d2f620c 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -42,6 +42,7 @@ typedef struct SVotesGranted { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); void voteGrantedDestroy(SVotesGranted *pVotesGranted); +void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); @@ -65,6 +66,7 @@ typedef struct SVotesRespond { SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); void votesRespondDestory(SVotesRespond *pVotesRespond); +void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index fa735e71c0..0411628c5c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // restore finish if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { - if (ths->pFsm->FpRestoreFinish != NULL) { - ths->pFsm->FpRestoreFinish(ths->pFsm); + if (ths->pFsm->FpRestoreFinishCb != NULL) { + ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 18c6f8930a..36713ceed5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // restore finish if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { if (pSyncNode->restoreFinish == false) { - if (pSyncNode->pFsm->FpRestoreFinish != NULL) { - pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) { + pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm); } pSyncNode->restoreFinish = true; sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a69a94831d..4aca68fa64 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -509,7 +509,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); } - //tsem_init(&(pSyncNode->restoreSem), 0, 0); + // tsem_init(&(pSyncNode->restoreSem), 0, 0); // start in syncNodeStart // start raft @@ -606,7 +606,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pSnapshot); } - //tsem_destroy(&pSyncNode->restoreSem); + // tsem_destroy(&pSyncNode->restoreSem); // free memory in syncFreeNode // taosMemoryFree(pSyncNode); @@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { } void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { + bool hit = false; + for (int i = 0; i < newConfig->replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) { + newConfig->myIndex = i; + hit = true; + break; + } + } + ASSERT(hit == true); + pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); ASSERT(ret == 0); @@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); + votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 733dfd05b6..1c1f0809bd 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) { } } +void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) { + pVotesGranted->replicas = &(pSyncNode->replicasId); + pVotesGranted->replicaNum = pSyncNode->replicaNum; + voteGrantedClearVotes(pVotesGranted); + + pVotesGranted->term = 0; + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; +} + bool voteGrantedMajority(SVotesGranted *pVotesGranted) { bool ret = pVotesGranted->votes >= pVotesGranted->quorum; return ret; @@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) { } } +void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) { + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; +} + bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool ret = false; for (int i = 0; i < pVotesRespond->replicaNum; ++i) { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 0850ef6343..f52fef0019 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -73,9 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -void FpRestoreFinishCb(struct SSyncFSM* pFsm) { - sTrace("==callback== ==FpRestoreFinishCb=="); -} +void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); @@ -83,7 +81,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; - pFsm->FpRestoreFinish = FpRestoreFinishCb; + pFsm->FpRestoreFinishCb = RestoreFinishCb; return pFsm; } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index da295f640e..5edc0a4d3e 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -136,7 +136,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG -echo "sDebugFlag 135" >> $TAOS_CFG +echo "sDebugFlag 143" >> $TAOS_CFG echo "wDebugFlag 143" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim new file mode 100644 index 0000000000..53dea6821e --- /dev/null +++ b/tests/script/tsim/mnode/basic2.sim @@ -0,0 +1,84 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sql connect + +print =============== show dnodes +sql show dnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +sql show mnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data02 != LEADER then + return -1 +endi + +print =============== create dnodes +sql create dnode $hostname port 7200 +sleep 2000 + +sql show dnodes; +if $rows != 2 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data10 != 2 then + return -1 +endi + +print $data02 +if $data02 != 0 then + return -1 +endi + +if $data12 != 0 then + return -1 +endi + +if $data04 != ready then + return -1 +endi + +if $data14 != ready then + return -1 +endi + +sql show mnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data02 != LEADER then + return -1 +endi + +print =============== create mnode 2 +sql create mnode on dnode 2 +sql show mnodes +if $rows != 2 then + return -1 +endi +