From dddf621aa7a4504f756c1d2aa96a18a417b8a85c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH] [TD-14241]: concat/concat_ws fix input has both constant and column issue --- source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 52 +++++++++++++++++++---------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4cbc91fa6a..957e7ee406 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -526,6 +526,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { if (paraType != paraTypeFirst) { return TSDB_CODE_FAILED; } + //TODO: for constants also needs numOfRows totalBytes += pParam->node.resType.bytes; } //TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100. diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index f1eab5c05c..21d8aee0ec 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -310,7 +310,13 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } -void allocateOutputBuf() { + +static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) { + pOutputData->pData = taosMemoryCalloc(len, sizeof(char)); + pOutputData->info.type = type; + pOutputData->info.bytes = len; + pOutputData->varmeta.length = len; + pOutputData->varmeta.allocLen = len; } int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { @@ -324,25 +330,30 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu char *output = NULL; int32_t inputLen = 0; - int32_t numOfRows = pInput->numOfRows; + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + if (pInput[i].numOfRows > numOfRows) { + numOfRows = pInput[i].numOfRows; + } + } for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { return TSDB_CODE_FAILED; } pInputData[i] = pInput[i].columnData; - inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; input[i] = pInputData[i]->pData; + if (pInput[i].numOfRows == 1) { + inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; + } else { + inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; + } } //allocate output buf if (pOutputData->pData == NULL) { int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; - pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); - pOutputData->info.type = GET_PARAM_TYPE(pInput); - pOutputData->info.bytes = outputLen; - pOutputData->varmeta.length = outputLen; - pOutputData->varmeta.allocLen = outputLen; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput)); } output = pOutputData->pData; @@ -365,7 +376,9 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu for (int32_t i = 0; i < inputNum; ++i) { memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); dataLen += varDataLen(input[i]); - input[i] += varDataTLen(input[i]); + if (pInput[i].numOfRows != 1) { + input[i] += varDataTLen(input[i]); + } } varDataSetLen(output, dataLen); int32_t dataTLen = varDataTLen(output); @@ -392,7 +405,12 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p char *output = NULL; int32_t inputLen = 0; - int32_t numOfRows = pInput[1].numOfRows; + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + if (pInput[i].numOfRows > numOfRows) { + numOfRows = pInput[i].numOfRows; + } + } for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { @@ -401,7 +419,9 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p pInputData[i] = pInput[i].columnData; if (i == 0) { // calculate required separator space - inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (inputNum - 2); + inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2); + } else if (pInput[i].numOfRows == 1) { + inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; } else { inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; } @@ -411,11 +431,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p //allocate output buf if (pOutputData->pData == NULL) { int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; - pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); - pOutputData->info.type = GET_PARAM_TYPE(pInput); - pOutputData->info.bytes = outputLen; - pOutputData->varmeta.length = outputLen; - pOutputData->varmeta.allocLen = outputLen; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(&pInput[1])); } output = pOutputData->pData; @@ -435,7 +451,9 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); dataLen += varDataLen(input[i]); - input[i] += varDataTLen(input[i]); + if (pInput[i].numOfRows != 1) { + input[i] += varDataTLen(input[i]); + } if (i < inputNum - 1) { //insert the separator