From 7094a8ddcfd49be3074d6001bd96d0e7fa41c9db Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Tue, 22 Oct 2024 14:15:48 +0800 Subject: [PATCH] fix: bytes of repeat and timezone --- include/libs/scalar/scalar.h | 1 + source/client/src/clientImpl.c | 21 ------ source/libs/executor/src/dataDispatcher.c | 78 +++++++++++++++++++++++ source/libs/function/src/builtins.c | 12 ++-- source/libs/scalar/src/sclfunc.c | 4 ++ 5 files changed, 91 insertions(+), 25 deletions(-) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index fd936dd087..4b89a6a439 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -105,6 +105,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t timeZoneStrLen(); int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t weekdayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t dayofweekFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 323a5aed3f..a3e327ee51 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2355,26 +2355,6 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo) { return TSDB_CODE_SUCCESS; } -int32_t resultInfoSafeCheck(SReqResultInfo* pResultInfo) { - if (pResultInfo->totalRows < pResultInfo->numOfRows) { - tscError("checkResultInfo error: totalRows:%" PRId64 " < numOfRows:%" PRId64, pResultInfo->totalRows, - pResultInfo->numOfRows); - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { - if (pResultInfo->fields[i].bytes < 0) { - tscError("checkResultInfo error: bytes:%d <= 0", pResultInfo->fields[i].bytes); - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - if(!IS_VAR_DATA_TYPE(pResultInfo->fields[i].type) && TYPE_BYTES[pResultInfo->fields[i].type] != pResultInfo->fields[i].bytes) { - tscError("checkResultInfo error: type:%d bytes:%d != %d", pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, TYPE_BYTES[pResultInfo->fields[i].type]); - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - } - - return TSDB_CODE_SUCCESS; -} - int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) { if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) { tscError("setResultDataPtr paras error"); @@ -2482,7 +2462,6 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) { if (convertUcs4) { code = doConvertUCS4(pResultInfo, colLength); } - code = resultInfoSafeCheck(pResultInfo); return code; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index d358cedfb7..983309b7c2 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -54,6 +54,78 @@ typedef struct SDataDispatchHandle { TdThreadMutex mutex; } SDataDispatchHandle; +static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) { + if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) { + qError("invalid input data"); + return TSDB_CODE_QRY_INVALID_INPUT; + } + SDataBlockDescNode* pSchema = pHandle->pSchema; + if (pSchema == NULL || pSchema->outputRowSize > pInput->pData->info.rowSize) { + qError("invalid schema"); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SNode* pNode; + int32_t numOfCols = 0; + int32_t realOutputRowSize = 0; + FOREACH(pNode, pHandle->pSchema->pSlots) { + SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; + if (pSlotDesc->output) { + realOutputRowSize += pSlotDesc->dataType.bytes; + ++numOfCols; + } else { + break; + } + } + if (realOutputRowSize != pSchema->outputRowSize) { + qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pSchema->outputRowSize); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + if (numOfCols > taosArrayGetSize(pInput->pData->pDataBlock)) { + qError("invalid column number, schema:%d, input:%" PRIu64, numOfCols, taosArrayGetSize(pInput->pData->pDataBlock)); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + int32_t colNum = 0; + FOREACH(pNode, pHandle->pSchema->pSlots) { + SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; + if (pSlotDesc->output) { + SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum); + if (pColInfoData == NULL) { + return -1; + } + if (pColInfoData->info.bytes < 0) { + qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) && + TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) { + qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type], + pColInfoData->info.bytes); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (pColInfoData->info.type != pSlotDesc->dataType.type) { + qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type); + return TSDB_CODE_QRY_INVALID_INPUT; + } + if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) { + qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + if (IS_INVALID_TYPE(pColInfoData->info.type)) { + qError("invalid column type, type:%d", pColInfoData->info.type); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + ++colNum; + } + } + + + return TSDB_CODE_SUCCESS; +} + // clang-format off // data format: // +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ @@ -67,6 +139,12 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* int32_t numOfCols = 0; SNode* pNode; + int32_t code = inputSafetyCheck(pHandle, pInput); + if (code) { + qError("failed to check input data, code:%d", code); + return code; + } + FOREACH(pNode, pHandle->pSchema->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 1fd99125a0..1d3a43c498 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -695,7 +695,8 @@ static int32_t translateIsFilledPseudoColumn(SFunctionNode* pFunc, char* pErrBuf } static int32_t translateTimezone(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - pFunc->node.resType = (SDataType){.bytes = TD_TIMEZONE_LEN, .type = TSDB_DATA_TYPE_BINARY}; + int32_t bytesLen = timeZoneStrLen(); + pFunc->node.resType = (SDataType){.bytes = bytesLen, .type = TSDB_DATA_TYPE_BINARY}; return TSDB_CODE_SUCCESS; } @@ -2465,9 +2466,12 @@ static int32_t translateRepeat(SFunctionNode* pFunc, char* pErrBuf, int32_t len) uint8_t type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type; int32_t orgLen = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes; - int32_t count = TMAX((int32_t)((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i, 1); - - int32_t resLen = orgLen * count; + int32_t resLen; + if (nodeType(nodesListGetNode(pFunc->pParameterList, 1)) == QUERY_NODE_VALUE) { + resLen = orgLen * TMAX((int32_t)((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i, 1); + } else { + resLen = TSDB_MAX_BINARY_LEN; + } pFunc->node.resType = (SDataType){.bytes = resLen, .type = type}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 6f6362a8f7..9aa67c441b 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2662,6 +2662,10 @@ int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut return TSDB_CODE_SUCCESS; } +int32_t timeZoneStrLen() { + return sizeof(VarDataLenT) + strlen(tsTimezoneStr); +} + int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { char output[TD_TIMEZONE_LEN + VARSTR_HEADER_SIZE] = {0}; (void)memcpy(varDataVal(output), tsTimezoneStr, TD_TIMEZONE_LEN);