From 5ae8b68b176a3890d31d10c3c7b603f3d31f0e7a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 15:37:51 +0800 Subject: [PATCH] fix(stream): fix deadlock. --- source/dnode/mnode/impl/src/mndStream.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ed492fe254..364cc062d1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -817,7 +817,7 @@ _OVER: return terrno; } -int64_t mndStreamGenChkpId(SMnode *pMnode) { +int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { SStreamObj *pStream = NULL; void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; @@ -834,7 +834,10 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { { // check the max checkpoint id from all vnodes. int64_t maxCheckpointId = -1; - taosThreadMutexLock(&execInfo.lock); + if (lock) { + taosThreadMutexLock(&execInfo.lock); + } + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); @@ -852,7 +855,10 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { } } - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } + if (maxCheckpointId > maxChkptId) { mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId, maxCheckpointId); @@ -872,7 +878,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { } SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = mndStreamGenChkpId(pMnode); + pMsg->checkpointId = mndStreamGenChkptId(pMnode, true); int32_t size = sizeof(SMStreamDoCheckpointMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; @@ -2329,7 +2335,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs - int64_t checkpointId = mndStreamGenChkpId(pMnode); + int64_t checkpointId = mndStreamGenChkptId(pMnode, false); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); if (pStream != NULL) { // TODO:handle error