From 30486544841051038baf671d0c5865b8cc91804d Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Thu, 20 Feb 2025 10:01:30 +0800
Subject: [PATCH] refactor(stream): kill too long checkpoint trans.

---
Review notes (placed below the `---` marker, so not part of the commit message):
 * mndProcessStreamCheckpoint(): release pList and pLongChkpts when
   mndStreamClearFinishedTrans() fails, and release pLongChkpts as soon as the
   slow-checkpoint check is done; both were leaked before.
 * mndStreamClearFinishedTrans(): copy the trans id into an int64_t before
   pushing it, so the pushed element matches the sizeof(int64_t) list created by
   the caller, and report a failed push instead of ignoring it.
 * The kill loop in mndProcessStreamCheckpoint() is still a placeholder and does
   not build yet, see the TODO(review) comment there.

 source/dnode/mnode/impl/inc/mndStream.h      |  2 +-
 source/dnode/mnode/impl/src/mndStream.c      | 49 +++++++++++++++++---
 source/dnode/mnode/impl/src/mndStreamTrans.c | 21 +++++++--
 3 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h
index fc1c95a3b3..def817377d 100644
--- a/source/dnode/mnode/impl/inc/mndStream.h
+++ b/source/dnode/mnode/impl/inc/mndStream.h
@@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
 int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
 int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
 int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
-int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
+int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray *pSlowChkptTrans);
 int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
 int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
 
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 247024b283..ffd271fd00 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -1258,16 +1258,57 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
   SStreamObj *pStream = NULL;
   int32_t code = 0;
   int32_t numOfCheckpointTrans = 0;
+  SArray *pLongChkpts = NULL;
+  SArray *pList = NULL;
 
   if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
     return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
   }
 
-  SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
+  pList = taosArrayInit(4, sizeof(SCheckpointInterval));
   if (pList == NULL) {
+    mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
     return terrno;
   }
 
+  pLongChkpts = taosArrayInit(4, sizeof(int64_t));
+  if (pLongChkpts == NULL) {
+    code = terrno;  // save it before the calls below get a chance to overwrite it
+    mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(code));
+    taosArrayDestroy(pList);
+    return code;
+  }
+
+  // check if ongoing checkpoint trans or long chkpt trans exist.
+  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
+  if (code) {
+    mError("failed to clear finish trans, code:%s", tstrerror(code));
+    taosArrayDestroy(pList);
+    taosArrayDestroy(pLongChkpts);
+    return code;
+  }
+
+  // kill long exec checkpoint and set task status
+  if (taosArrayGetSize(pLongChkpts) > 0) {
+    // TODO(review): the loop body below is still a placeholder -- `xx` and `chkptId` are not declared
+    // anywhere in this hunk, and pStream is still NULL here (it is only assigned by the sdbFetch() loop
+    // below). The trans, stream and checkpoint id of every entry in pLongChkpts have to be resolved first.
+
+    for(int32_t i = 0; i < taosArrayGetSize(pLongChkpts); ++i) {
+
+      mndKillTransImpl(pMnode, xx, "");
+      mndCreateStreamResetStatusTrans(pMnode, pStream, chkptId);
+    }
+
+    taosArrayDestroy(pList);
+    taosArrayDestroy(pLongChkpts);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  // no long-running checkpoint trans was reported and the list is not used below, release it here
+  taosArrayDestroy(pLongChkpts);
+  pLongChkpts = NULL;
+
   int64_t now = taosGetTimestampMs();
 
   while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
@@ -1304,12 +1345,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
   }
 
   taosArraySort(pList, streamWaitComparFn);
-  code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
-  if (code) {
-    mError("failed to clear finish trans, code:%s", tstrerror(code));
-    taosArrayDestroy(pList);
-    return code;
-  }
 
   int32_t numOfQual = taosArrayGetSize(pList);
   if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c
index fe3359dc74..ce82522029 100644
--- a/source/dnode/mnode/impl/src/mndStreamTrans.c
+++ b/source/dnode/mnode/impl/src/mndStreamTrans.c
@@ -31,11 +31,12 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s
   return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
 }
 
-int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
+int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray *pSlowChkptTrans) {
   size_t keyLen = 0;
   void *pIter = NULL;
   SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
   int32_t numOfChkpt = 0;
+  int64_t now = taosGetTimestampMs();
 
   if (pNumOfActiveChkpt != NULL) {
     *pNumOfActiveChkpt = 0;
@@ -63,6 +64,20 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
     } else {
       if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
         numOfChkpt++;
+
+        // a checkpoint trans that has been running for 10min or more is reported to the caller
+        int64_t dur = now - pTrans->createdTime;
+        if ((dur >= 600 * 1000) && pSlowChkptTrans != NULL) {
+          mInfo("long chkpt transId:%d, start:%" PRId64
+                " exec duration:%.2fs, beyond threshold 10min, kill it and reset task status",
+                pTrans->id, pTrans->createdTime, dur / 1000.0);
+
+          // the caller sizes the list elements as int64_t, so push a full int64_t copy of the trans id
+          int64_t slowTransId = pEntry->transId;
+          if (taosArrayPush(pSlowChkptTrans, &slowTransId) == NULL) {
+            mError("failed to record long chkpt transId:%d, skip it in this round", pTrans->id);
+          }
+        }
       }
       mndReleaseTrans(pMnode, pTrans);
     }
@@ -101,7 +116,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
   }
 
   // if any task updates exist, any other stream trans are not allowed to be created
-  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
+  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
   if (code) {
     mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
   }
@@ -160,7 +175,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
     return 0;
   }
 
-  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
+  int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
   if (code) {
     mError("failed to clear finish trans, code:%s", tstrerror(code));
   }