From 30486544841051038baf671d0c5865b8cc91804d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 10:01:30 +0800 Subject: [PATCH 1/8] refactor(stream): kill too long checkpoint trans. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 40 ++++++++++++++++---- source/dnode/mnode/impl/src/mndStreamTrans.c | 16 ++++++-- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index fc1c95a3b3..def817377d 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans); int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 247024b283..ffd271fd00 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1258,16 +1258,48 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; int32_t numOfCheckpointTrans = 0; + SArray *pLongChkpts = NULL; + SArray *pList = NULL; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } - SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); + pList = taosArrayInit(4, sizeof(SCheckpointInterval)); if (pList == NULL) { + mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno)); return terrno; } + pLongChkpts = taosArrayInit(4, sizeof(int64_t)); + if (pLongChkpts == NULL) { + mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno)); + taosArrayDestroy(pList); + return terrno; + } + + // check if ongong checkpoint trans or long chkpt trans exist. + code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts); + if (code) { + mError("failed to clear finish trans, code:%s", tstrerror(code)); + return code; + } + + // kill long exec checkpoint and set task status + if (taosArrayGetSize(pLongChkpts) > 0) { + //todo: + + for(int32_t i = 0; i < taosArrayGetSize(pLongChkpts); ++i) { + + mndKillTransImpl(pMnode, xx, ""); + mndCreateStreamResetStatusTrans(pMnode, pStream, chkptId); + } + + taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return TSDB_CODE_SUCCESS; + } + int64_t now = taosGetTimestampMs(); while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { @@ -1304,12 +1336,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } taosArraySort(pList, streamWaitComparFn); - code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans); - if (code) { - mError("failed to clear finish trans, code:%s", tstrerror(code)); - taosArrayDestroy(pList); - return code; - } int32_t numOfQual = taosArrayGetSize(pList); if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index fe3359dc74..ce82522029 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -31,11 +31,12 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo)); } -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) { +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans) { size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); int32_t numOfChkpt = 0; + int64_t now = taosGetTimestampMs(); if (pNumOfActiveChkpt != NULL) { *pNumOfActiveChkpt = 0; @@ -63,6 +64,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } else { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { numOfChkpt++; + + // last for 10min, kill it + int64_t dur = now - pTrans->createdTime; + if ((dur >= 600 * 1000) && pSlowChkptTrans != NULL) { + mInfo("long chkpt transId:%d, start:%" PRId64 + " exec duration:%.2fs, beyond threshold 10min, kill it and reset task status", + pTrans->id, pTrans->createdTime, dur / 1000.0); + taosArrayPush(pSlowChkptTrans, &pEntry->transId); + } } mndReleaseTrans(pMnode, pTrans); } @@ -101,7 +111,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons } // if any task updates exist, any other stream trans are not allowed to be created - int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); + int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL); if (code) { mError("failed to clear finish trans, code:%s, and continue", tstrerror(code)); } @@ -160,7 +170,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { return 0; } - int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); + int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL); if (code) { mError("failed to clear finish trans, code:%s", tstrerror(code)); } From 99d6086c5a60eca931ea1b0c18e9e76a1e4a7762 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 15:07:24 +0800 Subject: [PATCH 2/8] enh(stream): kill too long checkpoint trans. --- source/dnode/mnode/impl/inc/mndStream.h | 3 +- source/dnode/mnode/impl/src/mndStream.c | 16 +++----- source/dnode/mnode/impl/src/mndStreamTrans.c | 42 +++++++++++++++++--- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index def817377d..d694dc67eb 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans); +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans); int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); @@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts); bool isNodeUpdateTransActive(); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ffd271fd00..4cd4721bc4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1260,6 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; SArray *pLongChkpts = NULL; SArray *pList = NULL; + int64_t now = taosGetTimestampMs(); if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { return TSDB_CODE_STREAM_TASK_IVLD_STATUS; @@ -1271,7 +1272,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { return terrno; } - pLongChkpts = taosArrayInit(4, sizeof(int64_t)); + pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo)); if (pLongChkpts == NULL) { mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno)); taosArrayDestroy(pList); @@ -1282,26 +1283,21 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts); if (code) { mError("failed to clear finish trans, code:%s", tstrerror(code)); + + taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); return code; } // kill long exec checkpoint and set task status if (taosArrayGetSize(pLongChkpts) > 0) { - //todo: - - for(int32_t i = 0; i < taosArrayGetSize(pLongChkpts); ++i) { - - mndKillTransImpl(pMnode, xx, ""); - mndCreateStreamResetStatusTrans(pMnode, pStream, chkptId); - } + killChkptAndResetStreamTask(pMnode, pLongChkpts); taosArrayDestroy(pList); taosArrayDestroy(pLongChkpts); return TSDB_CODE_SUCCESS; } - int64_t now = taosGetTimestampMs(); - while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { int64_t duration = now - pStream->checkpointFreq; if (duration < tsStreamCheckpointInterval * 1000) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index ce82522029..4c3ba0c077 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -16,6 +16,8 @@ #include "mndStream.h" #include "mndTrans.h" +#define MAX_CHKPT_EXEC_ELAPSED (60*1000) // 60s + typedef struct SKeyInfo { void *pKey; int32_t keyLen; @@ -31,7 +33,7 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo)); } -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans) { +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans) { size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); @@ -67,11 +69,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, // last for 10min, kill it int64_t dur = now - pTrans->createdTime; - if ((dur >= 600 * 1000) && pSlowChkptTrans != NULL) { + if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) { mInfo("long chkpt transId:%d, start:%" PRId64 - " exec duration:%.2fs, beyond threshold 10min, kill it and reset task status", - pTrans->id, pTrans->createdTime, dur / 1000.0); - taosArrayPush(pSlowChkptTrans, &pEntry->transId); + " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status", + pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0)); + taosArrayPush(pLongChkptTrans, pEntry); } } mndReleaseTrans(pMnode, pTrans); @@ -371,3 +373,33 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { mDebug("complete clear checkpoints in all Dbs"); } + +void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { + int32_t code = 0; + int64_t now = taosGetTimestampMs(); + int32_t num = taosArrayGetSize(pLongChkpts); + + mInfo("start to kill %d long checkpoint trans", num); + + for(int32_t i = 0; i < num; ++i) { + SStreamTransInfo* pTrans = (SStreamTransInfo*) taosArrayGet(pLongChkpts, i); + if (pTrans == NULL) { + continue; + } + + double el = (now - pTrans->startTime) / 1000.0; + mInfo("stream:%s id:%" PRIx64 " ongoing checkpoint trans, id:%d, elapsed time:%.2fs killed", pTrans->name, + pTrans->streamId, pTrans->transId, el); + + SStreamObj *p = NULL; + code = mndGetStreamObj(pMnode, pTrans->streamId, &p); + if (code == 0 && p != NULL) { + mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb); + + mDebug("create reset task trans for stream:%s 0x%" PRIx64, pTrans->name, pTrans->streamId); + mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + + sdbRelease(pMnode->pSdb, p); + } + } +} \ No newline at end of file From 0a672f1b9628a5d24e9f16bb5c1a3b11db6f4767 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 16:55:10 +0800 Subject: [PATCH 3/8] refactor(stream): update logs. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 7 ++++--- source/libs/stream/src/streamExec.c | 9 ++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 4c3ba0c077..9c11d99c35 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -388,7 +388,7 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { } double el = (now - pTrans->startTime) / 1000.0; - mInfo("stream:%s id:%" PRIx64 " ongoing checkpoint trans, id:%d, elapsed time:%.2fs killed", pTrans->name, + mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed", pTrans->streamId, pTrans->transId, el); SStreamObj *p = NULL; @@ -396,9 +396,10 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { if (code == 0 && p != NULL) { mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb); - mDebug("create reset task trans for stream:%s 0x%" PRIx64, pTrans->name, pTrans->streamId); - mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name, + pTrans->streamId, pTrans->transId, p->checkpointId); + mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); sdbRelease(pMnode->pSdb, p); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ee34648a47..8ee06a82db 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -777,7 +777,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { int32_t code = 0; // merge multiple input data if possible in the input queue. - stDebug("s-task:%s start to extract data block from inputQ", id); + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st); while (1) { int32_t blockSize = 0; @@ -807,8 +808,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } - int64_t st = taosGetTimestampMs(); - EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); @@ -825,6 +824,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + + // Injection error: for automatic kill long trans test + taosMsleep(50*1000); + code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); if (code != 0) { stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code)); From 6d5984421f25b8cf5e506edee68c8e7ef8fcff2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 17:27:41 +0800 Subject: [PATCH 4/8] refactor(stream): update the long trans threshold. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 9c11d99c35..fb990c6fe9 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -16,7 +16,7 @@ #include "mndStream.h" #include "mndTrans.h" -#define MAX_CHKPT_EXEC_ELAPSED (60*1000) // 60s +#define MAX_CHKPT_EXEC_ELAPSED (600*1000) // 600s typedef struct SKeyInfo { void *pKey; From 4edc21e446229adeb63a2962f54d87d130f86970 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 17:42:50 +0800 Subject: [PATCH 5/8] refactor(stream): disable error injection. --- source/libs/stream/src/streamExec.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ee06a82db..267dc88807 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -824,10 +824,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - +#if 0 // Injection error: for automatic kill long trans test taosMsleep(50*1000); - +#endif code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); if (code != 0) { stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code)); From ea549364d490f82fe7c6bbcf0ae9bc04c5c8e7b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Feb 2025 16:38:26 +0800 Subject: [PATCH 6/8] fix(stream): free array and check the return values. --- source/dnode/mnode/impl/src/mndStream.c | 6 ++++++ source/dnode/mnode/impl/src/mndStreamTrans.c | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4cd4721bc4..18e404564d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1328,6 +1328,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t size = taosArrayGetSize(pList); if (size == 0) { taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; } @@ -1340,6 +1342,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { "checkpoint trans are not allowed, wait for 30s", numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint); taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; } @@ -1379,6 +1383,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index fb990c6fe9..f4f7c65a00 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -73,7 +73,10 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, mInfo("long chkpt transId:%d, start:%" PRId64 " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status", pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0)); - taosArrayPush(pLongChkptTrans, pEntry); + void* p = taosArrayPush(pLongChkptTrans, pEntry); + if (p == NULL) { + mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno)); + } } } mndReleaseTrans(pMnode, pTrans); @@ -399,7 +402,10 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name, pTrans->streamId, pTrans->transId, p->checkpointId); - mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + if (code) { + mError("stream:%s 0x%"PRIx64" failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code)); + } sdbRelease(pMnode->pSdb, p); } } From 4d21d5e055294846f3685a715cd36e3a947478e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Feb 2025 13:48:21 +0800 Subject: [PATCH 7/8] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStream.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 18e404564d..6af54aeb2e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1298,6 +1298,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } + taosArrayDestroy(pLongChkpts); + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { int64_t duration = now - pStream->checkpointFreq; if (duration < tsStreamCheckpointInterval * 1000) { From 91271f3fa46eb25ea208dfe8157662737bfbfa11 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Feb 2025 13:53:28 +0800 Subject: [PATCH 8/8] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStream.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6af54aeb2e..337e07756b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1330,8 +1330,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t size = taosArrayGetSize(pList); if (size == 0) { taosArrayDestroy(pList); - taosArrayDestroy(pLongChkpts); - return code; } @@ -1344,8 +1342,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { "checkpoint trans are not allowed, wait for 30s", numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint); taosArrayDestroy(pList); - taosArrayDestroy(pLongChkpts); - return code; } @@ -1385,8 +1381,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } taosArrayDestroy(pList); - taosArrayDestroy(pLongChkpts); - return code; }