From 2386f842fb0e2094c7d5c3bcc6f3ab676f8dcf38 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Jan 2024 18:46:15 +0800 Subject: [PATCH] enh(stream): remove sleep to opt perf. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 13 +++++++------ source/libs/stream/src/streamExec.c | 20 +++++++++++++------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a73c436044..ad6d11aaf2 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -736,16 +736,17 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { streamMetaStopAllTasks(pMeta); return 0; - } else if (type == STREAM_EXEC_T_RESUME_TASK) { - // task resume to run after idle for a while + } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask != NULL) { ASSERT(streamTaskReadyToRun(pTask, NULL)); - tqDebug("s-task:%s task resume to run after idle for a while", pTask->id.idStr); - streamResumeTask(pTask); - } + int64_t execTs = pTask->status.lastExecTs; + int32_t idle = taosGetTimestampMs() - execTs; + tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); - streamMetaReleaseTask(pMeta, pTask); + streamResumeTask(pTask); + streamMetaReleaseTask(pMeta, pTask); + } return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5c38b24a77..7b740a8e7a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -107,12 +107,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return 0; } - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { - stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr); - taosMsleep(1000); - continue; - } - SSDataBlock* output = NULL; uint64_t ts = 0; if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { @@ -561,6 +555,11 @@ static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pStatus->lastExecTs = taosGetTimestampMs(); } +static void clearTaskSchedInfo(SStreamTask* pTask) { + SStreamStatus* pStatus = &pTask->status; + pStatus->schedIdleTime = 0; +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -586,6 +585,12 @@ int32_t doStreamExecTask(SStreamTask* pTask) { break; } + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + setTaskSchedInfo(pTask, 1000); + continue; + } + /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (pInput == NULL) { ASSERT(numOfBlocks == 0); @@ -737,7 +742,7 @@ static void doStreamExecTaskHelper(void* param, void* tmrId) { tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); // release the task ref count - pTask->status.schedIdleTime = 0; // clear the idle time + clearTaskSchedInfo(pTask); streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -747,6 +752,7 @@ static int32_t schedTaskInFuture(SStreamTask* pTask) { pTask->status.schedIdleTime, ref); // add one ref count for task + // todo this may be failed, and add ref may be failed. SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); if (pTask->schedInfo.pIdleTimer == NULL) {