diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 99a247b45e..541346b330 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -530,10 +530,32 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_CONCAT_WS: { int32_t paraType, paraBytes = 0; + bool typeSet = false; for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); - paraBytes += pParam->node.resType.bytes; - paraType = pParam->node.resType.type; + if (pParam->node.type == QUERY_NODE_COLUMN) { + if (typeSet == false) { + paraType = pParam->node.resType.type; + typeSet = true; + } else { + //columns have to be the same type + if (paraType != pParam->node.resType.type) { + return TSDB_CODE_FAILED; + } + } + paraBytes += pParam->node.resType.bytes; + } + } + + for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { + SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); + if (pParam->node.type == QUERY_NODE_VALUE) { + if (paraType == TSDB_DATA_TYPE_NCHAR) { + paraBytes += pParam->node.resType.bytes * TSDB_NCHAR_SIZE; + } else { + paraBytes += pParam->node.resType.bytes; + } + } } pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType }; break; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index c02729a084..e25fe4b41a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -12,7 +12,7 @@ typedef void (*_trim_fn)(char *, char*, int32_t, int32_t); typedef int16_t (*_len_fn)(char *, int32_t); /** Math functions **/ -double tlog(double v, double base) { +static double tlog(double v, double base) { return log(v) / log(base); } @@ -113,7 +113,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu return TSDB_CODE_SUCCESS; } -int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { +static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { return TSDB_CODE_FAILED; @@ -138,7 +138,7 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa return TSDB_CODE_SUCCESS; } -int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { +static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) { return TSDB_CODE_FAILED; } @@ -167,7 +167,7 @@ int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarP return TSDB_CODE_SUCCESS; } -int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) { +static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { return TSDB_CODE_FAILED; @@ -215,11 +215,11 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p } /** String functions **/ -int16_t tlength(char *input, int32_t type) { +static int16_t tlength(char *input, int32_t type) { return varDataLen(input); } -int16_t tcharlength(char *input, int32_t type) { +static int16_t tcharlength(char *input, int32_t type) { if (type == TSDB_DATA_TYPE_VARCHAR) { return varDataLen(input); } else { //NCHAR @@ -227,7 +227,7 @@ int16_t tcharlength(char *input, int32_t type) { } } -void tltrim(char *input, char *output, int32_t type, int32_t charLen) { +static void tltrim(char *input, char *output, int32_t type, int32_t charLen) { int32_t numOfSpaces = 0; if (type == TSDB_DATA_TYPE_VARCHAR) { for (int32_t i = 0; i < charLen; ++i) { @@ -257,7 +257,7 @@ void tltrim(char *input, char *output, int32_t type, int32_t charLen) { varDataSetLen(output, resLen); } -void trtrim(char *input, char *output, int32_t type, int32_t charLen) { +static void trtrim(char *input, char *output, int32_t type, int32_t charLen) { int32_t numOfSpaces = 0; if (type == TSDB_DATA_TYPE_VARCHAR) { for (int32_t i = charLen - 1; i >= 0; --i) { @@ -286,7 +286,7 @@ void trtrim(char *input, char *output, int32_t type, int32_t charLen) { varDataSetLen(output, resLen); } -int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) { +static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { return TSDB_CODE_FAILED; @@ -312,12 +312,22 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p return TSDB_CODE_SUCCESS; } -static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) { - pOutputData->pData = taosMemoryCalloc(len, sizeof(char)); - pOutputData->info.type = type; - pOutputData->info.bytes = len; - pOutputData->varmeta.length = len; - pOutputData->varmeta.allocLen = len; +static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCol, int32_t type, int16_t *dataLen) { + if (hasNcharCol && type == TSDB_DATA_TYPE_VARCHAR) { + TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1); + bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL); + if (!ret) { + taosMemoryFree(newBuf); + return TSDB_CODE_FAILED; + } + memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE); + *dataLen += varDataLen(input) * TSDB_NCHAR_SIZE; + taosMemoryFree(newBuf); + } else { + memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input)); + *dataLen += varDataLen(input); + } + return TSDB_CODE_SUCCESS; } int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { @@ -332,11 +342,15 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t inputLen = 0; int32_t numOfRows = 0; + bool hasNcharCol = false; for (int32_t i = 0; i < inputNum; ++i) { - if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || - GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { + int32_t type = GET_PARAM_TYPE(&pInput[i]); + if (!IS_VAR_DATA_TYPE(type)) { return TSDB_CODE_FAILED; } + if (type == TSDB_DATA_TYPE_NCHAR) { + hasNcharCol = true; + } if (pInput[i].numOfRows > numOfRows) { numOfRows = pInput[i].numOfRows; } @@ -344,8 +358,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu for (int32_t i = 0; i < inputNum; ++i) { pInputData[i] = pInput[i].columnData; input[i] = pInputData[i]->pData; + int32_t factor = 1; + if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { + factor = TSDB_NCHAR_SIZE; + } if (pInput[i].numOfRows == 1) { - inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; + inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * numOfRows; } else { inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; } @@ -371,8 +389,10 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int16_t dataLen = 0; for (int32_t i = 0; i < inputNum; ++i) { - memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); - dataLen += varDataLen(input[i]); + int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } if (pInput[i].numOfRows != 1) { input[i] += varDataTLen(input[i]); } @@ -390,6 +410,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu return TSDB_CODE_SUCCESS; } + int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator return TSDB_CODE_FAILED; @@ -402,27 +423,34 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t inputLen = 0; int32_t numOfRows = 0; + bool hasNcharCol = false; for (int32_t i = 1; i < inputNum; ++i) { - if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || - GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[1])) { + int32_t type = GET_PARAM_TYPE(&pInput[i]); + if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i]))) { return TSDB_CODE_FAILED; } + if (type == TSDB_DATA_TYPE_NCHAR) { + hasNcharCol = true; + } if (pInput[i].numOfRows > numOfRows) { numOfRows = pInput[i].numOfRows; } } for (int32_t i = 0; i < inputNum; ++i) { pInputData[i] = pInput[i].columnData; + input[i] = pInputData[i]->pData; + int32_t factor = 1; + if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { + factor = TSDB_NCHAR_SIZE; + } if (i == 0) { // calculate required separator space - int32_t factor = (GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NCHAR) ? TSDB_NCHAR_SIZE : 1; inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor; } else if (pInput[i].numOfRows == 1) { - inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; + inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * factor; } else { inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; } - input[i] = pInputData[i]->pData; } int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; @@ -441,8 +469,11 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p continue; } - memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); - dataLen += varDataLen(input[i]); + int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + if (pInput[i].numOfRows != 1) { input[i] += varDataTLen(input[i]); } @@ -450,8 +481,10 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p if (i < inputNum - 1) { //insert the separator char *sep = pInputData[0]->pData; - memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep)); - dataLen += varDataLen(sep); + int32_t ret = concatCopyHelper(sep, output, hasNcharCol, GET_PARAM_TYPE(&pInput[0]), &dataLen); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } } } varDataSetLen(output, dataLen); @@ -467,7 +500,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p return TSDB_CODE_SUCCESS; } -int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) { +static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { return TSDB_CODE_FAILED; @@ -512,7 +545,7 @@ int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam } -int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) { +static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { return TSDB_CODE_FAILED;