set agg task status

This commit is contained in:
liuyao 2023-05-19 18:02:15 +08:00
parent b5d01f9bab
commit 180abf2229
1 changed files with 9 additions and 6 deletions

View File

@ -1224,6 +1224,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->id.idStr, pTask->chkInfo.version); pTask->id.idStr, pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
} else { } else {
if (streamTaskShouldPause(&pTask->status)) {
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
}
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
} }
@ -1301,19 +1304,19 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended. if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended.
pTask->chkInfo.currentVer = sversion; pTask->chkInfo.currentVer = sversion;
walReaderSeekVer(pTask->exec.pWalReader, sversion); walReaderSeekVer(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion); pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on } else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion); pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask); if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else { } else {
streamSchedExec(pTask); streamSchedExec(pTask);
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} }
return 0; return 0;