diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 3677fc5616..234d4f41e1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1016,15 +1016,15 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e78996c231..247024b283 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1158,51 +1158,22 @@ int32_t extractStreamNodeList(SMnode *pMnode) { } static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { - bool ready = true; + int32_t code = 0; if (mndStreamNodeIsUpdated(pMnode)) { - TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } streamMutexLock(&execInfo.lock); if (taosArrayGetSize(execInfo.pNodeList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); if (taosArrayGetSize(execInfo.pTaskList) != 0) { - streamMutexUnlock(&execInfo.lock); mError("stream task node change checking done, no vgroups exist, but task list is not empty"); - return TSDB_CODE_FAILED; - } - } - - for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); - if (p == NULL) { - continue; - } - - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); - if (pEntry == NULL) { - continue; - } - - if (pEntry->status != TASK_STATUS__READY) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, - (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); - ready = false; - break; - } - - if (pEntry->hTaskId != 0) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64 - " exists, checkpoint not issued", - pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status), - pEntry->hTaskId); - ready = false; - break; + code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; } } streamMutexUnlock(&execInfo.lock); - return ready ? 0 : -1; + return code; } int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { @@ -1216,7 +1187,22 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { continue; } - if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) { + // -1 denote not ready now or never ready till now + if (pEntry->hTaskId != 0) { + mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64 + " exists, checkpoint not issued", + pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status), + pEntry->hTaskId); + return -1; + } + + if (pEntry->status != TASK_STATUS__READY) { + mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId, + (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); + return -1; + } + + if (ts < pEntry->startTime) { ts = pEntry->startTime; taskId = pEntry->id.taskId; } @@ -1249,11 +1235,11 @@ static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) { int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) { + if (lastReadyTs != -1) { - mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold", - pStream->uid, lastReadyTs, now - lastReadyTs); - } else { - mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid); + mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64 + "ms less than threshold", + pStream->uid, lastReadyTs, (now - lastReadyTs)); } ready = false; @@ -1274,7 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { - TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); @@ -1326,7 +1312,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } int32_t numOfQual = taosArrayGetSize(pList); - if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) { + if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) { mDebug( "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new " "checkpoint trans are not allowed, wait for 30s", @@ -2601,20 +2587,51 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, } } +static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) { + int32_t alreadySend = taosArrayGetSize(pList); + + for (int32_t i = 0; i < alreadySend; ++i) { + int32_t *taskId = taosArrayGet(pList, i); + if (taskId == NULL) { + continue; + } + + for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) { + SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k); + if ((pe != NULL) && (pe->req.taskId == *taskId)) { + taosArrayRemove(pInfo->pTaskList, k); + break; + } + } + } + + return alreadySend; +} + int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; int64_t now = taosGetTimestampMs(); + bool allReady = true; + SArray *pNodeSnapshot = NULL; + int32_t maxAllowedTrans = 50; + int32_t numOfTrans = 0; + int32_t code = 0; + void *pIter = NULL; + + SArray *pList = taosArrayInit(4, sizeof(int32_t)); + if (pList == NULL) { + return terrno; + } + SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); if (pStreamList == NULL) { + taosArrayDestroy(pList); return terrno; } mDebug("start to process consensus-checkpointId in tmr"); - bool allReady = true; - SArray *pNodeSnapshot = NULL; - - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); taosArrayDestroy(pNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); @@ -2623,28 +2640,30 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (!allReady) { mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process"); taosArrayDestroy(pStreamList); + taosArrayDestroy(pList); return 0; } streamMutexLock(&execInfo.lock); - void *pIter = NULL; while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) { SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter; - int64_t streamId = -1; - int32_t num = taosArrayGetSize(pInfo->pTaskList); - SArray *pList = taosArrayInit(4, sizeof(int32_t)); - if (pList == NULL) { - continue; - } + taosArrayClear(pList); + int64_t streamId = -1; + int32_t num = taosArrayGetSize(pInfo->pTaskList); SStreamObj *pStream = NULL; + code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); if (pStream == NULL || code != 0) { // stream has been dropped already mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId); void *p = taosArrayPush(pStreamList, &pInfo->streamId); - taosArrayDestroy(pList); + if (p == NULL) { + mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64 + " code:%s, continue", + pInfo->streamId, tstrerror(terrno)); + } continue; } @@ -2654,7 +2673,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { continue; } - streamId = pe->req.streamId; + if (streamId == -1) { + streamId = pe->req.streamId; + } int32_t existed = 0; bool allSame = true; @@ -2665,7 +2686,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { break; } - if (((now - pe->ts) >= 10 * 1000) || allSame) { + if (((now - pe->ts) >= 10 * 1000) && allSame) { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId, pe->req.startTs, (now - pe->ts) / 1000.0); if (chkId > pe->req.checkpointId) { @@ -2673,8 +2694,12 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { taosArrayDestroy(pStreamList); mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId, pe->req.checkpointId, chkId); + + mndReleaseStream(pMnode, pStream); + taosHashCancelIterate(execInfo.pStreamConsensus, pIter); return TSDB_CODE_FAILED; } + code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); @@ -2684,7 +2709,6 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (p == NULL) { mError("failed to put into task list, taskId:0x%x", pe->req.taskId); } - streamId = pe->req.streamId; } else { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId, pe->req.startTs, (now - pe->ts) / 1000.0); @@ -2693,38 +2717,27 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { mndReleaseStream(pMnode, pStream); - if (taosArrayGetSize(pList) > 0) { - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - int32_t *taskId = taosArrayGet(pList, i); - if (taskId == NULL) { - continue; - } - - for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) { - SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k); - if ((pe != NULL) && (pe->req.taskId == *taskId)) { - taosArrayRemove(pInfo->pTaskList, k); - break; - } - } - } - } - - taosArrayDestroy(pList); + int32_t alreadySend = doCleanReqList(pList, pInfo); + // clear request stream item with empty task list if (taosArrayGetSize(pInfo->pTaskList) == 0) { mndClearConsensusRspEntry(pInfo); if (streamId == -1) { - streamMutexUnlock(&execInfo.lock); - taosArrayDestroy(pStreamList); - mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId); - return TSDB_CODE_FAILED; + mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId); } + void *p = taosArrayPush(pStreamList, &streamId); if (p == NULL) { - mError("failed to put into stream list, stream:0x%" PRIx64, streamId); + mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId); } } + + numOfTrans += alreadySend; + if (numOfTrans > maxAllowedTrans) { + mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend); + taosHashCancelIterate(execInfo.pStreamConsensus, pIter); + break; + } } for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) { @@ -2739,7 +2752,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { streamMutexUnlock(&execInfo.lock); taosArrayDestroy(pStreamList); - mDebug("end to process consensus-checkpointId in tmr"); + taosArrayDestroy(pList); + + mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans); return code; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index d896434f3b..4779f1d6cb 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -814,17 +814,18 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, int64_t ts) { - char msg[128] = {0}; + char msg[128] = {0}; + STrans *pTrans = NULL; + SStreamTask *pTask = NULL; + snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); - STrans *pTrans = NULL; int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans); if (pTrans == NULL || code != 0) { return terrno; } - STaskId id = {.streamId = pStream->uid, .taskId = taskId}; - SStreamTask *pTask = NULL; + STaskId id = {.streamId = pStream->uid, .taskId = taskId}; code = mndGetStreamTask(&id, pStream, &pTask); if (code) { mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 43b3d8676d..6190f4b0a7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -947,20 +947,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT_READY: - return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: - return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); - case TDMT_STREAM_RETRIEVE_TRIGGER: - return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); - case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: - return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_REQ_CHKPT_RSP: - return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_CHKPT_REPORT_RSP: - return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -987,6 +975,18 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn return tqProcessTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY: + return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER: + return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: + return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index f922a5e03e..d9778a6a05 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -38,7 +38,7 @@ extern "C" { #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_QUEUE_CAPACITY 5120 -#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) +#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (10) // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42d7f44b62..b7039d372d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -131,12 +131,12 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg); if (code != 0) { - rpcFreeCont(buf); - return code; + stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s", + pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code)); + } else { + stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId); } - - stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr, - pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId); } return code; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5e099712ca..ee34648a47 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -807,6 +807,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } + int64_t st = taosGetTimestampMs(); + EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); @@ -841,8 +843,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { continue; } - int64_t st = taosGetTimestampMs(); - // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { pTask->execInfo.sink.dataSize += blockSize; @@ -873,6 +873,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (code) { return code; } + + double el = (taosGetTimestampMs() - st) / 1000.0; + if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore + stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id); + streamTaskSetIdleInfo(pTask, 500); + return code; + } } } } diff --git a/tests/script/tsim/stream/checkpointInterval0.sim b/tests/script/tsim/stream/checkpointInterval0.sim index a5e5c87704..d560edfab5 100644 --- a/tests/script/tsim/stream/checkpointInterval0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -190,7 +190,7 @@ system sh/exec.sh -n dnode1 -s start sql insert into t1 values(1648791223004,5,2,3,1.1); loop4: -sleep 1000 +run tsim/stream/checkTaskStatus.sim sql select * from streamt;