From efd33aa4d71228f4d82304144d3176c1ea3d312c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 01:25:53 +0800 Subject: [PATCH 01/10] fix(stream): check the checkpoint-report transId and checkpointId, and identify the expired checkpoint-report info. --- source/dnode/mnode/impl/src/mndStream.c | 15 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 81 ++++++++++++++++++--- 2 files changed, 78 insertions(+), 18 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4f5198acc0..4f75aae42e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2416,8 +2416,8 @@ static bool validateChkptReport(const SCheckpointReport *pReport, int64_t report return true; } -static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) { - bool valid = validateChkptReport(pReport, reportChkptId); +static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) { + bool valid = validateChkptReport(pReport, reportedChkptId); if (!valid) { return; } @@ -2433,7 +2433,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard", pReport->taskId, p->checkpointId, pReport->checkpointId); } else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it - mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, + mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64, pReport->taskId, p->checkpointId, pReport->checkpointId); // update the checkpoint report info @@ -2465,7 +2465,8 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC mError("failed to put into task list, taskId:0x%x", pReport->taskId); } else { int32_t size = taosArrayGetSize(pList); - mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size); + mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report", + pReport->streamId, pReport->taskId, size); } } @@ -2491,7 +2492,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { " checkpointVer:%" PRId64 " transId:%d", req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); - // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans. streamMutexLock(&execInfo.lock); SStreamObj *pStream = NULL; @@ -2500,7 +2501,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); // not in meta-store yet, try to acquire the task in exec buffer - // the checkpoint req arrives too soon before the completion of the create stream trans. + // the checkpoint req arrives too soon before the completion of the creation of stream trans. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { @@ -2533,7 +2534,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { } int32_t total = taosArrayGetSize(pInfo->pTaskList); - if (total == numOfTasks) { // all tasks has send the reqs + if (total == numOfTasks) { // all tasks have sent the reqs mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 " will be issued soon", req.streamId, pStream->name, total, req.checkpointId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7a38e68744..69ab56429c 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -658,6 +658,65 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { return 0; } +static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) { + int64_t checkpointId = -1; + int32_t transId = -1; + int32_t taskId = -1; + + int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList); + if (existed != numOfTasks) { + mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName, + existed, numOfTasks, numOfTasks - existed); + return -1; + } + + // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList + for(int32_t i = 0; i < numOfTasks; ++i) { + STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i); + if (pInfo == NULL) { + continue; + } + + if (checkpointId == -1) { + checkpointId = pInfo->checkpointId; + transId = pInfo->transId; + taskId = pInfo->taskId; + } else if (checkpointId != pInfo->checkpointId) { + mError("stream:0x%" PRIx64 + " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64 + ", type 2 taskId:0x%x checkpointId:%" PRId64, + pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId); + return -1; + } + } + + // check for the correct checkpointId for current task info in STaskChkptInfo + STaskChkptInfo *p = taosArrayGet(pReportInfo->pTaskList, 0); + STaskId id = {.streamId = p->streamId, .taskId = p->taskId}; + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + + // cross-check failed, there must be something unknown wrong + SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId)); + if (pTransInfo == NULL) { + mError("stream:0x%" PRIx64" no active exists for checkpoint transId:%d, clear checkpoint-report list", id.streamId); + taosArrayClear(pReportInfo->pTaskList); + return -1; + } + + if (pTransInfo->transId != transId) { + mError("stream:0x%" PRIx64 + " checkpoint-report list info are expired, clear and retry, active transId:%d trans in list:%d", + id.streamId, pTransInfo->transId, transId); + taosArrayClear(pReportInfo->pTaskList); + return -1; + } + + mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId, + pName, numOfTasks); + + return TSDB_CODE_SUCCESS; +} + int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; void *pIter = NULL; @@ -668,6 +727,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { } mDebug("start to scan checkpoint report info"); + streamMutexLock(&execInfo.lock); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { @@ -693,30 +753,27 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { } int32_t total = mndGetNumOfStreamTasks(pStream); - int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList); - - if (total == existed) { - mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", - pStream->uid, pStream->name, total); - + int32_t ret = allTasksSendChkptReport(px, total, pStream->name); + if (ret == 0) { code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); if (code == 0) { code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry taosArrayClear(px->pTaskList); + mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64 + " to %" PRId64, + pInfo->streamId, px->reportChkpt, pInfo->checkpointId); px->reportChkpt = pInfo->checkpointId; - mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId); } else { - mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", + mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet", pInfo->streamId); } + + sdbRelease(pMnode->pSdb, pStream); break; } else { mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId); } - } else { - mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, - existed, total, total - existed); } sdbRelease(pMnode->pSdb, pStream); @@ -743,6 +800,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { streamMutexUnlock(&execInfo.lock); taosArrayDestroy(pDropped); + + mDebug("end to scan checkpoint report info") return TSDB_CODE_SUCCESS; } From 7900c725d529d4f2e126b596bb8a2872e15ed0c0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 09:15:27 +0800 Subject: [PATCH 02/10] fix(stream): fix syntax error. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 69ab56429c..dc9d3fabd9 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -698,7 +698,7 @@ static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t nu // cross-check failed, there must be something unknown wrong SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId)); if (pTransInfo == NULL) { - mError("stream:0x%" PRIx64" no active exists for checkpoint transId:%d, clear checkpoint-report list", id.streamId); + mError("stream:0x%" PRIx64" no active exists for checkpoint transId:%d, clear checkpoint-report list", id.streamId, transId); taosArrayClear(pReportInfo->pTaskList); return -1; } From 08092aeb0c8e1feb28c4735fbc5a79786d5daffa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 10:27:39 +0800 Subject: [PATCH 03/10] fix(stream): not clear task list if check failed, and add more checks. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index dc9d3fabd9..0bef4af47e 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -698,16 +698,23 @@ static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t nu // cross-check failed, there must be something unknown wrong SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId)); if (pTransInfo == NULL) { - mError("stream:0x%" PRIx64" no active exists for checkpoint transId:%d, clear checkpoint-report list", id.streamId, transId); - taosArrayClear(pReportInfo->pTaskList); - return -1; + mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already", + id.streamId, transId); + + if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) { + mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64 + ", req:%" PRId64 " recheck next time", + id.streamId, pe->checkpointInfo.activeId, checkpointId); + return -1; + } else { + // do nothing + } } if (pTransInfo->transId != transId) { mError("stream:0x%" PRIx64 - " checkpoint-report list info are expired, clear and retry, active transId:%d trans in list:%d", + " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time", id.streamId, pTransInfo->transId, transId); - taosArrayClear(pReportInfo->pTaskList); return -1; } From df6ec3afc255dc652f0bd155f53028452ce69d28 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 11:21:53 +0800 Subject: [PATCH 04/10] fix(stream): check null ptr. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 0bef4af47e..e3b8289e4c 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -709,13 +709,13 @@ static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t nu } else { // do nothing } - } - - if (pTransInfo->transId != transId) { - mError("stream:0x%" PRIx64 - " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time", - id.streamId, pTransInfo->transId, transId); - return -1; + } else { + if (pTransInfo->transId != transId) { + mError("stream:0x%" PRIx64 + " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time", + id.streamId, pTransInfo->transId, transId); + return -1; + } } mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId, From 0ea46585f444f429aab66cb89424bc9c416d6ff8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 16:29:00 +0800 Subject: [PATCH 05/10] fix(stream): update checkpoint-info after check the failed checkpointId, and update the consensus-checkpoint id in mnode. --- source/dnode/mnode/impl/inc/mndStream.h | 4 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 17 ++++++++++------- source/libs/stream/src/streamCheckpoint.c | 13 ++++++++++++- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index ec04aa3111..857fd5c99c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -175,8 +175,8 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); -int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); -int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); +int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); +int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index e3b8289e4c..d896434f3b 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -902,7 +902,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()}; memcpy(&info.req, pRestoreInfo, sizeof(info.req)); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList); + for (int32_t i = 0; i < num; ++i) { SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); if (p == NULL) { continue; @@ -910,10 +911,12 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo if (p->req.taskId == info.req.taskId) { mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 - "->%" PRId64 " total existed:%d", - pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, - (int32_t)taosArrayGetSize(pInfo->pTaskList)); + "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId, + info.req.checkpointId, num); p->req.startTs = info.req.startTs; + p->req.checkpointId = info.req.checkpointId; + p->req.transId = info.req.transId; return; } } @@ -922,7 +925,7 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo if (p == NULL) { mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId); } else { - int32_t num = taosArrayGetSize(pInfo->pTaskList); + num = taosArrayGetSize(pInfo->pTaskList); mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d", pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); @@ -934,7 +937,7 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) { pInfo->pTaskList = NULL; } -int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { +int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { @@ -951,7 +954,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { return code; } -int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { +int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d3eba382c9..0765018c9c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -604,6 +604,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV streamMutexLock(&pTask->lock); + // not update the checkpoint info if the checkpointId is less than the failed checkpointId + if (pReq->checkpointId < pInfo->pActiveInfo->failedId) { + stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64 + " is less than the failed checkpointId:%" PRId64 ", discard the update info", + id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId); + streamMutexUnlock(&pTask->lock); + + // always return true + return TSDB_CODE_SUCCESS; + } + if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", @@ -638,7 +649,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); From 3206c688ed8e187a5ec03ae871cdec31620bacbf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 16:30:43 +0800 Subject: [PATCH 06/10] fix(stream): fix syntax error. --- source/libs/stream/src/streamCheckpoint.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0765018c9c..1a49e50547 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -651,7 +651,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 From 88a94919c43bef731f607926e5bf68c8c953d4ec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 18:41:52 +0800 Subject: [PATCH 07/10] fix(stream): check for nodeupdate trans before create streams. --- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 9 +++++++-- source/dnode/mnode/impl/src/mndStreamTrans.c | 18 ++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 857fd5c99c..fc1c95a3b3 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +bool isNodeUpdateTransActive(); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4f75aae42e..c1ae5273c2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -65,8 +65,6 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq); static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); -static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); -static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -809,6 +807,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + // check for the taskEp update trans + if (isNodeUpdateTransActive()) { + mError("stream:%s failed to create stream, node update trans is active", createReq.name); + code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + goto _OVER; + } + code = mndCheckForSnode(pMnode, pSourceDb); mndReleaseDb(pMnode, pSourceDb); if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index a1e104aeca..9ef27f53b8 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -292,6 +292,24 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg return mndTransAppendRedoAction(pTrans, &action); } +bool isNodeUpdateTransActive() { + bool exist = false; + void *pIter = NULL; + + streamMutexLock(&execInfo.lock); + + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; + if (strcmp(pTransInfo->name, MND_STREAM_CHKPT_UPDATE_NAME) != 0) { + mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed", + pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime) exist = true; + } + } + + streamMutexUnlock(&execInfo.lock); + return exist; +} + int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { void *pIter = NULL; From 2f065062c3a91412ce46cc7ee75b1cc3782d08dd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 19:04:14 +0800 Subject: [PATCH 08/10] fix(stream): adjust position of checking trans. --- source/dnode/mnode/impl/src/mndStream.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c1ae5273c2..30953736eb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -799,6 +799,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); } + // check for the taskEp update trans + if (isNodeUpdateTransActive()) { + mError("stream:%s failed to create stream, node update trans is active", createReq.name); + code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + goto _OVER; + } + SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB); if (pSourceDb == NULL) { code = terrno; @@ -807,13 +814,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - // check for the taskEp update trans - if (isNodeUpdateTransActive()) { - mError("stream:%s failed to create stream, node update trans is active", createReq.name); - code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; - goto _OVER; - } - code = mndCheckForSnode(pMnode, pSourceDb); mndReleaseDb(pMnode, pSourceDb); if (code != 0) { From 30358f1297ac81a04559db4e983f9b84cadf063f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 12:52:30 +0800 Subject: [PATCH 09/10] fix(stream): fix error in check node update trans. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 9ef27f53b8..c0d3a11506 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -300,9 +300,10 @@ bool isNodeUpdateTransActive() { while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; - if (strcmp(pTransInfo->name, MND_STREAM_CHKPT_UPDATE_NAME) != 0) { + if (strcmp(pTransInfo->name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) { mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed", - pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime) exist = true; + pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime); + exist = true; } } From 9f2c448e36eb3b7202bff7b6e38855b4b580a0e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 14:05:40 +0800 Subject: [PATCH 10/10] fix(stream): fix error in check node update trans. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index c0d3a11506..fe3359dc74 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -300,7 +300,7 @@ bool isNodeUpdateTransActive() { while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; - if (strcmp(pTransInfo->name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) { + if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) { mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed", pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime); exist = true;