diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index f1cfa58f62..2509c10601 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -95,8 +95,8 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndPreProcessMsg(SRpcMsg *pMsg); -void mndAbortPreprocessMsg(SRpcMsg *pMsg); +int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); +void mndPostProcessQueryMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 60c42d31f5..32477febeb 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -68,6 +68,10 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { mmSendRsp(pMsg, code); } + if (code == TSDB_CODE_RPC_REDIRECT) { + mndPostProcessQueryMsg(pMsg); + } + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -116,7 +120,7 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { pMsg->info.node = pMgmt->pMnode; - if (mndPreProcessMsg(pMsg) != 0) { + if (mndPreProcessQueryMsg(pMsg) != 0) { dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index b50781e8a5..ef82d1131c 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -550,12 +550,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; - if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && + if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && pMsg->msgType != TDMT_MND_TRANS_TIMER) { mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); - mndAbortPreprocessMsg(pMsg); - SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index f32a3129de..671152f9c6 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,16 +18,14 @@ #include "mndMnode.h" #include "qworker.h" -int32_t mndPreProcessMsg(SRpcMsg *pMsg) { +int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) { if (TDMT_VND_QUERY != pMsg->msgType) return 0; - SMnode *pMnode = pMsg->info.node; return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } -void mndAbortPreprocessMsg(SRpcMsg *pMsg) { +void mndPostProcessQueryMsg(SRpcMsg *pMsg) { if (TDMT_VND_QUERY != pMsg->msgType) return; - SMnode *pMnode = pMsg->info.node; qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 07f65b2a90..8666739dab 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -75,12 +75,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { - // TODO: - - // atomic operation - // step1. sdbGetCommitInfo - // step2. create ppReader with pReaderParam - + mInfo("start to read snapshot from sdb in atomic way"); + SMnode *pMnode = pFsm->data; + return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, + &pSnapshot->lastConfigIndex); return 0; } @@ -106,14 +104,6 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; -#if 0 -// send response - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); -#endif - pMgmt->errCode = cbMeta.code; mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); @@ -130,7 +120,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { mInfo("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; - return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader); + return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); } int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index bc118ee26e..87895d3b49 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -910,7 +910,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { int32_t len = 0; int32_t code = 0; - code = sdbStartRead(pSdb, &pReader); + code = sdbStartRead(pSdb, &pReader, NULL, NULL, NULL); ASSERT_EQ(code, 0); code = sdbStartWrite(pSdb, &pWritter); ASSERT_EQ(code, 0); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3b1c4000a8..1294f0cff3 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -388,7 +388,7 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); -int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter); +int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config); int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter); int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 0f4e1276c1..47c4a4ed0f 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -523,7 +523,7 @@ static void sdbCloseIter(SSdbIter *pIter) { taosMemoryFree(pIter); } -int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { +int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) { SSdbIter *pIter = sdbCreateIter(pSdb); if (pIter == NULL) return -1; @@ -552,6 +552,10 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { } *ppIter = pIter; + if (index != NULL) *index = commitIndex; + if (term != NULL) *term = commitTerm; + if (config != NULL) *config = commitConfig; + mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter, commitIndex, commitTerm, commitConfig, pIter->name); return 0; diff --git a/tests/script/tsim/mnode/basic5.sim b/tests/script/tsim/mnode/basic5.sim index fc591aa25d..23f5f6d782 100644 --- a/tests/script/tsim/mnode/basic5.sim +++ b/tests/script/tsim/mnode/basic5.sim @@ -148,7 +148,7 @@ endi print =============== step6: stop mnode1 system sh/exec.sh -n dnode1 -s stop -x SIGKILL -sql_error drop mnode on dnode 1 +# sql_error drop mnode on dnode 1 $x = 0 step61: