diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7a3f93b28b..34d5e04310 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -751,12 +751,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { + ASSERT(0); char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamDispatchReq req; SDecoder decoder; - tDecoderInit(&decoder, msgBody, msgLen); + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 03f9e3eafd..7006588246 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -295,10 +295,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->execType != TASK_EXEC__NONE); - streamTryExec(pTask); + streamSchedExec(pTask); + /*streamTryExec(pTask);*/ - ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); - streamDispatch(pTask); + /*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/ + /*streamDispatch(pTask);*/ return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bbc02080c6..79c35f2889 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -162,10 +162,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); if (pTask->execType == TASK_EXEC__NONE) { - ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, data); - data = NULL; - continue; + break; } } else { void* newRet; @@ -187,7 +184,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (data == NULL) { - return 0; + break; + } + + if (pTask->execType == TASK_EXEC__NONE) { + ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, data); + continue; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));