diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index fc1c95a3b3..d694dc67eb 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans); int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); @@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts); bool isNodeUpdateTransActive(); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 11fd2d4f65..55974eb9c7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1258,17 +1258,47 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; int32_t numOfCheckpointTrans = 0; + SArray *pLongChkpts = NULL; + SArray *pList = NULL; + int64_t now = taosGetTimestampMs(); if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } - SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); + pList = taosArrayInit(4, sizeof(SCheckpointInterval)); if (pList == NULL) { + mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno)); return terrno; } - int64_t now = taosGetTimestampMs(); + pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo)); + if (pLongChkpts == NULL) { + mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno)); + taosArrayDestroy(pList); + return terrno; + } + + // check if ongong checkpoint trans or long chkpt trans exist. + code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts); + if (code) { + mError("failed to clear finish trans, code:%s", tstrerror(code)); + + taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return code; + } + + // kill long exec checkpoint and set task status + if (taosArrayGetSize(pLongChkpts) > 0) { + killChkptAndResetStreamTask(pMnode, pLongChkpts); + + taosArrayDestroy(pList); + taosArrayDestroy(pLongChkpts); + return TSDB_CODE_SUCCESS; + } + + taosArrayDestroy(pLongChkpts); while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { int64_t duration = now - pStream->checkpointFreq; @@ -1304,12 +1334,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } taosArraySort(pList, streamWaitComparFn); - code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans); - if (code) { - mError("failed to clear finish trans, code:%s", tstrerror(code)); - taosArrayDestroy(pList); - return code; - } int32_t numOfQual = taosArrayGetSize(pList); if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index fe3359dc74..f4f7c65a00 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -16,6 +16,8 @@ #include "mndStream.h" #include "mndTrans.h" +#define MAX_CHKPT_EXEC_ELAPSED (600*1000) // 600s + typedef struct SKeyInfo { void *pKey; int32_t keyLen; @@ -31,11 +33,12 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo)); } -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) { +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans) { size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); int32_t numOfChkpt = 0; + int64_t now = taosGetTimestampMs(); if (pNumOfActiveChkpt != NULL) { *pNumOfActiveChkpt = 0; @@ -63,6 +66,18 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } else { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { numOfChkpt++; + + // last for 10min, kill it + int64_t dur = now - pTrans->createdTime; + if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) { + mInfo("long chkpt transId:%d, start:%" PRId64 + " exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status", + pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0)); + void* p = taosArrayPush(pLongChkptTrans, pEntry); + if (p == NULL) { + mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno)); + } + } } mndReleaseTrans(pMnode, pTrans); } @@ -101,7 +116,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons } // if any task updates exist, any other stream trans are not allowed to be created - int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); + int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL); if (code) { mError("failed to clear finish trans, code:%s, and continue", tstrerror(code)); } @@ -160,7 +175,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { return 0; } - int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); + int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL); if (code) { mError("failed to clear finish trans, code:%s", tstrerror(code)); } @@ -361,3 +376,37 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { mDebug("complete clear checkpoints in all Dbs"); } + +void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { + int32_t code = 0; + int64_t now = taosGetTimestampMs(); + int32_t num = taosArrayGetSize(pLongChkpts); + + mInfo("start to kill %d long checkpoint trans", num); + + for(int32_t i = 0; i < num; ++i) { + SStreamTransInfo* pTrans = (SStreamTransInfo*) taosArrayGet(pLongChkpts, i); + if (pTrans == NULL) { + continue; + } + + double el = (now - pTrans->startTime) / 1000.0; + mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed", + pTrans->streamId, pTrans->transId, el); + + SStreamObj *p = NULL; + code = mndGetStreamObj(pMnode, pTrans->streamId, &p); + if (code == 0 && p != NULL) { + mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb); + + mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name, + pTrans->streamId, pTrans->transId, p->checkpointId); + + code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + if (code) { + mError("stream:%s 0x%"PRIx64" failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code)); + } + sdbRelease(pMnode->pSdb, p); + } + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ee34648a47..267dc88807 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -777,7 +777,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { int32_t code = 0; // merge multiple input data if possible in the input queue. - stDebug("s-task:%s start to extract data block from inputQ", id); + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st); while (1) { int32_t blockSize = 0; @@ -807,8 +808,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } - int64_t st = taosGetTimestampMs(); - EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); @@ -825,6 +824,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { +#if 0 + // Injection error: for automatic kill long trans test + taosMsleep(50*1000); +#endif code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); if (code != 0) { stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));