From 14a488c92cde194bad6cbb71edef66cffd1ee14f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 22 Jun 2022 21:21:39 +0800 Subject: [PATCH] feat(stream): stream support multiple type input --- include/libs/stream/tstream.h | 9 +++------ source/dnode/mnode/impl/src/mndScheduler.c | 22 ++++++--------------- source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqRead.c | 4 ++-- source/libs/executor/src/executor.c | 7 ++----- source/libs/executor/src/scanoperator.c | 13 ++++++------ source/libs/stream/src/streamExec.c | 23 +++++++++++----------- source/libs/stream/src/streamTask.c | 16 +++++++-------- 8 files changed, 40 insertions(+), 57 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 684a605e18..7673e73035 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -239,17 +239,14 @@ typedef struct { struct SStreamTask { int64_t streamId; int32_t taskId; - int8_t inputType; - int8_t taskStatus; - - int8_t execStatus; - + int8_t isDataScan; int8_t execType; int8_t sinkType; int8_t dispatchType; int16_t dispatchMsgType; - int8_t isDataScan; + int8_t taskStatus; + int8_t execStatus; // node info int32_t selfChildId; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e055e3b2ba..b1729c4047 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -270,10 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - pTask->isDataScan = 0; - // source - pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + pTask->isDataScan = 0; // exec pTask->execType = TASK_EXEC__NONE; @@ -320,10 +318,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* #endif pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); - pTask->isDataScan = 0; - // source - pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + pTask->isDataScan = 0; // exec pTask->execType = TASK_EXEC__NONE; @@ -392,12 +388,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pInnerTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); - pInnerTask->isDataScan = 0; - pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); - // input - pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + // source + pInnerTask->isDataScan = 0; // trigger pInnerTask->triggerParam = pStream->triggerParam; @@ -458,11 +452,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskSourceLevel, pTask); + // source pTask->isDataScan = 1; - // input - pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; - // add fixed vg dispatch pTask->sinkType = TASK_SINK__NONE; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; @@ -517,10 +509,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskOneLevel, pTask); - pTask->isDataScan = 1; - // input - pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; + pTask->isDataScan = 1; // trigger pTask->triggerParam = pStream->triggerParam; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 691548af7a..046ee64878 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -426,6 +426,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(0); } tDecoderClear(&decoder); + ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1); pTask->execStatus = TASK_EXEC_STATUS__IDLE; @@ -505,7 +506,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { continue; } - if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; + if (!pTask->isDataScan) continue; if (!failed) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 6f44eb4569..f7558cc00b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -345,8 +345,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { while (1) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; - SStreamTask* pTask = (SStreamTask*)pIter; - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->isDataScan) { int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); ASSERT(code == 0); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cf72dfd796..6b93119987 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -40,7 +40,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamBlockScanInfo* pInfo = pOperator->info; pInfo->assignBlockUid = assignUid; - // the block type can not be changed in the streamscan operators + // no need to check #if 0 if (pInfo->blockType == 0) { pInfo->blockType = type; @@ -49,10 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return TSDB_CODE_QRY_APP_ERROR; } #endif - // rollup sma, the same qTaskInfo is used to insert data by SubmitReq and fetch result by SSDataBlock - if (pInfo->blockType != type) { - pInfo->blockType = type; - } + pInfo->blockType = type; if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 62167ec9ff..45f80c3c6f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -537,7 +537,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, // taosSsleep(20); SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; - int32_t numOfCols = 0; + int32_t numOfCols = 0; SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); @@ -916,6 +916,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } size_t total = taosArrayGetSize(pInfo->pBlockLists); + // TODO: refactor if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->validBlockIndex >= total) { /*doClearBufferedBlocks(pInfo);*/ @@ -1969,8 +1970,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { } int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, - SNode* pTagCond) { + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1990,11 +1990,10 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle } int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, - uint64_t taskId) { - + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId) { SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4d9916e196..69cb5b43fa 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,13 +24,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) SStreamTrigger* pTrigger = (SStreamTrigger*)data; qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - ASSERT(pTask->inputType == STREAM_INPUT__DATA_SUBMIT); qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; - ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK); - SArray* blocks = pBlock->blocks; + SArray* blocks = pBlock->blocks; qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DROP) { // TODO exec drop @@ -89,17 +88,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { return NULL; } - if (((SStreamQueueItem*)data)->type == STREAM_INPUT__TRIGGER) { + int8_t type = ((SStreamQueueItem*)data)->type; + if (type == STREAM_INPUT__TRIGGER) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); - } else { - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitRefDec((SStreamDataSubmit*)data); - taosFreeQitem(data); - } else { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); - taosFreeQitem(data); - } + } else if (type == STREAM_INPUT__DATA_BLOCK) { + taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->isDataScan); + streamDataSubmitRefDec((SStreamDataSubmit*)data); + taosFreeQitem(data); } streamQueueProcessSuccess(pTask->inputQueue); return taosArrayInit(0, sizeof(SSDataBlock)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8a7c425f79..1daa9bd50e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -50,14 +50,14 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1; if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1; + + if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; @@ -106,14 +106,14 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1; + + if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;