From 7b56df0adce996fdf9697b2bc21841531ba79c56 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Feb 2025 14:51:26 +0800 Subject: [PATCH 1/2] refactor(stream): create task epset update trans out of lock, to avoid blocking mnode-read thread. --- source/dnode/mnode/impl/src/mndStream.c | 134 ++++++++++++++---------- 1 file changed, 80 insertions(+), 54 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 55974eb9c7..220319122c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1901,15 +1901,16 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) { SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; void *pIter = NULL; STrans *pTrans = NULL; int32_t code = 0; + *pUpdateTrans = NULL; // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool while (1) { + SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { break; @@ -1926,6 +1927,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } while (1) { + SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { break; @@ -1946,7 +1948,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); if (p1 == NULL && p2 == NULL) { - mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); + mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name); sdbRelease(pSdb, pStream); continue; } @@ -1981,20 +1983,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } // no need to build the trans to handle the vgroup update - if (pTrans == NULL) { - return 0; - } - - code = mndTransPrepare(pMnode, pTrans); - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code)); - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return code; - } - - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); + *pUpdateTrans = pTrans; return code; } @@ -2076,7 +2065,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi taosHashCleanup(pHash); - mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList)); + mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList)); return code; } @@ -2100,14 +2089,49 @@ static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { } } +static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, + bool *pUpdateAllVgroups) { + int32_t code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot); + if (code) { + mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(code)); + return code; + } + + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo); + if (code) { + mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(code)); + return code; + } + + { + if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) { + mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans"); + *pUpdateAllVgroups = true; + execInfo.switchFromFollower = false; // reset the flag + addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb); + } + } + + if (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0 || (*pUpdateAllVgroups)) { + // kill current active checkpoint transaction, since the transaction is vnode wide. + killAllCheckpointTrans(pMnode, pChangeInfo); + } else { + mDebug("no update found in vnode(s) list"); + } + + return code; +} + // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { - int32_t code = 0; - bool allReady = true; - SArray *pNodeSnapshot = NULL; - SMnode *pMnode = pMsg->info.node; - int64_t ts = taosGetTimestampSec(); - bool updateAllVgroups = false; + int32_t code = 0; + bool allReady = true; + SArray *pNodeSnapshot = NULL; + SMnode *pMnode = pMsg->info.node; + int64_t tsms = taosGetTimestampMs(); + int64_t ts = tsms / 1000; + bool updateAllVgroups = false; + SVgroupChangeInfo changeInfo = {0}; int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { @@ -2115,7 +2139,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - mDebug("start to do node changing check"); + mDebug("start to do node changing check, ts:%" PRId64, tsms); streamMutexLock(&execInfo.lock); int32_t numOfNodes = extractStreamNodeList(pMnode); @@ -2141,58 +2165,60 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } streamMutexLock(&execInfo.lock); + code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups); + streamMutexUnlock(&execInfo.lock); - code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot); if (code) { goto _end; } - SVgroupChangeInfo changeInfo = {0}; - code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); - if (code) { - goto _end; - } - - { - if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) { - mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans"); - updateAllVgroups = true; - execInfo.switchFromFollower = false; // reset the flag - addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); - } - } - if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) { - // kill current active checkpoint transaction, since the transaction is vnode wide. - killAllCheckpointTrans(pMnode, &changeInfo); - code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups); + mDebug("vnode(s) change detected, build trans to update stream task epsets"); + + STrans *pTrans = NULL; + + streamMutexLock(&execInfo.lock); + code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans); + streamMutexUnlock(&execInfo.lock); + + // NOTE: sync trans out of lock + if (code == 0 && pTrans != NULL) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code)); + } + + mndTransDrop(pTrans); + } // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { + streamMutexLock(&execInfo.lock); + code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList); + int32_t num = (int)taosArrayGetSize(execInfo.pNodeList); + if (code == 0) { + execInfo.ts = ts; + mDebug("create trans successfully, update cached node list, numOfNodes:%d", num); + } + + streamMutexUnlock(&execInfo.lock); + if (code) { mError("failed to extract node list from stream, code:%s", tstrerror(code)); goto _end; } - - execInfo.ts = ts; - mDebug("create trans successfully, update cached node list, numOfNodes:%d", - (int)taosArrayGetSize(execInfo.pNodeList)); - } else { - mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code)); } - } else { - mDebug("no update found in nodeList"); } mndDestroyVgroupChangeInfo(&changeInfo); _end: - streamMutexUnlock(&execInfo.lock); taosArrayDestroy(pNodeSnapshot); - mDebug("end to do stream task node change checking"); + mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms); atomic_store_32(&mndNodeCheckSentinel, 0); + return 0; } From b6707fbbd207ff64c5721f7c2681db0d49e1a6ad Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 26 Feb 2025 16:55:06 +0800 Subject: [PATCH 2/2] refactor(stream): move the generate checkpoint procedure out of lock, to avoid blocking heartbeat, and resulting in leader/follower switch. --- source/libs/stream/src/streamCheckpoint.c | 5 ++++- source/libs/stream/src/streamExec.c | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c53e1a19a3..41e3d40542 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -910,9 +910,12 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int64_t startTs = pTask->chkInfo.startTs; int64_t ckId = pTask->chkInfo.pActiveInfo->activeId; const char* id = pTask->id.idStr; - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); SStreamMeta* pMeta = pTask->pMeta; + streamMutexLock(&pTask->lock); + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + streamMutexUnlock(&pTask->lock); + // sink task does not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 267dc88807..999a6b5a4c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -698,6 +698,8 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) { streamMutexLock(&pTask->lock); SStreamTaskState pState = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue @@ -715,7 +717,7 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) { } } - streamMutexUnlock(&pTask->lock); +// streamMutexUnlock(&pTask->lock); return code; }