fix schedule

This commit is contained in:
Liu Jicong 2022-08-04 17:00:45 +08:00
parent 56156ef2b1
commit e64e41679e
3 changed files with 5 additions and 3 deletions

View File

@ -474,7 +474,7 @@ typedef struct {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); // int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);

View File

@ -720,7 +720,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
continue; continue;
} }
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) { if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId); qError("stream task launch failed, task id %d", pTask->taskId);
continue; continue;
} }

View File

@ -68,7 +68,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
streamTaskInput(pTask, (SStreamQueueItem*)trigger); streamTaskInput(pTask, (SStreamQueueItem*)trigger);
streamLaunchByWrite(pTask, pTask->nodeId); streamSchedExec(pTask);
} }
taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
@ -82,6 +82,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
return 0; return 0;
} }
#if 0
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
int8_t schedStatus = atomic_load_8(&pTask->schedStatus); int8_t schedStatus = atomic_load_8(&pTask->schedStatus);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
@ -101,6 +102,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
} }
return 0; return 0;
} }
#endif
int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = int8_t schedStatus =