From 9ba8e6bd345296694f2dc7a7510a02c1438dd858 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:26:06 +0800 Subject: [PATCH] refactor: add logs for drop streams. --- source/dnode/mnode/impl/src/mndStream.c | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c87b8e84f4..f5cccbbeff 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1213,27 +1213,31 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { + mError("invalid drop stream msg recv, discarded"); terrno = TSDB_CODE_INVALID_MSG; return -1; } - pStream = mndAcquireStream(pMnode, dropReq.name); + mDebug("recv drop stream:%s msg", dropReq.name); + pStream = mndAcquireStream(pMnode, dropReq.name); if (pStream == NULL) { if (dropReq.igNotExists) { - mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); + mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - mError("stream:%s not exist failed to drop", dropReq.name); + mError("stream:%s not exist failed to drop it", dropReq.name); tFreeMDropStreamReq(&dropReq); return -1; } } if (pStream->smaId != 0) { + mDebug("stream:%s try to drop sma related stream", dropReq.name); + void *pIter = NULL; SSmaObj *pSma = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma); @@ -1241,13 +1245,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pSma && pSma->uid == pStream->smaId) { sdbRelease(pMnode->pSdb, pSma); sdbRelease(pMnode->pSdb, pStream); + sdbCancelFetch(pMnode->pSdb, pIter); tFreeMDropStreamReq(&dropReq); terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED; + + mError("try to drop sma-related stream:%s, code:%s only allowed to be dropped along with sma", dropReq.name, + tstrerror(terrno)); return -1; } - if (pSma) sdbRelease(pMnode->pSdb, pSma); - pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma); + + if (pSma) { + sdbRelease(pMnode->pSdb, pSma); + } + + pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); } } @@ -1307,6 +1319,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndKillTransImpl(pMnode, transId, pStream->sourceDb); } + mDebug("stream:%s transId:%d start to drop related task when dropping stream", dropReq.name, transId); removeStreamTasksInBuf(pStream, &execInfo); SName name = {0};