diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9572157fbb..2b0156a668 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -46,9 +46,10 @@ enum { }; enum { - TASK_EXEC_STATUS__IDLE = 1, - TASK_EXEC_STATUS__EXECUTING, - TASK_EXEC_STATUS__CLOSING, + TASK_SCHED_STATUS__INACTIVE = 1, + TASK_SCHED_STATUS__WAITING, + TASK_SCHED_STATUS__ACTIVE, + TASK_SCHED_STATUS__FAILED, }; enum { @@ -204,13 +205,11 @@ typedef struct { enum { TASK_SOURCE__SCAN = 1, TASK_SOURCE__PIPE, - TASK_SOURCE__MERGE, }; enum { TASK_EXEC__NONE = 1, TASK_EXEC__PIPE, - TASK_EXEC__MERGE, }; enum { @@ -256,7 +255,7 @@ typedef struct SStreamTask { int16_t dispatchMsgType; int8_t taskStatus; - int8_t execStatus; + int8_t schedStatus; // node info int32_t selfChildId; @@ -487,6 +486,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +int32_t streamTryExec(SStreamTask* pTask); +int32_t streamSchedExec(SStreamTask* pTask); + typedef struct SStreamMeta SStreamMeta; SStreamMeta* streamMetaOpen(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b6293bd6cf..adc16eee0f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -604,7 +604,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } - pTask->execStatus = TASK_EXEC_STATUS__IDLE; + pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 0debeaef90..1c48ef7535 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) { return 0; } -int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) { - void* pIter = NULL; - SStreamDataSubmit* pSubmit = streamDataSubmitNew(pReq); - if (pSubmit == NULL) { - return -1; - } - - while (1) { - pIter = taosHashIterate(pTq->handles, pIter); - if (pIter == NULL) break; - STqHandle* pHandle = (STqHandle*)pIter; - if (tqEnqueue(pHandle, pSubmit) < 0) { - continue; - } - int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus); - if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) { - tqSendExecReq(pTq, pHandle); - } - } - - streamDataSubmitRefDec(pSubmit); - - return 0; -} - int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { if (msgType != TDMT_VND_SUBMIT) return 0; void* pIter = NULL; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7a19998d02..fd4329d285 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -83,8 +83,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { } int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { - int8_t execStatus = atomic_load_8(&pTask->execStatus); - if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) { + int8_t schedStatus = atomic_load_8(&pTask->schedStatus); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) return -1; @@ -102,6 +102,28 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { return 0; } +int32_t streamSchedExec(SStreamTask* pTask) { + int8_t schedStatus = + atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); + return -1; + } + pRunReq->head.vgId = pTask->nodeId; + pRunReq->streamId = pTask->streamId; + pRunReq->taskId = pTask->taskId; + SRpcMsg msg = { + .msgType = TDMT_STREAM_TASK_RUN, + .pCont = pRunReq, + .contLen = sizeof(SStreamTaskRunReq), + }; + tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); + } + return 0; +} + int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); int8_t status; @@ -182,14 +204,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S streamTaskEnqueue(pTask, pReq, pRsp); if (exec) { - streamExec(pTask); + streamTryExec(pTask); if (pTask->dispatchType != TASK_DISPATCH__NONE) { - ASSERT(pTask->sinkType == TASK_SINK__NONE); streamDispatch(pTask); } } else { - streamLaunchByWrite(pTask, pTask->nodeId); + streamSchedExec(pTask); } return 0; @@ -219,7 +240,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { } int32_t streamProcessRunReq(SStreamTask* pTask) { - streamExec(pTask); + streamTryExec(pTask); if (pTask->dispatchType != TASK_DISPATCH__NONE) { streamDispatch(pTask); @@ -272,7 +293,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->execType != TASK_EXEC__NONE); - streamExec(pTask); + streamTryExec(pTask); ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); streamDispatch(pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 26cd9111bf..ef7c10c8e1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -440,13 +440,13 @@ FAIL: int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); -#if 1 + ASSERT(pTask->sinkType == TASK_SINK__NONE); + int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { return 0; } -#endif SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); if (pBlock == NULL) { @@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); goto FREE; } - /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/ FREE: taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); -#if 0 - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - return -1; - } - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); - - tmsgSendReq(pEpSet, &dispatchMsg); -#endif return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3c008f7934..bbc02080c6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -147,24 +147,26 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { return 0; } - -static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { +// TODO: handle version +int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t cnt = 1; void* data = NULL; while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - qDebug("stream exec over, queue empty"); + qDebug("stream task exec over, queue empty, task: %d", pTask->taskId); break; } if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->execType == TASK_EXEC__NONE) break; - /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ - /*}*/ + if (pTask->execType == TASK_EXEC__NONE) { + ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, data); + data = NULL; + continue; + } } else { void* newRet; if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) { @@ -181,18 +183,15 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (data) streamFreeQitem(data); - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return NULL; + return 0; } - if (data == NULL) break; - - if (pTask->execType == TASK_EXEC__NONE) { - ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, data); - continue; + if (data == NULL) { + return 0; } + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); streamTaskExecImpl(pTask, data, pRes); qDebug("stream task %d exec end", pTask->taskId); @@ -203,76 +202,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { // TODO log failed ver streamQueueProcessFail(pTask->inputQueue); taosArrayDestroy(pRes); - return NULL; + return -1; } qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - if (streamTaskOutput(pTask, qRes) < 0) { - // TODO log failed ver - /*streamQueueProcessFail(pTask->inputQueue);*/ - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return NULL; - } + if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; } + + if (streamTaskOutput(pTask, qRes) < 0) { + // TODO save failed ver + /*streamQueueProcessFail(pTask->inputQueue);*/ + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return -1; + } /*streamQueueProcessSuccess(pTask->inputQueue);*/ - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - } - - streamFreeQitem(data); - } - return pRes; -} - -// TODO: handle version -int32_t streamExec(SStreamTask* pTask) { - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) return -1; - while (1) { - int8_t execStatus = - atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING); - if (execStatus == TASK_EXEC_STATUS__IDLE) { - // first run - qDebug("stream exec, enter exec status"); - pRes = streamExecForQall(pTask, pRes); - if (pRes == NULL) goto FAIL; - -// temporarily disable status closing, since it runs out of threads -#if 0 - // set status closing - atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING); - - // second run, make sure inputQ and qall are cleared - qDebug("stream exec, enter closing status"); - pRes = streamExecForQall(pTask, pRes); - if (pRes == NULL) goto FAIL; -#endif - - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); - qDebug("stream exec, return result"); - return 0; - } else if (execStatus == TASK_EXEC_STATUS__CLOSING) { - continue; - } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) { - ASSERT(taosArrayGetSize(pRes) == 0); - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return 0; - } else { - ASSERT(0); } } -FAIL: - if (pRes) taosArrayDestroy(pRes); - if (pTask->taskStatus == TASK_STATUS__DROPPING) { - tFreeSStreamTask(pTask); - return 0; - } else { - atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); - return -1; - } + return 0; +} + +int32_t streamTryExec(SStreamTask* pTask) { + int8_t schedStatus = + atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); + if (schedStatus == TASK_SCHED_STATUS__WAITING) { + int32_t code = streamExecForAll(pTask); + if (code < 0) { + atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED); + return -1; + } + atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); + + if (!taosQueueEmpty(pTask->inputQueue->queue)) { + streamSchedExec(pTask); + } + } + return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index f82ef1b42f..6819e5329f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -35,9 +35,10 @@ FAIL: void streamQueueClose(SStreamQueue* queue) { while (1) { void* qItem = streamQueueNextItem(queue); - if (qItem) + if (qItem) { taosFreeQitem(qItem); - else + } else { return; + } } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 216e3fa761..c4e946e191 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { } pTask->taskId = tGenIdPI32(); pTask->streamId = streamId; - pTask->execStatus = TASK_EXEC_STATUS__IDLE; + pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; @@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->schedStatus) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; @@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;