From 249aecacda766ee5b2560ae5416985b201c80d69 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 21 May 2022 21:26:27 +0800 Subject: [PATCH] enh(sync) sync/mnode integration --- include/common/tmsgdef.h | 2 + include/dnode/mnode/mnode.h | 3 + source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 + source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 5 + source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 42 ++++++++ source/dnode/mnode/impl/inc/mndInt.h | 4 +- source/dnode/mnode/impl/src/mndSync.c | 107 +++++++++++++++++++- source/dnode/mnode/impl/src/mndTrans.c | 5 +- source/dnode/mnode/impl/src/mnode.c | 20 +++- source/libs/sync/src/syncMain.c | 19 ++-- 10 files changed, 193 insertions(+), 16 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 455898585a..bf683fc9ac 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -158,6 +158,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) + + TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 28c470a443..16cb4cdd70 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -90,6 +90,9 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessMsg(SRpcMsg *pMsg); + +int32_t mndProcessApplyMsg(SRpcMsg *pMsg); + /** * @brief Generate machine code */ diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 75e83d6547..380ae63b8d 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -33,6 +33,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; + SSingleWorker applyWorker; SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; @@ -59,6 +60,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 4f7fd4a1c0..68b2889278 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -122,6 +122,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + if (syncInit() != 0) { + dError("failed to init sync since %s", terrstr()); + return -1; + } + SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt)); if (pMgmt == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index c4314a57b1..04fe891ce9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -56,6 +56,32 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } + +static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + tmsg_t msgType = pMsg->msgType; + bool isRequest = msgType & 1U; + dTrace("msg:%p, get from mnode-query queue", pMsg); + + pMsg->info.node = pMgmt->pMnode; + + mndProcessApplyMsg(pMsg); + + /* + if (isRequest) { + if (pMsg->info.handle != NULL && code != 0) { + if (code != 0 && terrno != 0) code = terrno; + mmSendRsp(pMsg, code); + } + } + */ + + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; @@ -92,6 +118,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg); +} + int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } @@ -179,6 +209,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } + SSingleWorkerCfg aCfg = { + .min = 1, + .max = 1, + .name = "mnode-apply", + .fp = (FItem)mmProcessApplyQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) { + dError("failed to start mnode mnode-apply worker since %s", terrstr()); + return -1; + } + SSingleWorkerCfg mCfg = { .min = 1, .max = 1, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5258fa9e02..0982096d25 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,7 +75,9 @@ typedef struct { int32_t errCode; sem_t syncSem; SWal *pWal; - SSyncNode *pSyncNode; + //SSyncNode *pSyncNode; + int64_t sync; + ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3dbe3241a7..e7144f5673 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -46,6 +46,12 @@ static void mndCloseWal(SMnode *pMnode) { } static int32_t mndRestoreWal(SMnode *pMnode) { + +// do nothing +return 0; + +#if 0 + SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); @@ -114,6 +120,70 @@ static int32_t mndRestoreWal(SMnode *pMnode) { _OVER: walCloseReadHandle(pHandle); return code; + +#endif + +} + +int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) { + int32_t ret = 0; + if (msgcb->queueFps[SYNC_QUEUE] != NULL) { + tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + } else { + mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); + } + return ret; +} + +int32_t mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t ret = 0; + pMsg->info.noResp = 1; + tmsgSendReq(pEpSet, pMsg); + return ret; +} + +void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + if (pFsm->FpGetSnapshot != NULL) { + SSnapshot snapshot; + pFsm->FpGetSnapshot(pFsm, &snapshot); + beginIndex = snapshot.lastApplyIndex; + } + + if (cbMeta.index > beginIndex) { + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + mndProcessApplyMsg((SRpcMsg*)pMsg); + //mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); + } + } +} + +void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + // strict consistent, do nothing +} + +void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + // strict consistent, do nothing +} + +int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { + // snapshot + return 0; +} + +SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) { + SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); + pFsm->data = pMnode; + pFsm->FpCommitCb = mndSyncCommitCb; + pFsm->FpPreCommitCb = mndSyncPreCommitCb; + pFsm->FpRollBackCb = mndSyncRollBackCb; + pFsm->FpGetSnapshot = mndSyncGetSnapshotCb; + return pFsm; } int32_t mndInitSync(SMnode *pMnode) { @@ -133,7 +203,27 @@ int32_t mndInitSync(SMnode *pMnode) { if (pMnode->selfId == 1) { pMgmt->state = TAOS_SYNC_STATE_LEADER; } - pMgmt->pSyncNode = NULL; + + // pMgmt->pSyncNode = NULL; + SSyncInfo syncInfo; + syncInfo.vgId = 1; + SSyncCfg *pCfg = &(syncInfo.syncCfg); + pCfg->replicaNum = pMnode->replica; + pCfg->myIndex = pMnode->selfIndex; + for (int i = 0; i < pMnode->replica; ++i) { + snprintf((pCfg->nodeInfo)->nodeFqdn, sizeof((pCfg->nodeInfo)->nodeFqdn), "%s", (pMnode->replicas)[i].fqdn); + (pCfg->nodeInfo)->nodePort = (pMnode->replicas)[i].port; + } + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); + syncInfo.pWal = pMnode->syncMgmt.pWal; + + syncInfo.pFsm = syncMnodeMakeFsm(pMnode); + syncInfo.FpSendMsg = mndSendMsg; + syncInfo.FpEqMsg = mndSyncEqMsg; + + pMnode->syncMgmt.sync = syncOpen(&syncInfo); + ASSERT(pMnode->syncMgmt.sync > 0); + return 0; } @@ -157,6 +247,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; +#if 0 int64_t ver = sdbUpdateVer(pSdb, 1); if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { sdbUpdateVer(pSdb, -1); @@ -168,24 +259,32 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { walCommit(pWal, ver); walFsync(pWal, true); -#if 1 return 0; + #else + if (pMnode->replica == 1) return 0; SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + //SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + + SRpcMsg rpcMsg; + rpcMsg.code = TDMT_MND_APPLY_MSG; + rpcMsg.contLen = sdbGetRawTotalSize(pRaw); + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen); bool isWeak = false; - int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak); + int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak); if (code != 0) return code; tsem_wait(&pMgmt->syncSem); return pMgmt->errCode; #endif + } bool mndIsMaster(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 35ecaa748e..f4fa4beaf1 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -683,11 +683,14 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, sync finished", pTrans->id); - code = sdbWrite(pMnode->pSdb, pRaw); +// do it in state machine commit cb +#if 0 + code = sdbWriteWithout(pMnode->pSdb, pRaw); if (code != 0) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); return -1; } +#endif return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 8c805dd8c7..f8e5a65f0f 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -336,9 +336,25 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } -int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); } +int32_t mndStart(SMnode *pMnode) { + syncStart(pMnode->syncMgmt.sync); + return mndInitTimer(pMnode); +} -void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } +void mndStop(SMnode *pMnode) { + syncStop(pMnode->syncMgmt.sync); + return mndCleanupTimer(pMnode); +} + +int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { + + SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pMsg->info.node; + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + rpcFreeCont(pMsg->pCont); + + return code; +} int32_t mndProcessMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d9ff60bbe2..9c02b91ef0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -55,14 +55,17 @@ static void syncFreeNode(void* param); // --------------------------------- int32_t syncInit() { - int32_t ret; - tsNodeRefId = taosOpenRef(200, syncFreeNode); - if (tsNodeRefId < 0) { - sError("failed to init node ref"); - syncCleanUp(); - ret = -1; - } else { - ret = syncEnvStart(); + int32_t ret = 0; + + if (!syncEnvIsStart()) { + tsNodeRefId = taosOpenRef(200, syncFreeNode); + if (tsNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + ret = -1; + } else { + ret = syncEnvStart(); + } } return ret;