From 267f7d3b086c1effbee1ea74d4a47f6fa9533eaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Feb 2025 10:31:11 +0800 Subject: [PATCH] refactor(stream): limit the maximum number of consensus checkpoint trans. --- source/dnode/mnode/impl/src/mndStream.c | 107 +++++++++++++------- source/dnode/mnode/impl/src/mndStreamUtil.c | 9 +- 2 files changed, 73 insertions(+), 43 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ca227249dd..247024b283 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2587,20 +2587,51 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, } } +static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) { + int32_t alreadySend = taosArrayGetSize(pList); + + for (int32_t i = 0; i < alreadySend; ++i) { + int32_t *taskId = taosArrayGet(pList, i); + if (taskId == NULL) { + continue; + } + + for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) { + SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k); + if ((pe != NULL) && (pe->req.taskId == *taskId)) { + taosArrayRemove(pInfo->pTaskList, k); + break; + } + } + } + + return alreadySend; +} + int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; int64_t now = taosGetTimestampMs(); + bool allReady = true; + SArray *pNodeSnapshot = NULL; + int32_t maxAllowedTrans = 50; + int32_t numOfTrans = 0; + int32_t code = 0; + void *pIter = NULL; + + SArray *pList = taosArrayInit(4, sizeof(int32_t)); + if (pList == NULL) { + return terrno; + } + SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); if (pStreamList == NULL) { + taosArrayDestroy(pList); return terrno; } mDebug("start to process consensus-checkpointId in tmr"); - bool allReady = true; - SArray *pNodeSnapshot = NULL; - - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); taosArrayDestroy(pNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); @@ -2609,28 +2640,30 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (!allReady) { mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process"); taosArrayDestroy(pStreamList); + taosArrayDestroy(pList); return 0; } streamMutexLock(&execInfo.lock); - void *pIter = NULL; while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) { SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter; - int64_t streamId = -1; - int32_t num = taosArrayGetSize(pInfo->pTaskList); - SArray *pList = taosArrayInit(4, sizeof(int32_t)); - if (pList == NULL) { - continue; - } + taosArrayClear(pList); + int64_t streamId = -1; + int32_t num = taosArrayGetSize(pInfo->pTaskList); SStreamObj *pStream = NULL; + code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); if (pStream == NULL || code != 0) { // stream has been dropped already mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId); void *p = taosArrayPush(pStreamList, &pInfo->streamId); - taosArrayDestroy(pList); + if (p == NULL) { + mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64 + " code:%s, continue", + pInfo->streamId, tstrerror(terrno)); + } continue; } @@ -2640,7 +2673,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { continue; } - streamId = pe->req.streamId; + if (streamId == -1) { + streamId = pe->req.streamId; + } int32_t existed = 0; bool allSame = true; @@ -2651,7 +2686,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { break; } - if (((now - pe->ts) >= 10 * 1000) || allSame) { + if (((now - pe->ts) >= 10 * 1000) && allSame) { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId, pe->req.startTs, (now - pe->ts) / 1000.0); if (chkId > pe->req.checkpointId) { @@ -2659,8 +2694,12 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { taosArrayDestroy(pStreamList); mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId, pe->req.checkpointId, chkId); + + mndReleaseStream(pMnode, pStream); + taosHashCancelIterate(execInfo.pStreamConsensus, pIter); return TSDB_CODE_FAILED; } + code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); @@ -2670,7 +2709,6 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (p == NULL) { mError("failed to put into task list, taskId:0x%x", pe->req.taskId); } - streamId = pe->req.streamId; } else { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId, pe->req.startTs, (now - pe->ts) / 1000.0); @@ -2679,38 +2717,27 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { mndReleaseStream(pMnode, pStream); - if (taosArrayGetSize(pList) > 0) { - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - int32_t *taskId = taosArrayGet(pList, i); - if (taskId == NULL) { - continue; - } - - for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) { - SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k); - if ((pe != NULL) && (pe->req.taskId == *taskId)) { - taosArrayRemove(pInfo->pTaskList, k); - break; - } - } - } - } - - taosArrayDestroy(pList); + int32_t alreadySend = doCleanReqList(pList, pInfo); + // clear request stream item with empty task list if (taosArrayGetSize(pInfo->pTaskList) == 0) { mndClearConsensusRspEntry(pInfo); if (streamId == -1) { - streamMutexUnlock(&execInfo.lock); - taosArrayDestroy(pStreamList); - mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId); - return TSDB_CODE_FAILED; + mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId); } + void *p = taosArrayPush(pStreamList, &streamId); if (p == NULL) { - mError("failed to put into stream list, stream:0x%" PRIx64, streamId); + mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId); } } + + numOfTrans += alreadySend; + if (numOfTrans > maxAllowedTrans) { + mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend); + taosHashCancelIterate(execInfo.pStreamConsensus, pIter); + break; + } } for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) { @@ -2725,7 +2752,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { streamMutexUnlock(&execInfo.lock); taosArrayDestroy(pStreamList); - mDebug("end to process consensus-checkpointId in tmr"); + taosArrayDestroy(pList); + + mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans); return code; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index d896434f3b..4779f1d6cb 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -814,17 +814,18 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, int64_t ts) { - char msg[128] = {0}; + char msg[128] = {0}; + STrans *pTrans = NULL; + SStreamTask *pTask = NULL; + snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); - STrans *pTrans = NULL; int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans); if (pTrans == NULL || code != 0) { return terrno; } - STaskId id = {.streamId = pStream->uid, .taskId = taskId}; - SStreamTask *pTask = NULL; + STaskId id = {.streamId = pStream->uid, .taskId = taskId}; code = mndGetStreamTask(&id, pStream, &pTask); if (code) { mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);