enh(stream): remove sleep to opt perf.

This commit is contained in:
Haojun Liao 2024-01-05 18:46:15 +08:00
parent 33253cbb54
commit 2386f842fb
2 changed files with 20 additions and 13 deletions

View File

@ -736,16 +736,17 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESUME_TASK) {
// task resume to run after idle for a while
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
ASSERT(streamTaskReadyToRun(pTask, NULL));
tqDebug("s-task:%s task resume to run after idle for a while", pTask->id.idStr);
streamResumeTask(pTask);
}
int64_t execTs = pTask->status.lastExecTs;
int32_t idle = taosGetTimestampMs() - execTs;
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
streamMetaReleaseTask(pMeta, pTask);
streamResumeTask(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
return 0;
}

View File

@ -107,12 +107,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return 0;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
taosMsleep(1000);
continue;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
@ -561,6 +555,11 @@ static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) {
pStatus->lastExecTs = taosGetTimestampMs();
}
static void clearTaskSchedInfo(SStreamTask* pTask) {
SStreamStatus* pStatus = &pTask->status;
pStatus->schedIdleTime = 0;
}
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
@ -586,6 +585,12 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
break;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
setTaskSchedInfo(pTask, 1000);
continue;
}
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);
@ -737,7 +742,7 @@ static void doStreamExecTaskHelper(void* param, void* tmrId) {
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
// release the task ref count
pTask->status.schedIdleTime = 0; // clear the idle time
clearTaskSchedInfo(pTask);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
@ -747,6 +752,7 @@ static int32_t schedTaskInFuture(SStreamTask* pTask) {
pTask->status.schedIdleTime, ref);
// add one ref count for task
// todo this may be failed, and add ref may be failed.
SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
if (pTask->schedInfo.pIdleTimer == NULL) {