From ca1ffd584d2f5cf473754b65bb64af49e9d119d9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 23 Oct 2023 16:45:17 +0800 Subject: [PATCH] refactor stream backend --- source/dnode/mnode/impl/src/mndStream.c | 50 ++++++++++++------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 283d3fde38..87a85c4086 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2450,32 +2450,34 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } +int32_t mndFindTransByName(SMnode *pMnode, char *key, int32_t *transId) { + SSdb *pSdb = pMnode->pSdb; + STrans *pTrans = NULL; + void *pIter = NULL; + bool found = false; + while (1) { + pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); + if (pIter == NULL) { + break; + } + + if (strncmp(pTrans->opername, key, tListLen(pTrans->opername) - 1) == 0) { + *transId = pTrans->id; + found = true; + sdbRelease(pSdb, pTrans); + sdbCancelFetch(pSdb, pIter); + break; + } + + sdbRelease(pSdb, pTrans); + } + return found ? 0 : -1; +} int32_t mndResetFromCheckpoint(SMnode *pMnode) { // find the checkpoint trans id int32_t transId = 0; - - { - SSdb *pSdb = pMnode->pSdb; - STrans *pTrans = NULL; - void *pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); - if (pIter == NULL) { - break; - } - - if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) { - transId = pTrans->id; - sdbRelease(pSdb, pTrans); - sdbCancelFetch(pSdb, pIter); - break; - } - - sdbRelease(pSdb, pTrans); - } - } - - if (transId == 0) { + int32_t code = mndFindTransByName(pMnode, MND_STREAM_CHECKPOINT_NAME, &transId); + if (code == -1) { mError("failed to find the checkpoint trans, reset not executed"); return TSDB_CODE_SUCCESS; } @@ -2492,7 +2494,6 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) { if (pIter == NULL) { break; } - mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); int32_t code = createStreamResetStatusTrans(pMnode, pStream); if (code != TSDB_CODE_SUCCESS) { @@ -2500,7 +2501,6 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) { return code; } } - return 0; }