From 99d6086c5a60eca931ea1b0c18e9e76a1e4a7762 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 15:07:24 +0800 Subject: [PATCH] enh(stream): kill too long checkpoint trans. --- source/dnode/mnode/impl/inc/mndStream.h | 3 +- source/dnode/mnode/impl/src/mndStream.c | 16 +++----- source/dnode/mnode/impl/src/mndStreamTrans.c | 42 +++++++++++++++++--- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index def817377d..d694dc67eb 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans); +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans); int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); @@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts); bool isNodeUpdateTransActive(); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ffd271fd00..4cd4721bc4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1260,6 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; SArray *pLongChkpts = NULL; SArray *pList = NULL; + int64_t now = taosGetTimestampMs(); if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { return TSDB_CODE_STREAM_TASK_IVLD_STATUS; @@ -1271,7 +1272,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { return terrno; } - pLongChkpts = taosArrayInit(4, sizeof(int64_t)); + pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo)); if (pLongChkpts == NULL) { mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno)); taosArrayDestroy(pList); @@ -1282,26 +1283,21 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts); if (code) { mError("failed to clear finish trans, code:%s", tstrerror(code)); + + taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); return code; } // kill long exec checkpoint and set task status if (taosArrayGetSize(pLongChkpts) > 0) { - //todo: - - for(int32_t i = 0; i < taosArrayGetSize(pLongChkpts); ++i) { - - mndKillTransImpl(pMnode, xx, ""); - mndCreateStreamResetStatusTrans(pMnode, pStream, chkptId); - } + killChkptAndResetStreamTask(pMnode, pLongChkpts); taosArrayDestroy(pList); taosArrayDestroy(pLongChkpts); return TSDB_CODE_SUCCESS; } - int64_t now = taosGetTimestampMs(); - while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { int64_t duration = now - pStream->checkpointFreq; if (duration < tsStreamCheckpointInterval * 1000) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index ce82522029..4c3ba0c077 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -16,6 +16,8 @@ #include "mndStream.h" #include "mndTrans.h" +#define MAX_CHKPT_EXEC_ELAPSED (60*1000) // 60s + typedef struct SKeyInfo { void *pKey; int32_t keyLen; @@ -31,7 +33,7 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo)); } -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray* pSlowChkptTrans) { +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans) { size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); @@ -67,11 +69,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, // last for 10min, kill it int64_t dur = now - pTrans->createdTime; - if ((dur >= 600 * 1000) && pSlowChkptTrans != NULL) { + if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) { mInfo("long chkpt transId:%d, start:%" PRId64 - " exec duration:%.2fs, beyond threshold 10min, kill it and reset task status", - pTrans->id, pTrans->createdTime, dur / 1000.0); - taosArrayPush(pSlowChkptTrans, &pEntry->transId); + " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status", + pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0)); + taosArrayPush(pLongChkptTrans, pEntry); } } mndReleaseTrans(pMnode, pTrans); @@ -371,3 +373,33 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { mDebug("complete clear checkpoints in all Dbs"); } + +void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { + int32_t code = 0; + int64_t now = taosGetTimestampMs(); + int32_t num = taosArrayGetSize(pLongChkpts); + + mInfo("start to kill %d long checkpoint trans", num); + + for(int32_t i = 0; i < num; ++i) { + SStreamTransInfo* pTrans = (SStreamTransInfo*) taosArrayGet(pLongChkpts, i); + if (pTrans == NULL) { + continue; + } + + double el = (now - pTrans->startTime) / 1000.0; + mInfo("stream:%s id:%" PRIx64 " ongoing checkpoint trans, id:%d, elapsed time:%.2fs killed", pTrans->name, + pTrans->streamId, pTrans->transId, el); + + SStreamObj *p = NULL; + code = mndGetStreamObj(pMnode, pTrans->streamId, &p); + if (code == 0 && p != NULL) { + mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb); + + mDebug("create reset task trans for stream:%s 0x%" PRIx64, pTrans->name, pTrans->streamId); + mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + + sdbRelease(pMnode->pSdb, p); + } + } +} \ No newline at end of file