diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0ed63e08bf..bf7ca50bdc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1274,7 +1274,6 @@ int32_t tSerializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnod int32_t tDeserializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq); typedef struct { - int32_t dnodeId; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; } SDCreateMnodeReq, SDAlterMnodeReq; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b14b7667d2..b43eafc918 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -146,6 +146,7 @@ typedef struct SSyncLogStore { } SSyncLogStore; typedef struct SSyncInfo { + bool isStandBy; SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; @@ -160,7 +161,6 @@ int32_t syncInit(); void syncCleanUp(); int64_t syncOpen(const SSyncInfo* pSyncInfo); void syncStart(int64_t rid); -void syncStartStandBy(int64_t rid); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); @@ -173,6 +173,10 @@ bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); +// to be moved to static +void syncStartNormal(int64_t rid); +void syncStartStandBy(int64_t rid); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2f6dbf5389..c300790f57 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3188,7 +3188,6 @@ int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq * tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { SReplica *pReplica = &pReq->replicas[i]; @@ -3206,7 +3205,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { SReplica *pReplica = &pReq->replicas[i]; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 90d7b88859..f6350ba279 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -79,7 +79,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { return -1; } - if (createReq.replica <= 1 || (createReq.dnodeId != pInput->pData->dnodeId && pInput->pData->dnodeId != 0)) { + if (createReq.replica != 1) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/test/CMakeLists.txt b/source/dnode/mgmt/test/CMakeLists.txt index e1656ceb34..6b1919bf18 100644 --- a/source/dnode/mgmt/test/CMakeLists.txt +++ b/source/dnode/mgmt/test/CMakeLists.txt @@ -3,7 +3,7 @@ if(${BUILD_TEST}) add_subdirectory(qnode) add_subdirectory(bnode) add_subdirectory(snode) - add_subdirectory(mnode) + #add_subdirectory(mnode) add_subdirectory(vnode) add_subdirectory(sut) endif(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index a47221ea39..afdc27a96a 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -267,75 +267,83 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod } static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - int32_t numOfReplicas = 0; - + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; + SDAlterMnodeReq alterReq = {0}; SDCreateMnodeReq createReq = {0}; + SEpSet alterEpset = {0}; + SEpSet createEpset = {0}; + while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; - SReplica *pReplica = &createReq.replicas[numOfReplicas]; - pReplica->id = pMObj->id; - pReplica->port = pMObj->pDnode->port; - memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); - numOfReplicas++; + alterReq.replicas[numOfReplicas].id = pMObj->id; + alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port; + memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port; + memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + if (pMObj->state == TAOS_SYNC_STATE_LEADER) { + alterEpset.inUse = numOfReplicas; + } + + numOfReplicas++; sdbRelease(pSdb, pMObj); } - SReplica *pReplica = &createReq.replicas[numOfReplicas]; - pReplica->id = pDnode->id; - pReplica->port = pDnode->port; - memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - numOfReplicas++; + alterReq.replica = numOfReplicas + 1; + alterReq.replicas[numOfReplicas].id = pDnode->id; + alterReq.replicas[numOfReplicas].port = pDnode->port; + memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - createReq.replica = numOfReplicas; + alterEpset.numOfEps = numOfReplicas + 1; + alterEpset.eps[numOfReplicas].port = pDnode->port; + memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - while (1) { - SMnodeObj *pMObj = NULL; - pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); - if (pIter == NULL) break; + createReq.replica = 1; + createReq.replicas[0].id = pDnode->id; + createReq.replicas[0].port = pDnode->port; + memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - STransAction action = {0}; + createEpset.numOfEps = 1; + createEpset.eps[0].port = pDnode->port; + memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - createReq.dnodeId = pMObj->id; - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); + { + int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); + tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); - action.epSet = mndGetDnodeEpset(pMObj->pDnode); - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_DND_ALTER_MNODE; - action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; + STransAction action = { + .epSet = alterEpset, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_ALTER_MNODE, + .acceptableCode = 0, + }; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pMObj); return -1; } - - sdbRelease(pSdb, pMObj); } { - STransAction action = {0}; - action.epSet = mndGetDnodeEpset(pDnode); - - createReq.dnodeId = pObj->id; int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); void *pReq = taosMemoryMalloc(contLen); tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); - action.epSet = mndGetDnodeEpset(pDnode); - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_DND_CREATE_MNODE; - action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; + STransAction action = { + .epSet = createEpset, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_CREATE_MNODE, + .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED, + }; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -441,73 +449,77 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO } static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - int32_t numOfReplicas = 0; - + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t numOfReplicas = 0; SDAlterMnodeReq alterReq = {0}; + SDDropMnodeReq dropReq = {0}; + SEpSet alterEpset = {0}; + SEpSet dropEpSet = {0}; + while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; - - if (pMObj->id != pObj->id) { - SReplica *pReplica = &alterReq.replicas[numOfReplicas]; - pReplica->id = pMObj->id; - pReplica->port = pMObj->pDnode->port; - memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); - numOfReplicas++; + if (pMObj->id == pObj->id) { + sdbRelease(pSdb, pMObj); + continue; } + alterReq.replicas[numOfReplicas].id = pMObj->id; + alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port; + memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + + alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port; + memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + if (pMObj->state == TAOS_SYNC_STATE_LEADER) { + alterEpset.inUse = numOfReplicas; + } + + numOfReplicas++; sdbRelease(pSdb, pMObj); } alterReq.replica = numOfReplicas; + alterEpset.numOfEps = numOfReplicas; - while (1) { - SMnodeObj *pMObj = NULL; - pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); - if (pIter == NULL) break; - if (pMObj->id != pObj->id) { - STransAction action = {0}; + dropReq.dnodeId = pDnode->id; + dropEpSet.numOfEps = 1; + dropEpSet.eps[0].port = pDnode->port; + memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - alterReq.dnodeId = pMObj->id; - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); + { + int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); + void *pReq = taosMemoryMalloc(contLen); + tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); - action.epSet = mndGetDnodeEpset(pMObj->pDnode); - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_DND_ALTER_MNODE; - action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; + STransAction action = { + .epSet = alterEpset, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_ALTER_MNODE, + .acceptableCode = 0, + }; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pMObj); - return -1; - } + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; } - - sdbRelease(pSdb, pMObj); } { - STransAction action = {0}; - action.epSet = mndGetDnodeEpset(pDnode); - - SDDropMnodeReq dropReq = {0}; - dropReq.dnodeId = pObj->id; int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq); void *pReq = taosMemoryMalloc(contLen); tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq); - action.epSet = mndGetDnodeEpset(pDnode); - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_DND_DROP_MNODE; - action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; + STransAction action = { + .epSet = dropEpSet, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_DROP_MNODE, + .acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED, + }; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -662,7 +674,7 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { } static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SDAlterMnodeReq alterReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { @@ -670,12 +682,6 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { return -1; } - if (alterReq.dnodeId != pMnode->selfDnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - mError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMnode->selfDnodeId); - return -1; - } - SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1}; for (int32_t i = 0; i < alterReq.replica; ++i) { SNodeInfo *pNode = &cfg.nodeInfo[i]; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ca25133c96..aa391ad0d3 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -31,7 +31,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSdbRaw *pRaw = pMsg->pCont; - mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state)); + mTrace("raw:%p, apply to sdb, ver:%" PRId64 " term:%" PRId64 " role:%s", pRaw, cbMeta.index, cbMeta.term, + syncStr(cbMeta.state)); sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); @@ -125,6 +126,7 @@ int32_t mndInitSync(SMnode *pMnode) { snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); syncInfo.pWal = pMgmt->pWal; syncInfo.pFsm = mndSyncMakeFsm(pMnode); + syncInfo.isStandBy = pMgmt->standby; SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMnode->replica; @@ -191,11 +193,17 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); + + syncStart(pMgmt->sync); + +#if 0 if (pMgmt->standby) { syncStartStandBy(pMgmt->sync); } else { syncStart(pMgmt->sync); } +#endif + mDebug("sync:%" PRId64 " is started", pMgmt->sync); } diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index bfc64cb7b6..f4c857bb06 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -31,6 +31,7 @@ typedef struct SRaftCfg { SSyncCfg cfg; TdFilePtr pFile; char path[TSDB_FILENAME_LEN * 2]; + int8_t isStandBy; } SRaftCfg; SRaftCfg *raftCfgOpen(const char *path); @@ -42,10 +43,12 @@ char * syncCfg2Str(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); -char * raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); +char * raftCfg2Str(SRaftCfg *pRaftCfg); +int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); +int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); -int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path); +int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path); // for debug ---------------------- void syncCfgPrint(SSyncCfg *pCfg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e4b6fc215f..914ce68245 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -100,6 +100,21 @@ void syncStart(int64_t rid) { if (pSyncNode == NULL) { return; } + + if (pSyncNode->pRaftCfg->isStandBy) { + syncNodeStartStandBy(pSyncNode); + } else { + syncNodeStart(pSyncNode); + } + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); +} + +void syncStartNormal(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return; + } syncNodeStart(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -368,7 +383,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); if (!taosCheckExistFile(pSyncNode->configPath)) { // create raft config file - ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath); + ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncInfo->isStandBy, pSyncNode->configPath); assert(ret == 0); } else { @@ -979,6 +994,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); + pSyncNode->pRaftCfg->isStandBy = 0; + raftCfgPersist(pSyncNode->pRaftCfg); + syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index dc540424ec..daf7992d43 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -32,7 +32,7 @@ SRaftCfg *raftCfgOpen(const char *path) { int len = taosReadFile(pCfg->pFile, buf, sizeof(buf)); assert(len > 0); - int32_t ret = syncCfgFromStr(buf, &(pCfg->cfg)); + int32_t ret = raftCfgFromStr(buf, pCfg); assert(ret == 0); return pCfg; @@ -48,7 +48,7 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg) { int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { assert(pRaftCfg != NULL); - char *s = syncCfg2Str(&(pRaftCfg->cfg)); + char *s = raftCfg2Str(pRaftCfg); taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET); int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1); assert(ret == strlen(s) + 1); @@ -76,9 +76,12 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { } } + return pRoot; + /* cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot); return pJson; + */ } char *syncCfg2Str(SSyncCfg *pSyncCfg) { @@ -90,7 +93,8 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { memset(pSyncCfg, 0, sizeof(SSyncCfg)); - cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg"); + // cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg"); + const cJSON *pJson = pRoot; cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum"); assert(cJSON_IsNumber(pReplicaNum)); @@ -133,22 +137,32 @@ int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) { } cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { - cJSON *pJson = syncCfg2Json(&(pRaftCfg->cfg)); + cJSON *pRoot = cJSON_CreateObject(); + cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg))); + cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "RaftCfg", pRoot); return pJson; } char *raftCfg2Str(SRaftCfg *pRaftCfg) { - char *s = syncCfg2Str(&(pRaftCfg->cfg)); - return s; + cJSON *pJson = raftCfg2Json(pRaftCfg); + char * serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; } -int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path) { +int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) { assert(pCfg != NULL); TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE); assert(pFile != NULL); - char * s = syncCfg2Str(pCfg); + SRaftCfg raftCfg; + raftCfg.cfg = *pCfg; + raftCfg.isStandBy = isStandBy; + char * s = raftCfg2Str(&raftCfg); int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); assert(ret == strlen(s) + 1); @@ -157,6 +171,31 @@ int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path) { return 0; } +int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { + // memset(pRaftCfg, 0, sizeof(SRaftCfg)); + cJSON *pJson = cJSON_GetObjectItem(pRoot, "RaftCfg"); + + cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy"); + pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy); + + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); + ASSERT(code == 0); + + return code; +} + +int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) { + cJSON *pRoot = cJSON_Parse(s); + assert(pRoot != NULL); + + int32_t ret = raftCfgFromJson(pRoot, pRaftCfg); + assert(ret == 0); + + cJSON_Delete(pRoot); + return 0; +} + // for debug ---------------------- void syncCfgPrint(SSyncCfg *pCfg) { char *serialized = syncCfg2Str(pCfg); diff --git a/source/libs/sync/test/syncRaftCfgTest.cpp b/source/libs/sync/test/syncRaftCfgTest.cpp index d3c06fa83e..f5b24db651 100644 --- a/source/libs/sync/test/syncRaftCfgTest.cpp +++ b/source/libs/sync/test/syncRaftCfgTest.cpp @@ -15,6 +15,21 @@ void logTest() { sFatal("--- sync log test: fatal"); } +SRaftCfg* createRaftCfg() { + SRaftCfg* pCfg = (SRaftCfg*)taosMemoryMalloc(sizeof(SRaftCfg)); + memset(pCfg, 0, sizeof(SRaftCfg)); + + pCfg->cfg.replicaNum = 3; + pCfg->cfg.myIndex = 1; + for (int i = 0; i < pCfg->cfg.replicaNum; ++i) { + ((pCfg->cfg.nodeInfo)[i]).nodePort = i * 100; + snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); + } + pCfg->isStandBy = taosGetTimestampSec() % 100; + + return pCfg; +} + SSyncCfg* createSyncCfg() { SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg)); memset(pCfg, 0, sizeof(SSyncCfg)); @@ -56,7 +71,7 @@ void test3() { if (taosCheckExistFile(s)) { printf("%s file: %s already exist! \n", (char*)__FUNCTION__, s); } else { - syncCfgCreateFile(pCfg, s); + raftCfgCreateFile(pCfg, 7, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s); } @@ -78,6 +93,7 @@ void test5() { assert(pCfg != NULL); pCfg->cfg.myIndex = taosGetTimestampSec(); + pCfg->isStandBy += 2; raftCfgPersist(pCfg); printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex); diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 2a19ae778f..d7f50a2fae 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -20,10 +20,13 @@ #include "tconfig.h" #include "tjson.h" -#define TMP_SDB_DATA_DIR "/tmp/dumpsdb" -#define TMP_SDB_MNODE_DIR "/tmp/dumpsdb/mnode" -#define TMP_SDB_FILE "/tmp/dumpsdb/mnode/data/sdb.data" -#define TMP_SDB_PATH "/tmp/dumpsdb/mnode/data" +#define TMP_DNODE_DIR "/tmp/dumpsdb" +#define TMP_MNODE_DIR "/tmp/dumpsdb/mnode" +#define TMP_SDB_DATA_DIR "/tmp/dumpsdb/mnode/data" +#define TMP_SDB_SYNC_DIR "/tmp/dumpsdb/mnode/sync" +#define TMP_SDB_DATA_FILE "/tmp/dumpsdb/mnode/data/sdb.data" +#define TMP_SDB_RAFT_CFG_FILE "/tmp/dumpsdb/mnode/sync/raft_config.json" +#define TMP_SDB_RAFT_STORE_FILE "/tmp/dumpsdb/mnode/sync/raft_store.json" void reportStartup(const char *name, const char *desc) {} @@ -318,6 +321,10 @@ void dumpHeader(SSdb *pSdb, SJson *json) { } int32_t dumpSdb() { + wDebugFlag = 0; + mDebugFlag = 0; + sDebugFlag = 0; + SMsgCb msgCb = {0}; msgCb.reportStartupFp = reportStartup; msgCb.sendReqFp = sendReq; @@ -325,9 +332,10 @@ int32_t dumpSdb() { msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack tmsgSetDefault(&msgCb); walInit(); + syncInit(); SMnodeOpt opt = {.msgCb = msgCb}; - SMnode *pMnode = mndOpen(TMP_SDB_MNODE_DIR, &opt); + SMnode *pMnode = mndOpen(TMP_MNODE_DIR, &opt); if (pMnode == NULL) return -1; SSdb *pSdb = pMnode->pSdb; @@ -369,13 +377,11 @@ int32_t dumpSdb() { taosCloseFile(&pFile); tjsonDelete(json); taosMemoryFree(pCont); - taosRemoveDir(TMP_SDB_DATA_DIR); + taosRemoveDir(TMP_DNODE_DIR); return 0; } int32_t parseArgs(int32_t argc, char *argv[]) { - char file[PATH_MAX] = {0}; - for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { @@ -388,20 +394,8 @@ int32_t parseArgs(int32_t argc, char *argv[]) { printf("'-c' requires a parameter, default is %s\n", configDir); return -1; } - } else if (strcmp(argv[i], "-f") == 0) { - if (i < argc - 1) { - if (strlen(argv[++i]) >= PATH_MAX) { - printf("file path overflow"); - return -1; - } - tstrncpy(file, argv[i], PATH_MAX); - } else { - printf("'-f' requires a parameter, default is %s\n", configDir); - return -1; - } } else { printf("-c Configuration directory. \n"); - printf("-f Input sdb.data file. \n"); return -1; } } @@ -416,13 +410,28 @@ int32_t parseArgs(int32_t argc, char *argv[]) { return -1; } - if (file[0] == 0) { - snprintf(file, PATH_MAX, "%s/mnode/data/sdb.data", tsDataDir); - } + char dataFile[PATH_MAX] = {0}; + char raftCfgFile[PATH_MAX] = {0}; + char raftStoreFile[PATH_MAX] = {0}; + snprintf(dataFile, PATH_MAX, "%s/mnode/data/sdb.data", tsDataDir); + snprintf(raftCfgFile, PATH_MAX, "%s/mnode/sync/raft_config.json", tsDataDir); + snprintf(raftStoreFile, PATH_MAX, "%s/mnode/sync/raft_store.json", tsDataDir); - strcpy(tsDataDir, TMP_SDB_DATA_DIR); - taosMulMkDir(TMP_SDB_PATH); - taosCopyFile(file, TMP_SDB_FILE); + char cmd[PATH_MAX * 2] = {0}; + snprintf(cmd, sizeof(cmd), "rm -rf %s", TMP_DNODE_DIR); + system(cmd); + snprintf(cmd, sizeof(cmd), "mkdir -p %s", TMP_SDB_DATA_DIR); + system(cmd); + snprintf(cmd, sizeof(cmd), "mkdir -p %s", TMP_SDB_SYNC_DIR); + system(cmd); + snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", dataFile, TMP_SDB_DATA_FILE); + system(cmd); + snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", raftCfgFile, TMP_SDB_RAFT_CFG_FILE); + system(cmd); + snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", raftStoreFile, TMP_SDB_RAFT_STORE_FILE); + system(cmd); + + strcpy(tsDataDir, TMP_DNODE_DIR); return 0; }