Merge branch 'feature/sync-mnode-integration' of https://github.com/taosdata/TDengine into feature/sync-mnode-integration

This commit is contained in:
Shengliang Guan 2022-05-24 15:05:15 +08:00
commit ca89251caa
6 changed files with 25 additions and 9 deletions

View File

@ -90,6 +90,10 @@ typedef struct SSyncFSM {
void (*FpRestoreFinish)(struct SSyncFSM* pFsm); void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len);
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len);
} SSyncFSM; } SSyncFSM;
// abstract definition of log store in raft // abstract definition of log store in raft

View File

@ -55,6 +55,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
#if 1
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id == pMgmt->pData->dnodeId) {
pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
}
pMgmt->selfIndex = pOption->selfIndex;
#else
pOption->replica = pMgmt->replica; pOption->replica = pMgmt->replica;
pOption->selfIndex = -1; pOption->selfIndex = -1;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
@ -64,6 +77,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
} }
} }
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
#endif
pOption->deploy = false; pOption->deploy = false;
} }

View File

@ -90,7 +90,7 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncCfg *pCfg = &syncInfo.syncCfg; SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica; pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex; pCfg->myIndex = pMnode->selfIndex;
mInfo("start to open mnode, replica:%d myIndex:%d", pCfg->replicaNum, pCfg->myIndex); mInfo("start to open mnode sync, replica:%d myIndex:%d", pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pMnode->replica; ++i) { for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i]; SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
@ -166,7 +166,7 @@ bool mndIsMaster(SMnode *pMnode) {
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex}; SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex};
mInfo("start to alter mnode, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
for (int32_t i = 0; i < pOption->replica; ++i) { for (int32_t i = 0; i < pOption->replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn));

View File

@ -148,8 +148,8 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr; SSyncRespMgr* pSyncRespMgr;
// restore state // restore state
bool restoreFinish; bool restoreFinish;
//sem_t restoreSem; // sem_t restoreSem;
SSnapshot* pSnapshot; SSnapshot* pSnapshot;
} SSyncNode; } SSyncNode;

View File

@ -509,7 +509,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
} }
//tsem_init(&(pSyncNode->restoreSem), 0, 0); // tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
@ -606,7 +606,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pSnapshot); taosMemoryFree(pSyncNode->pSnapshot);
} }
//tsem_destroy(&pSyncNode->restoreSem); // tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode // free memory in syncFreeNode
// taosMemoryFree(pSyncNode); // taosMemoryFree(pSyncNode);

View File

@ -73,9 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0; return 0;
} }
void FpRestoreFinishCb(struct SSyncFSM* pFsm) { void FpRestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==FpRestoreFinishCb=="); }
sTrace("==callback== ==FpRestoreFinishCb==");
}
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));