From f77f91026492ea75124691337ee4383bd60e44a7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 16 May 2022 01:48:49 +0800 Subject: [PATCH] refactor(stream) --- include/libs/stream/tstream.h | 5 +- source/dnode/mnode/impl/src/mndScheduler.c | 5 + source/dnode/vnode/src/tq/tq.c | 56 +++++++- source/libs/stream/src/tstream.c | 152 ++++++++++++++++++++- 4 files changed, 212 insertions(+), 6 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 796fee4f89..4460327b88 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -40,6 +40,7 @@ enum { TASK_INPUT_STATUS__BLOCKED, TASK_INPUT_STATUS__RECOVER, TASK_INPUT_STATUS__STOP, + TASK_INPUT_STATUS__FAILED, }; enum { @@ -234,7 +235,9 @@ struct SStreamTask { int8_t outputStatus; STaosQueue* inputQ; + STaosQall* inputQAll; STaosQueue* outputQ; + STaosQall* outputQAll; // application storage void* ahandle; @@ -282,7 +285,7 @@ int32_t streamDequeueOutput(SStreamTask* pTask, void** output); int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId); -int32_t streamTaskExecNew(SStreamTask* pTask); +int32_t streamTaskRun(SStreamTask* pTask); int32_t streamTaskHandleInput(SStreamTask* pTask, void* data); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 824f031004..22a5f37334 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -194,6 +194,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p // source pTask->sourceType = TASK_SOURCE__MERGE; + pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // exec pTask->execType = TASK_EXEC__NONE; @@ -235,6 +236,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); // source pTask->sourceType = TASK_SOURCE__MERGE; + pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // exec pTask->execType = TASK_EXEC__NONE; @@ -309,6 +311,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); // source part pTask->sourceType = TASK_SOURCE__SCAN; + pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; // sink part if (level == 0) { @@ -372,6 +375,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // source part, currently only support multi source pTask->sourceType = TASK_SOURCE__PIPE; + pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // sink part pTask->sinkType = TASK_SINK__NONE; @@ -459,6 +463,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // source part pTask->sourceType = TASK_SOURCE__MERGE; + pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // sink part pTask->sinkType = TASK_SINK__NONE; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c69b3de05d..873db62dd8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,6 +14,7 @@ */ #include "tq.h" +#include "tqueue.h" int32_t tqInit() { // @@ -1032,6 +1033,59 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) return 0; } +int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { + void* pIter = NULL; + bool failed = false; + + SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + if (pSubmit == NULL) { + failed = true; + } + pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); + if (pSubmit->dataRef == NULL) { + failed = true; + } + + pSubmit->type = STREAM_DATA_TYPE_SUBMIT_BLOCK; + pSubmit->sourceVer = ver; + pSubmit->sourceVg = pTq->pVnode->config.vgId; + pSubmit->data = pReq; + *pSubmit->dataRef = 1; + + while (1) { + pIter = taosHashIterate(pTq->pStreamTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = (SStreamTask*)pIter; + if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; + + int8_t inputStatus = atomic_load_8(&pTask->inputStatus); + if (inputStatus == TASK_INPUT_STATUS__NORMAL) { + if (failed) { + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); + continue; + } + + streamDataSubmitRefInc(pSubmit); + taosWriteQitem(pTask->inputQ, pSubmit); + + int8_t execStatus = atomic_load_8(&pTask->status); + if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { + // TODO dispatch task launch msg to fetch queue + } + + } else { + // blocked or stopped, do nothing + } + } + + if (!failed) { + streamDataSubmitRefDec(pSubmit); + return 0; + } else { + return -1; + } +} + int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) { SStreamTaskExecReq req = {0}; tDecodeSStreamTaskExecReq(msg, &req); @@ -1051,7 +1105,7 @@ int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) { // try exec int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); if (execStatus == TASK_STATUS__IDLE) { - if (streamTaskExecNew(pTask) < 0) { + if (streamTaskRun(pTask) < 0) { atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); goto FAIL; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 33147a0c0a..812874dafb 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -133,7 +133,144 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) { return inputStatus; } -int32_t streamTaskProcessTriggerReq(SStreamTask* pTask, SMsgCb* pMsgCb, char* msg, int32_t msgLen) { +int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { + void* exec = pTask->exec.runners[0].executor; + + // set input + if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); + + qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK); + } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { + SStreamDataBlock* pBlock = (SStreamDataBlock*)data; + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + + SArray* blocks = pBlock->blocks; + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); + } + + // exec + while (1) { + SSDataBlock* output; + uint64_t ts = 0; + if (qExecTask(exec, &output, &ts) < 0) { + ASSERT(false); + } + if (output == NULL) break; + taosArrayPush(pRes, &output); + } + + // destroy + if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + streamDataSubmitRefDec((SStreamDataSubmit*)data); + } else { + taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); + } + return 0; +} + +// TODO: handle version +int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) return -1; + while (1) { + int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); + void* exec = pTask->exec.runners[0].executor; + if (execStatus == TASK_STATUS__IDLE) { + // first run, from qall, handle failure from last exec + while (1) { + void* data = NULL; + taosGetQitem(pTask->inputQAll, &data); + if (data == NULL) break; + + streamTaskExecImpl(pTask, data, pRes); + + taosFreeQitem(data); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + resQ->type = STREAM_INPUT__DATA_BLOCK; + resQ->blocks = pRes; + taosWriteQitem(pTask->outputQ, resQ); + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) goto FAIL; + } + } + // second run, from inputQ + taosReadAllQitems(pTask->inputQ, pTask->inputQAll); + while (1) { + void* data = NULL; + taosGetQitem(pTask->inputQAll, &data); + if (data == NULL) break; + + streamTaskExecImpl(pTask, data, pRes); + + taosFreeQitem(data); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + resQ->type = STREAM_INPUT__DATA_BLOCK; + resQ->blocks = pRes; + taosWriteQitem(pTask->outputQ, resQ); + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) goto FAIL; + } + } + // set status closing + atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); + // third run, make sure all inputQ is cleared + taosReadAllQitems(pTask->inputQ, pTask->inputQAll); + while (1) { + void* data = NULL; + taosGetQitem(pTask->inputQAll, &data); + if (data == NULL) break; + + streamTaskExecImpl(pTask, data, pRes); + + taosFreeQitem(data); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + resQ->type = STREAM_INPUT__DATA_BLOCK; + resQ->blocks = pRes; + taosWriteQitem(pTask->outputQ, resQ); + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) goto FAIL; + } + } + // set status closing + atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); + // third run, make sure all inputQ is cleared + taosReadAllQitems(pTask->inputQ, pTask->inputQAll); + while (1) { + void* data = NULL; + taosGetQitem(pTask->inputQAll, &data); + if (data == NULL) break; + } + + atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + break; + } else if (execStatus == TASK_STATUS__CLOSING) { + continue; + } else if (execStatus == TASK_STATUS__EXECUTING) { + break; + } else { + ASSERT(0); + } + } + return 0; +FAIL: + atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + return -1; +} + +int32_t streamTaskDispatchDown(SStreamTask* pTask, SMsgCb* pMsgCb) { + // + return 0; +} + +int32_t streamTaskSink(SStreamTask* pTask) { // return 0; } @@ -156,8 +293,8 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat // 2.3. closing: keep trying while (1) { int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); - void* exec = pTask->exec.runners[0].executor; if (execStatus == TASK_STATUS__IDLE) { + void* exec = pTask->exec.runners[0].executor; SArray* pRes = taosArrayInit(0, sizeof(void*)); const SArray* blocks = pBlock->blocks; qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); @@ -278,7 +415,7 @@ int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) { return 0; } -int32_t streamTaskExecNew(SStreamTask* pTask) { +int32_t streamTaskRun(SStreamTask* pTask) { SArray* pRes = NULL; if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) { // TODO remove multi runner @@ -494,11 +631,16 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { pTask->inputQ = taosOpenQueue(); pTask->outputQ = taosOpenQueue(); - if (pTask->inputQ == NULL || pTask->outputQ == NULL) goto FAIL; + pTask->inputQAll = taosAllocateQall(); + pTask->outputQAll = taosAllocateQall(); + if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL) + goto FAIL; return pTask; FAIL: if (pTask->inputQ) taosCloseQueue(pTask->inputQ); if (pTask->outputQ) taosCloseQueue(pTask->outputQ); + if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); + if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); if (pTask) taosMemoryFree(pTask); return NULL; } @@ -507,6 +649,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; @@ -552,6 +695,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;