From 1e96edcdd763af19e9f281ce303bd4b92f9a04fb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 1 Nov 2023 17:30:46 +0800 Subject: [PATCH] Merge branch 'enh/new3.0' into enh/refactorBackend --- source/dnode/mnode/impl/src/mndStream.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b6a15ae368..f8b976d90e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2532,15 +2532,24 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { } sdbRelease(pSdb, pTrans); - } - return 0; + + if (transId == 0) { + mError("failed to find the checkpoint trans, reset not executed"); + return TSDB_CODE_SUCCESS; + } + + pTrans = mndAcquireTrans(pMnode, transId); + mInfo("kill checkpoint trans:%d", transId); + + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + return TSDB_CODE_SUCCESS; } int32_t mndResetFromCheckpoint(SMnode* pMnode) { doKillActiveCheckpointTrans(pMnode); - int32_t code = 0; // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; @@ -2553,7 +2562,7 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { // todo this transaction should exist be only one mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); - code = createStreamResetStatusTrans(pMnode, pStream); + int32_t code = createStreamResetStatusTrans(pMnode, pStream); if (code != TSDB_CODE_SUCCESS) { sdbCancelFetch(pSdb, pIter); return code;