fix schedule

This commit is contained in:
Liu Jicong 2022-08-04 17:46:13 +08:00
parent e64e41679e
commit 031414001d
3 changed files with 14 additions and 9 deletions

View File

@ -751,12 +751,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
ASSERT(0);
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req; SStreamDispatchReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));

View File

@ -295,10 +295,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve(pTask, pReq, pRsp); streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE); ASSERT(pTask->execType != TASK_EXEC__NONE);
streamTryExec(pTask); streamSchedExec(pTask);
/*streamTryExec(pTask);*/
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); /*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
streamDispatch(pTask); /*streamDispatch(pTask);*/
return 0; return 0;
} }

View File

@ -162,10 +162,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
data = qItem; data = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) { if (pTask->execType == TASK_EXEC__NONE) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); break;
streamTaskOutput(pTask, data);
data = NULL;
continue;
} }
} else { } else {
void* newRet; void* newRet;
@ -187,7 +184,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
if (data == NULL) { if (data == NULL) {
return 0; break;
}
if (pTask->execType == TASK_EXEC__NONE) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data);
continue;
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));