From 7041e6474493a184a2abb5c56a6ce5719adcd44d Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sun, 3 Nov 2024 23:00:31 +0800 Subject: [PATCH] add colDataSetVal func desc --- include/common/tdatablock.h | 4 +++ source/libs/executor/src/dataDispatcher.c | 39 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6578999db4..1103b89ccb 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -189,7 +189,11 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint int32_t getJsonValueLen(const char* data); +// For the VAR_DATA_TYPE type, new data is inserted strictly according to the position of SVarColAttr.length. +// If the same row is inserted repeatedly, data holes will result. int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); +// For the VAR_DATA_TYPE type, if a row already has data before inserting it (judged by offset != -1), +// it will be inserted at the original position and the old data will be overwritten. int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 236d6a4d3e..48f4ed3ed1 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -70,21 +70,15 @@ static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* SNode* pNode; int32_t numOfCols = 0; - int32_t realOutputRowSize = 0; FOREACH(pNode, pHandle->pSchema->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { - realOutputRowSize += pSlotDesc->dataType.bytes; ++numOfCols; } else { // Slots must be sorted, and slots with 'output' set to true must come first break; } } - if (realOutputRowSize != pSchema->outputRowSize) { - qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pSchema->outputRowSize); - return TSDB_CODE_QRY_INVALID_INPUT; - } if (numOfCols > taosArrayGetSize(pInput->pData->pDataBlock)) { qError("invalid column number, schema:%d, input:%zu", numOfCols, taosArrayGetSize(pInput->pData->pDataBlock)); @@ -397,8 +391,41 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { return TSDB_CODE_SUCCESS; } +static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) { + if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) { + return TSDB_CODE_SUCCESS; + } + + if (pInputDataBlockDesc == NULL) { + qError("invalid schema"); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SNode* pNode; + int32_t realOutputRowSize = 0; + FOREACH(pNode, pInputDataBlockDesc->pSlots) { + SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; + if (pSlotDesc->output) { + realOutputRowSize += pSlotDesc->dataType.bytes; + } else { + // Slots must be sorted, and slots with 'output' set to true must come first + break; + } + } + if (realOutputRowSize != pInputDataBlockDesc->outputRowSize) { + qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pInputDataBlockDesc->outputRowSize); + return TSDB_CODE_QRY_INVALID_INPUT; + } + return TSDB_CODE_SUCCESS; +} + int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t code; + code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); + if (code) { + qError("failed to check input data block desc, code:%d", code); + return code; + } SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) {