From a51bd9a24ff5661615cdf13487de53d9fd6da697 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 5 Nov 2024 15:42:57 +0800 Subject: [PATCH] enh: cal output col counts --- source/libs/executor/src/dataDispatcher.c | 38 ++++++++++++++--------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 48f4ed3ed1..f255d0b95c 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -45,6 +45,7 @@ typedef struct SDataDispatchHandle { SDataBlockDescNode* pSchema; STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; + int32_t outPutColCounts; int32_t status; bool queryEnd; uint64_t useconds; @@ -68,23 +69,12 @@ static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* return TSDB_CODE_QRY_INVALID_INPUT; } - SNode* pNode; - int32_t numOfCols = 0; - FOREACH(pNode, pHandle->pSchema->pSlots) { - SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; - if (pSlotDesc->output) { - ++numOfCols; - } else { - // Slots must be sorted, and slots with 'output' set to true must come first - break; - } - } - - if (numOfCols > taosArrayGetSize(pInput->pData->pDataBlock)) { - qError("invalid column number, schema:%d, input:%zu", numOfCols, taosArrayGetSize(pInput->pData->pDataBlock)); + if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) { + qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock)); return TSDB_CODE_QRY_INVALID_INPUT; } + SNode* pNode; int32_t colNum = 0; FOREACH(pNode, pHandle->pSchema->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; @@ -419,6 +409,25 @@ static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) { return TSDB_CODE_SUCCESS; } +int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) { + if (pInputDataBlockDesc == NULL) { + qError("invalid schema"); + return 0; + } + SNode* pNode; + int32_t numOfCols = 0; + FOREACH(pNode, pInputDataBlockDesc->pSlots) { + SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; + if (pSlotDesc->output) { + ++numOfCols; + } else { + // Slots must be sorted, and slots with 'output' set to true must come first + break; + } + } + return numOfCols; +} + int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t code; code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); @@ -443,6 +452,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->pManager = pManager; pManager = NULL; dispatcher->pSchema = pDataSink->pInputDataBlockDesc; + dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema); dispatcher->status = DS_BUF_EMPTY; dispatcher->queryEnd = false; code = taosOpenQueue(&dispatcher->pDataBlocks);