From 87fefa9e15f638a0a950f4a05322c320995f1f44 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 01/11] [TD-14241]: add string functions --- include/libs/function/functionMgt.h | 9 +- include/libs/scalar/scalar.h | 13 + source/libs/function/src/builtins.c | 82 +++++- source/libs/scalar/inc/sclInt.h | 2 +- source/libs/scalar/src/sclfunc.c | 408 ++++++++++++++++++++++++---- 5 files changed, 453 insertions(+), 61 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 7d46b543cb..0969fb203a 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -72,10 +72,15 @@ typedef enum EFunctionType { FUNCTION_TYPE_ATAN, // string function - FUNCTION_TYPE_CHAR_LENGTH = 1500, + FUNCTION_TYPE_LENGTH = 1500, + FUNCTION_TYPE_CHAR_LENGTH, FUNCTION_TYPE_CONCAT, FUNCTION_TYPE_CONCAT_WS, - FUNCTION_TYPE_LENGTH, + FUNCTION_TYPE_LOWER, + FUNCTION_TYPE_UPPER, + FUNCTION_TYPE_LTRIM, + FUNCTION_TYPE_RTRIM, + FUNCTION_TYPE_SUBSTR, // conversion function FUNCTION_TYPE_CAST = 2000, diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index c6d17ef65c..10ef0c1b23 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -42,6 +42,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type); int32_t vectorGetConvertType(int32_t type1, int32_t type2); int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut); +/* Math functions */ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); @@ -58,6 +59,18 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +/* String functions */ +int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); + + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0a5a90d2d9..4c07ba924d 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -282,6 +282,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = atanFunction, .finalizeFunc = NULL }, + { + .name = "length", + .type = FUNCTION_TYPE_LENGTH, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = lengthFunction, + .finalizeFunc = NULL + }, + { + .name = "char_length", + .type = FUNCTION_TYPE_CHAR_LENGTH, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = charLengthFunction, + .finalizeFunc = NULL + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, @@ -289,7 +309,67 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .checkFunc = stubCheckAndGetResultType, .getEnvFunc = NULL, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = concatFunction, + .finalizeFunc = NULL + }, + { + .name = "concat_ws", + .type = FUNCTION_TYPE_CONCAT_WS, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = concatWsFunction, + .finalizeFunc = NULL + }, + { + .name = "lower", + .type = FUNCTION_TYPE_LOWER, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = lowerFunction, + .finalizeFunc = NULL + }, + { + .name = "upper", + .type = FUNCTION_TYPE_UPPER, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = upperFunction, + .finalizeFunc = NULL + }, + { + .name = "ltrim", + .type = FUNCTION_TYPE_LTRIM, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = ltrimFunction, + .finalizeFunc = NULL + }, + { + .name = "rtrim", + .type = FUNCTION_TYPE_RTRIM, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = rtrimFunction, + .finalizeFunc = NULL + }, + { + .name = "substr", + .type = FUNCTION_TYPE_SUBSTR, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = substrFunction, .finalizeFunc = NULL }, { diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index cf34fc24a9..58b62bb05e 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -47,7 +47,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out); SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows); #define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type) -#define GET_PARAM_BYTES(_c) ((_c)->pColumnInfoData->info.bytes) +#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes) void sclFreeParam(SScalarParam *param); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index af5f07751b..15a9984c80 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -3,10 +3,14 @@ #include "sclInt.h" #include "sclvector.h" -static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) { -// dst->type = src->type; -// dst->bytes = src->bytes; -// dst->num = src->num; +typedef float (*_float_fn)(float); +typedef double (*_double_fn)(double); +typedef double (*_double_fn_2)(double, double); +typedef int (*_conv_fn)(int); +typedef void (*_trim_fn)(char *, char*, int32_t, int32_t); + +double tlog(double v, double base) { + return log(v) / log(base); } /** Math functions **/ @@ -107,14 +111,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu return TSDB_CODE_SUCCESS; } -typedef float (*_float_fn)(float); -typedef double (*_double_fn)(double); -typedef double (*_double_fn_2)(double, double); - -double tlog(double v, double base) { - return log(v) / log(base); -} - int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { @@ -216,6 +212,341 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p return TSDB_CODE_SUCCESS; } +/** String functions **/ +int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + int32_t type = GET_PARAM_TYPE(pInput); + if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { + return TSDB_CODE_FAILED; + } + + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + char **in = (char **)pInputData->pData; + int16_t *out = (int16_t *)pOutputData->pData; + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_f(pInputData->nullbitmap, i)) { + colDataSetNull_f(pOutputData->nullbitmap, i); + continue; + } + + out[i] = varDataLen(in[i]); + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + +int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + int32_t type = GET_PARAM_TYPE(pInput); + if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { + return TSDB_CODE_FAILED; + } + + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + char **in = (char **)pInputData->pData; + int16_t *out = (int16_t *)pOutputData->pData; + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_f(pInputData->nullbitmap, i)) { + colDataSetNull_f(pOutputData->nullbitmap, i); + continue; + } + + if (type == TSDB_DATA_TYPE_VARCHAR) { + out[i] = varDataLen(in[i]); + } else { //NCHAR + out[i] = varDataLen(in[i]) / TSDB_NCHAR_SIZE; + } + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + +int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings + return TSDB_CODE_FAILED; + } + + SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); + SColumnInfoData *pOutputData = pOutput->columnData; + + for (int32_t i = 0; i < inputNum; ++i) { + if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || + GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { + return TSDB_CODE_FAILED; + } + pInputData[i] = pInput[i].columnData; + } + + bool hasNull = false; + for (int32_t k = 0; k < pInput->numOfRows; ++k) { + for (int32_t i = 0; i < inputNum; ++i) { + if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) { + colDataSetNull_f(pOutputData->nullbitmap, k); + hasNull = true; + break; + } + } + + if (hasNull) { + continue; + } + + char *in = NULL; + char *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput); + + int16_t dataLen = 0; + for (int32_t i = 0; i < inputNum; ++i) { + in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]); + + memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in)); + dataLen += varDataLen(in); + } + varDataSetLen(out, dataLen); + } + + pOutput->numOfRows = pInput->numOfRows; + taosMemoryFree(pInputData); + + return TSDB_CODE_SUCCESS; +} + +int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator + return TSDB_CODE_FAILED; + } + + SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); + SColumnInfoData *pOutputData = pOutput->columnData; + + for (int32_t i = 0; i < inputNum; ++i) { + if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || + GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { + return TSDB_CODE_FAILED; + } + pInputData[i] = pInput[i].columnData; + } + + for (int32_t k = 0; k < pInput->numOfRows; ++k) { + char *sep = pInputData[0]->pData; + if (colDataIsNull_f(pInputData[0]->nullbitmap, k)) { + colDataSetNull_f(pOutputData->nullbitmap, k); + continue; + } + + char *in = NULL; + char *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput); + + int16_t dataLen = 0; + for (int32_t i = 1; i < inputNum; ++i) { + if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) { + continue; + } + + in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]); + memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in)); + dataLen += varDataLen(in); + + if (i < inputNum - 1) { + //insert the separator + memcpy(varDataVal(out) + dataLen, varDataVal(sep), varDataLen(sep)); + dataLen += varDataLen(sep); + } + } + varDataSetLen(out, dataLen); + } + + pOutput->numOfRows = pInput->numOfRows; + taosMemoryFree(pInputData); + + return TSDB_CODE_SUCCESS; +} + +int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) { + int32_t type = GET_PARAM_TYPE(pInput); + if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { + return TSDB_CODE_FAILED; + } + + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_f(pInputData->nullbitmap, i)) { + colDataSetNull_f(pOutputData->nullbitmap, i); + continue; + } + + char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput); + char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput); + + int32_t len = varDataLen(in); + if (type == TSDB_DATA_TYPE_VARCHAR) { + for (int32_t j = 0; j < len; ++j) { + *(varDataVal(out) + j) = convFn(*(varDataVal(in) + j)); + } + } else { //NCHAR + for (int32_t j = 0; j < len / TSDB_NCHAR_SIZE; ++j) { + *((uint32_t *)varDataVal(out) + j) = convFn(*((uint32_t *)varDataVal(in) + j)); + } + } + varDataSetLen(out, len); + } + + pOutput->numOfRows = pInput->numOfRows; + + return TSDB_CODE_SUCCESS; +} + +void tltrim(char *input, char *output, int32_t type, int32_t charLen) { + int32_t numOfSpaces = 0; + if (type == TSDB_DATA_TYPE_VARCHAR) { + for (int32_t i = 0; i < charLen; ++i) { + if (!isspace(*(varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } else { //NCHAR + for (int32_t i = 0; i < charLen; ++i) { + if (!iswspace(*((uint32_t *)varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } + + int32_t resLen; + if (type == TSDB_DATA_TYPE_VARCHAR) { + resLen = charLen - numOfSpaces; + memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen); + } else { + resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE; + memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen); + } + + varDataSetLen(output, resLen); +} + +void trtrim(char *input, char *output, int32_t type, int32_t charLen) { + int32_t numOfSpaces = 0; + if (type == TSDB_DATA_TYPE_VARCHAR) { + for (int32_t i = charLen - 1; i >= 0; --i) { + if (!isspace(*(varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } else { //NCHAR + for (int32_t i = charLen - 1; i < charLen; ++i) { + if (!iswspace(*((uint32_t *)varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } + + int32_t resLen; + if (type == TSDB_DATA_TYPE_VARCHAR) { + resLen = charLen - numOfSpaces; + } else { + resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE; + } + memcpy(varDataVal(output), varDataVal(input), resLen); + + varDataSetLen(output, resLen); +} + +int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) { + int32_t type = GET_PARAM_TYPE(pInput); + if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { + return TSDB_CODE_FAILED; + } + + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_f(pInputData->nullbitmap, i)) { + colDataSetNull_f(pOutputData->nullbitmap, i); + continue; + } + + char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput); + char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput); + + int32_t len = varDataLen(in); + int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; + trimFn(in, out, type, charLen); + } + + pOutput->numOfRows = pInput->numOfRows; + + return TSDB_CODE_SUCCESS; +} + +int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + if (inputNum != 2 || inputNum!= 3) { + return TSDB_CODE_FAILED; + } + + int32_t subPos = 0; + GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); + if (subPos == 0) { //subPos needs to be positive or negative values; + return TSDB_CODE_FAILED; + } + + int32_t subLen = INT16_MAX; + if (inputNum == 3) { + GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); + if (subLen < 0) { //subLen cannot be negative + return TSDB_CODE_FAILED; + } + subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE; + } + + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + for (int32_t i = 0; i < pOutput->numOfRows; ++i) { + if (colDataIsNull_f(pInputData->nullbitmap, i)) { + colDataSetNull_f(pOutputData->nullbitmap, i); + continue; + } + + char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput); + char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput); + + int32_t len = varDataLen(in); + int32_t startPosBytes; + + if (subPos > 0) { + startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subPos - 1 : (subPos - 1) * TSDB_NCHAR_SIZE; + startPosBytes = MIN(startPosBytes, len); + } else { + startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? len + subPos : len + subPos * TSDB_NCHAR_SIZE; + startPosBytes = MAX(startPosBytes, 0); + } + + subLen = MIN(subLen, len - startPosBytes); + if (subLen > 0) { + memcpy(varDataVal(out), varDataVal(in) + startPosBytes, subLen); + } + + varDataSetLen(out, subLen); + } + + pOutput->numOfRows = pInput->numOfRows; + + return TSDB_CODE_SUCCESS; +} + + int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return doScalarFunctionUnique(pInput, inputNum, pOutput, atan); } @@ -264,57 +595,20 @@ int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut return doScalarFunction(pInput, inputNum, pOutput, roundf, round); } -static void tlength(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { - assert(numOfInput == 1); -#if 0 - int64_t* out = (int64_t*) pOutput->data; - char* s = pLeft->data; - - for(int32_t i = 0; i < pLeft->num; ++i) { - out[i] = varDataLen(POINTER_SHIFT(s, i * pLeft->bytes)); - } -#endif +int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doCaseConvFunction(pInput, inputNum, pOutput, tolower); } -static void tconcat(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { - assert(numOfInput > 0); -#if 0 - int32_t rowLen = 0; - int32_t num = 1; - for(int32_t i = 0; i < numOfInput; ++i) { - rowLen += pLeft[i].bytes; - - if (pLeft[i].num > 1) { - num = pLeft[i].num; - } - } - - pOutput->data = taosMemoryRealloc(pOutput->data, rowLen * num); - assert(pOutput->data); - - char* rstart = pOutput->data; - for(int32_t i = 0; i < num; ++i) { - - char* s = rstart; - varDataSetLen(s, 0); - for (int32_t j = 0; j < numOfInput; ++j) { - char* p1 = POINTER_SHIFT(pLeft[j].data, i * pLeft[j].bytes); - - memcpy(varDataVal(s) + varDataLen(s), varDataVal(p1), varDataLen(p1)); - varDataLen(s) += varDataLen(p1); - } - - rstart += rowLen; - } -#endif +int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doCaseConvFunction(pInput, inputNum, pOutput, toupper); } -static void tltrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { - +int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doTrimFunction(pInput, inputNum, pOutput, tltrim); } -static void trtrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { - +int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doTrimFunction(pInput, inputNum, pOutput, trtrim); } static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) { From b0eefba1b3fa738c8a29e087c76beb0e80dce796 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 02/11] [TD-14241]: add string functions --- source/libs/scalar/src/sclfunc.c | 169 +++++++++++++++---------------- 1 file changed, 81 insertions(+), 88 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 15a9984c80..12eee76838 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -8,12 +8,13 @@ typedef double (*_double_fn)(double); typedef double (*_double_fn_2)(double, double); typedef int (*_conv_fn)(int); typedef void (*_trim_fn)(char *, char*, int32_t, int32_t); +typedef int16_t (*_len_fn)(char *, int32_t); +/** Math functions **/ double tlog(double v, double base) { return log(v) / log(base); } -/** Math functions **/ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; @@ -213,32 +214,78 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p } /** String functions **/ -int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - int32_t type = GET_PARAM_TYPE(pInput); - if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { - return TSDB_CODE_FAILED; - } - - SColumnInfoData *pInputData = pInput->columnData; - SColumnInfoData *pOutputData = pOutput->columnData; - - char **in = (char **)pInputData->pData; - int16_t *out = (int16_t *)pOutputData->pData; - - for (int32_t i = 0; i < pInput->numOfRows; ++i) { - if (colDataIsNull_f(pInputData->nullbitmap, i)) { - colDataSetNull_f(pOutputData->nullbitmap, i); - continue; - } - - out[i] = varDataLen(in[i]); - } - - pOutput->numOfRows = pInput->numOfRows; - return TSDB_CODE_SUCCESS; +int16_t tlength(char *input, int32_t type) { + return varDataLen(input); } -int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { +int16_t tcharlength(char *input, int32_t type) { + if (type == TSDB_DATA_TYPE_VARCHAR) { + return varDataLen(input); + } else { //NCHAR + return varDataLen(input) / TSDB_NCHAR_SIZE; + } +} + +void tltrim(char *input, char *output, int32_t type, int32_t charLen) { + int32_t numOfSpaces = 0; + if (type == TSDB_DATA_TYPE_VARCHAR) { + for (int32_t i = 0; i < charLen; ++i) { + if (!isspace(*(varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } else { //NCHAR + for (int32_t i = 0; i < charLen; ++i) { + if (!iswspace(*((uint32_t *)varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } + + int32_t resLen; + if (type == TSDB_DATA_TYPE_VARCHAR) { + resLen = charLen - numOfSpaces; + memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen); + } else { + resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE; + memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen); + } + + varDataSetLen(output, resLen); +} + +void trtrim(char *input, char *output, int32_t type, int32_t charLen) { + int32_t numOfSpaces = 0; + if (type == TSDB_DATA_TYPE_VARCHAR) { + for (int32_t i = charLen - 1; i >= 0; --i) { + if (!isspace(*(varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } else { //NCHAR + for (int32_t i = charLen - 1; i < charLen; ++i) { + if (!iswspace(*((uint32_t *)varDataVal(input) + i))) { + break; + } + numOfSpaces++; + } + } + + int32_t resLen; + if (type == TSDB_DATA_TYPE_VARCHAR) { + resLen = charLen - numOfSpaces; + } else { + resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE; + } + memcpy(varDataVal(output), varDataVal(input), resLen); + + varDataSetLen(output, resLen); +} + +int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { return TSDB_CODE_FAILED; @@ -256,11 +303,7 @@ int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam continue; } - if (type == TSDB_DATA_TYPE_VARCHAR) { - out[i] = varDataLen(in[i]); - } else { //NCHAR - out[i] = varDataLen(in[i]) / TSDB_NCHAR_SIZE; - } + out[i] = lenFn(in[i], type); } pOutput->numOfRows = pInput->numOfRows; @@ -403,64 +446,6 @@ int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam return TSDB_CODE_SUCCESS; } -void tltrim(char *input, char *output, int32_t type, int32_t charLen) { - int32_t numOfSpaces = 0; - if (type == TSDB_DATA_TYPE_VARCHAR) { - for (int32_t i = 0; i < charLen; ++i) { - if (!isspace(*(varDataVal(input) + i))) { - break; - } - numOfSpaces++; - } - } else { //NCHAR - for (int32_t i = 0; i < charLen; ++i) { - if (!iswspace(*((uint32_t *)varDataVal(input) + i))) { - break; - } - numOfSpaces++; - } - } - - int32_t resLen; - if (type == TSDB_DATA_TYPE_VARCHAR) { - resLen = charLen - numOfSpaces; - memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen); - } else { - resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE; - memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen); - } - - varDataSetLen(output, resLen); -} - -void trtrim(char *input, char *output, int32_t type, int32_t charLen) { - int32_t numOfSpaces = 0; - if (type == TSDB_DATA_TYPE_VARCHAR) { - for (int32_t i = charLen - 1; i >= 0; --i) { - if (!isspace(*(varDataVal(input) + i))) { - break; - } - numOfSpaces++; - } - } else { //NCHAR - for (int32_t i = charLen - 1; i < charLen; ++i) { - if (!iswspace(*((uint32_t *)varDataVal(input) + i))) { - break; - } - numOfSpaces++; - } - } - - int32_t resLen; - if (type == TSDB_DATA_TYPE_VARCHAR) { - resLen = charLen - numOfSpaces; - } else { - resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE; - } - memcpy(varDataVal(output), varDataVal(input), resLen); - - varDataSetLen(output, resLen); -} int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) { int32_t type = GET_PARAM_TYPE(pInput); @@ -611,6 +596,14 @@ int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut return doTrimFunction(pInput, inputNum, pOutput, trtrim); } +int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doLengthFunction(pInput, inputNum, pOutput, tlength); +} + +int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doLengthFunction(pInput, inputNum, pOutput, tcharlength); +} + static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) { switch(type) { case TSDB_DATA_TYPE_TINYINT: From 89abd4e3495d2ce733a05ef59d6b17ca165fcf12 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 03/11] [TD-14241]: add string functions --- source/libs/function/src/builtins.c | 39 +++++++++++++++++++++-------- source/libs/scalar/src/sclfunc.c | 5 ++-- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4c07ba924d..a18386aff6 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -478,16 +478,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; break; } - case FUNCTION_TYPE_CONCAT: - case FUNCTION_TYPE_ROWTS: - case FUNCTION_TYPE_TBNAME: - case FUNCTION_TYPE_QSTARTTS: - case FUNCTION_TYPE_QENDTS: - case FUNCTION_TYPE_WSTARTTS: - case FUNCTION_TYPE_WENDTS: - case FUNCTION_TYPE_WDURATION: - // todo - break; case FUNCTION_TYPE_ABS: case FUNCTION_TYPE_CEIL: @@ -512,6 +502,35 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } + case FUNCTION_TYPE_LENGTH: + case FUNCTION_TYPE_CHAR_LENGTH: { + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_SMALLINT].bytes, .type = TSDB_DATA_TYPE_SMALLINT }; + break; + } + + case FUNCTION_TYPE_CONCAT: + case FUNCTION_TYPE_CONCAT_WS: + case FUNCTION_TYPE_LOWER: + case FUNCTION_TYPE_UPPER: + case FUNCTION_TYPE_LTRIM: + case FUNCTION_TYPE_RTRIM: + case FUNCTION_TYPE_SUBSTR: { + SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); + int32_t paraType = pParam->node.resType.type; + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; + break; + } + + case FUNCTION_TYPE_ROWTS: + case FUNCTION_TYPE_TBNAME: + case FUNCTION_TYPE_QSTARTTS: + case FUNCTION_TYPE_QENDTS: + case FUNCTION_TYPE_WSTARTTS: + case FUNCTION_TYPE_WENDTS: + case FUNCTION_TYPE_WDURATION: + // todo + break; + default: ASSERT(0); // to found the fault ASAP. } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 12eee76838..8eee608ee0 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -294,7 +294,7 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char **in = (char **)pInputData->pData; + char *in = pInputData->pData; int16_t *out = (int16_t *)pOutputData->pData; for (int32_t i = 0; i < pInput->numOfRows; ++i) { @@ -303,7 +303,8 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p continue; } - out[i] = lenFn(in[i], type); + out[i] = lenFn(in, type); + in += varDataTLen(in); } pOutput->numOfRows = pInput->numOfRows; From 89aba48960cc6afd5c342cb61e15e078d6fcc074 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 04/11] [TD-14241]: concat function adoption --- source/libs/scalar/src/sclfunc.c | 39 ++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 8eee608ee0..864b8acf75 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -310,6 +310,8 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } +void allocateOutputBuf() { +} int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings @@ -318,17 +320,35 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData *pOutputData = pOutput->columnData; + char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); + char *output = NULL; + int32_t inputLen = 0; + int32_t numOfRows = pInput->numOfRows; for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { return TSDB_CODE_FAILED; } pInputData[i] = pInput[i].columnData; + inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; + input[i] = pInputData[i]->pData; } + //allocate output buf + if (pOutputData->pData == NULL) { + int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; + pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); + pOutputData->info.type = GET_PARAM_TYPE(pInput); + pOutputData->info.bytes = outputLen; + pOutputData->varmeta.length = outputLen; + pOutputData->varmeta.allocLen = outputLen; + } + output = pOutputData->pData; + bool hasNull = false; - for (int32_t k = 0; k < pInput->numOfRows; ++k) { + int32_t offset = 0; + for (int32_t k = 0; k < numOfRows; ++k) { for (int32_t i = 0; i < inputNum; ++i) { if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) { colDataSetNull_f(pOutputData->nullbitmap, k); @@ -341,20 +361,21 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu continue; } - char *in = NULL; - char *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput); - int16_t dataLen = 0; for (int32_t i = 0; i < inputNum; ++i) { - in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]); - - memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in)); - dataLen += varDataLen(in); + memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); + dataLen += varDataLen(input[i]); + input[i] += varDataTLen(input[i]); } - varDataSetLen(out, dataLen); + varDataSetLen(output, dataLen); + int32_t dataTLen = varDataTLen(output); + output += dataTLen; + pOutputData->varmeta.offset[k] = offset; + offset += dataTLen; } pOutput->numOfRows = pInput->numOfRows; + taosMemoryFree(input); taosMemoryFree(pInputData); return TSDB_CODE_SUCCESS; From 6f6d0e4a0997d2c592e07fefcab16d8829c6c608 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 05/11] [TD-14241]: concat function adoption --- source/libs/function/src/builtins.c | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a18386aff6..a19acb096d 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -508,7 +508,22 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } - case FUNCTION_TYPE_CONCAT: + case FUNCTION_TYPE_CONCAT: { + int32_t paraTypeFirst, totalBytes = 0; + for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { + SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); + int32_t paraType = pParam->node.resType.type; + if (i == 0) { + paraTypeFirst = paraType; + } + if (paraType != paraTypeFirst) { + return TSDB_CODE_FAILED; + } + totalBytes += pParam->node.resType.bytes; + } + pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst }; + break; + } case FUNCTION_TYPE_CONCAT_WS: case FUNCTION_TYPE_LOWER: case FUNCTION_TYPE_UPPER: @@ -517,7 +532,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { case FUNCTION_TYPE_SUBSTR: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); int32_t paraType = pParam->node.resType.type; - pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; + //pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; + pFunc->node.resType = (SDataType) { .bytes = 23, .type = paraType }; break; } From 6eb6cfe279b214bc0e457bd5f2fbd24576746255 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 06/11] [TD-14241]: concat_ws function adoption --- source/libs/function/src/builtins.c | 18 ++++++++--- source/libs/scalar/src/sclfunc.c | 47 ++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a19acb096d..4cbc91fa6a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -508,12 +508,19 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } - case FUNCTION_TYPE_CONCAT: { - int32_t paraTypeFirst, totalBytes = 0; - for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { + case FUNCTION_TYPE_CONCAT: + case FUNCTION_TYPE_CONCAT_WS: { + int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0; + int32_t firstParamIndex = 0; + if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) { + firstParamIndex = 1; + SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0); + sepBytes = pSep->node.resType.type; + } + for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); int32_t paraType = pParam->node.resType.type; - if (i == 0) { + if (i == firstParamIndex) { paraTypeFirst = paraType; } if (paraType != paraTypeFirst) { @@ -521,10 +528,11 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { } totalBytes += pParam->node.resType.bytes; } + //TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100. + totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100; pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst }; break; } - case FUNCTION_TYPE_CONCAT_WS: case FUNCTION_TYPE_LOWER: case FUNCTION_TYPE_UPPER: case FUNCTION_TYPE_LTRIM: diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 864b8acf75..f1eab5c05c 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -374,7 +374,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu offset += dataTLen; } - pOutput->numOfRows = pInput->numOfRows; + pOutput->numOfRows = numOfRows; taosMemoryFree(input); taosMemoryFree(pInputData); @@ -388,45 +388,70 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData *pOutputData = pOutput->columnData; + char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); + char *output = NULL; + int32_t inputLen = 0; + int32_t numOfRows = pInput[1].numOfRows; for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { return TSDB_CODE_FAILED; } pInputData[i] = pInput[i].columnData; + if (i == 0) { + // calculate required separator space + inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (inputNum - 2); + } else { + inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; + } + input[i] = pInputData[i]->pData; } - for (int32_t k = 0; k < pInput->numOfRows; ++k) { + //allocate output buf + if (pOutputData->pData == NULL) { + int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; + pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); + pOutputData->info.type = GET_PARAM_TYPE(pInput); + pOutputData->info.bytes = outputLen; + pOutputData->varmeta.length = outputLen; + pOutputData->varmeta.allocLen = outputLen; + } + output = pOutputData->pData; + + int32_t offset = 0; + for (int32_t k = 0; k < numOfRows; ++k) { char *sep = pInputData[0]->pData; if (colDataIsNull_f(pInputData[0]->nullbitmap, k)) { colDataSetNull_f(pOutputData->nullbitmap, k); continue; } - char *in = NULL; - char *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput); - int16_t dataLen = 0; for (int32_t i = 1; i < inputNum; ++i) { if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) { continue; } - in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]); - memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in)); - dataLen += varDataLen(in); + memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); + dataLen += varDataLen(input[i]); + input[i] += varDataTLen(input[i]); if (i < inputNum - 1) { //insert the separator - memcpy(varDataVal(out) + dataLen, varDataVal(sep), varDataLen(sep)); + memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep)); dataLen += varDataLen(sep); } } - varDataSetLen(out, dataLen); + varDataSetLen(output, dataLen); + int32_t dataTLen = varDataTLen(output); + output += dataTLen; + pOutputData->varmeta.offset[k] = offset; + offset += dataTLen; } - pOutput->numOfRows = pInput->numOfRows; + pOutput->numOfRows = numOfRows; + taosMemoryFree(input); taosMemoryFree(pInputData); return TSDB_CODE_SUCCESS; From dddf621aa7a4504f756c1d2aa96a18a417b8a85c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 07/11] [TD-14241]: concat/concat_ws fix input has both constant and column issue --- source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 52 +++++++++++++++++++---------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4cbc91fa6a..957e7ee406 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -526,6 +526,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { if (paraType != paraTypeFirst) { return TSDB_CODE_FAILED; } + //TODO: for constants also needs numOfRows totalBytes += pParam->node.resType.bytes; } //TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100. diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index f1eab5c05c..21d8aee0ec 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -310,7 +310,13 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } -void allocateOutputBuf() { + +static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) { + pOutputData->pData = taosMemoryCalloc(len, sizeof(char)); + pOutputData->info.type = type; + pOutputData->info.bytes = len; + pOutputData->varmeta.length = len; + pOutputData->varmeta.allocLen = len; } int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { @@ -324,25 +330,30 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu char *output = NULL; int32_t inputLen = 0; - int32_t numOfRows = pInput->numOfRows; + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + if (pInput[i].numOfRows > numOfRows) { + numOfRows = pInput[i].numOfRows; + } + } for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { return TSDB_CODE_FAILED; } pInputData[i] = pInput[i].columnData; - inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; input[i] = pInputData[i]->pData; + if (pInput[i].numOfRows == 1) { + inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; + } else { + inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; + } } //allocate output buf if (pOutputData->pData == NULL) { int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; - pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); - pOutputData->info.type = GET_PARAM_TYPE(pInput); - pOutputData->info.bytes = outputLen; - pOutputData->varmeta.length = outputLen; - pOutputData->varmeta.allocLen = outputLen; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput)); } output = pOutputData->pData; @@ -365,7 +376,9 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu for (int32_t i = 0; i < inputNum; ++i) { memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); dataLen += varDataLen(input[i]); - input[i] += varDataTLen(input[i]); + if (pInput[i].numOfRows != 1) { + input[i] += varDataTLen(input[i]); + } } varDataSetLen(output, dataLen); int32_t dataTLen = varDataTLen(output); @@ -392,7 +405,12 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p char *output = NULL; int32_t inputLen = 0; - int32_t numOfRows = pInput[1].numOfRows; + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + if (pInput[i].numOfRows > numOfRows) { + numOfRows = pInput[i].numOfRows; + } + } for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { @@ -401,7 +419,9 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p pInputData[i] = pInput[i].columnData; if (i == 0) { // calculate required separator space - inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (inputNum - 2); + inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2); + } else if (pInput[i].numOfRows == 1) { + inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; } else { inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; } @@ -411,11 +431,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p //allocate output buf if (pOutputData->pData == NULL) { int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; - pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); - pOutputData->info.type = GET_PARAM_TYPE(pInput); - pOutputData->info.bytes = outputLen; - pOutputData->varmeta.length = outputLen; - pOutputData->varmeta.allocLen = outputLen; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(&pInput[1])); } output = pOutputData->pData; @@ -435,7 +451,9 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); dataLen += varDataLen(input[i]); - input[i] += varDataTLen(input[i]); + if (pInput[i].numOfRows != 1) { + input[i] += varDataTLen(input[i]); + } if (i < inputNum - 1) { //insert the separator From c846c6ebf30ba307e5ba028253294a6637de0b25 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 08/11] [TD-14241]: lower/upper function adoption --- source/libs/function/src/builtins.c | 6 +++--- source/libs/scalar/src/sclfunc.c | 26 +++++++++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 957e7ee406..dd9e32655f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -540,9 +540,9 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { case FUNCTION_TYPE_RTRIM: case FUNCTION_TYPE_SUBSTR: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); - int32_t paraType = pParam->node.resType.type; - //pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; - pFunc->node.resType = (SDataType) { .bytes = 23, .type = paraType }; + int32_t paraType = pParam->node.resType.type; + int32_t paraBytes = pParam->node.resType.bytes; + pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType }; break; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 21d8aee0ec..e6eb3c43b4 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -484,26 +484,38 @@ int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; + //allocate output buf + if (pOutputData->pData == NULL) { + int32_t outputLen = pInputData->varmeta.length; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput)); + } + + char *input = pInputData->pData; + char *output = pOutputData->pData; + + int32_t offset = 0; for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_f(pInputData->nullbitmap, i)) { colDataSetNull_f(pOutputData->nullbitmap, i); continue; } - char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput); - char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput); - - int32_t len = varDataLen(in); + int32_t len = varDataLen(input); if (type == TSDB_DATA_TYPE_VARCHAR) { for (int32_t j = 0; j < len; ++j) { - *(varDataVal(out) + j) = convFn(*(varDataVal(in) + j)); + *(varDataVal(output) + j) = convFn(*(varDataVal(input) + j)); } } else { //NCHAR for (int32_t j = 0; j < len / TSDB_NCHAR_SIZE; ++j) { - *((uint32_t *)varDataVal(out) + j) = convFn(*((uint32_t *)varDataVal(in) + j)); + *((uint32_t *)varDataVal(output) + j) = convFn(*((uint32_t *)varDataVal(input) + j)); } } - varDataSetLen(out, len); + varDataSetLen(output, len); + input += varDataTLen(input); + output += varDataTLen(output); + + pOutputData->varmeta.offset[i] = offset; + offset += len + VARSTR_HEADER_SIZE; } pOutput->numOfRows = pInput->numOfRows; From 64184611d7e35c1aa84e59a91640e6b380e6b811 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 09/11] [TD-14241]: ltrim/rtrim function adoption --- source/libs/scalar/src/sclfunc.c | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e6eb3c43b4..9f481c8fb2 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -533,18 +533,32 @@ int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; + //allocate output buf + if (pOutputData->pData == NULL) { + int32_t outputLen = pInputData->varmeta.length; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput)); + } + + char *input = pInputData->pData; + char *output = pOutputData->pData; + + int32_t offset = 0; for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_f(pInputData->nullbitmap, i)) { colDataSetNull_f(pOutputData->nullbitmap, i); continue; } - char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput); - char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput); - - int32_t len = varDataLen(in); + int32_t len = varDataLen(input); int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; - trimFn(in, out, type, charLen); + trimFn(input, output, type, charLen); + + varDataSetLen(output, len); + input += varDataTLen(input); + output += varDataTLen(output); + + pOutputData->varmeta.offset[i] = offset; + offset += len + VARSTR_HEADER_SIZE; } pOutput->numOfRows = pInput->numOfRows; From 5a1548bf1548b9a9f66d8ea7283037703482ae13 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH 10/11] [TD-14241]: substr function adoption --- source/libs/function/src/builtins.c | 49 ++++++++++++++++------------- source/libs/scalar/src/sclfunc.c | 30 ++++++++++++------ 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index dd9e32655f..dc43a3508f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -510,29 +510,36 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_CONCAT_WS: { - int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0; - int32_t firstParamIndex = 0; - if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) { - firstParamIndex = 1; - SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0); - sepBytes = pSep->node.resType.type; - } - for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) { + int32_t paraType, paraBytes = 0; + for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); - int32_t paraType = pParam->node.resType.type; - if (i == firstParamIndex) { - paraTypeFirst = paraType; - } - if (paraType != paraTypeFirst) { - return TSDB_CODE_FAILED; - } - //TODO: for constants also needs numOfRows - totalBytes += pParam->node.resType.bytes; + paraBytes += pParam->node.resType.bytes; + paraType = pParam->node.resType.type; } - //TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100. - totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100; - pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst }; - break; + pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType }; + //int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0; + //int32_t firstParamIndex = 0; + //if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) { + // firstParamIndex = 1; + // SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0); + // sepBytes = pSep->node.resType.type; + //} + //for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) { + // SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); + // int32_t paraType = pParam->node.resType.type; + // if (i == firstParamIndex) { + // paraTypeFirst = paraType; + // } + // if (paraType != paraTypeFirst) { + // return TSDB_CODE_FAILED; + // } + // //TODO: for constants also needs numOfRows + // totalBytes += pParam->node.resType.bytes; + //} + ////TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100. + //totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100; + //pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst }; + //break; } case FUNCTION_TYPE_LOWER: case FUNCTION_TYPE_UPPER: diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 9f481c8fb2..222f7ac69a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -350,7 +350,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu } } - //allocate output buf + allocate output buf if (pOutputData->pData == NULL) { int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput)); @@ -567,7 +567,7 @@ int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu } int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 2 || inputNum!= 3) { + if (inputNum != 2 && inputNum!= 3) { return TSDB_CODE_FAILED; } @@ -589,16 +589,23 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - for (int32_t i = 0; i < pOutput->numOfRows; ++i) { + //allocate output buf + if (pOutputData->pData == NULL) { + int32_t outputLen = pInputData->varmeta.length; + setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput)); + } + + char *input = pInputData->pData; + char *output = pOutputData->pData; + + int32_t offset = 0; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_f(pInputData->nullbitmap, i)) { colDataSetNull_f(pOutputData->nullbitmap, i); continue; } - char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput); - char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput); - - int32_t len = varDataLen(in); + int32_t len = varDataLen(input); int32_t startPosBytes; if (subPos > 0) { @@ -611,10 +618,15 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu subLen = MIN(subLen, len - startPosBytes); if (subLen > 0) { - memcpy(varDataVal(out), varDataVal(in) + startPosBytes, subLen); + memcpy(varDataVal(output), varDataVal(input) + startPosBytes, subLen); } - varDataSetLen(out, subLen); + varDataSetLen(output, subLen); + input += varDataTLen(input); + output += varDataTLen(output); + + pOutputData->varmeta.offset[i] = offset; + offset += subLen + VARSTR_HEADER_SIZE; } pOutput->numOfRows = pInput->numOfRows; From 8bcdf8e200132cf066cd3a8b3253f3eecd33c413 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 2 Apr 2022 18:34:17 +0800 Subject: [PATCH 11/11] [TD-14241]: refactor string functions --- source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 126117f982..99a247b45e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -536,6 +536,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { paraType = pParam->node.resType.type; } pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType }; + break; //int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0; //int32_t firstParamIndex = 0; //if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) { diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index bc508f6e22..c02729a084 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -333,15 +333,15 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t inputLen = 0; int32_t numOfRows = 0; for (int32_t i = 0; i < inputNum; ++i) { + if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || + GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { + return TSDB_CODE_FAILED; + } if (pInput[i].numOfRows > numOfRows) { numOfRows = pInput[i].numOfRows; } } for (int32_t i = 0; i < inputNum; ++i) { - if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || - GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { - return TSDB_CODE_FAILED; - } pInputData[i] = pInput[i].columnData; input[i] = pInputData[i]->pData; if (pInput[i].numOfRows == 1) { @@ -402,20 +402,21 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t inputLen = 0; int32_t numOfRows = 0; - for (int32_t i = 0; i < inputNum; ++i) { + for (int32_t i = 1; i < inputNum; ++i) { + if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || + GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[1])) { + return TSDB_CODE_FAILED; + } if (pInput[i].numOfRows > numOfRows) { numOfRows = pInput[i].numOfRows; } } for (int32_t i = 0; i < inputNum; ++i) { - if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || - GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { - return TSDB_CODE_FAILED; - } pInputData[i] = pInput[i].columnData; if (i == 0) { // calculate required separator space - inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2); + int32_t factor = (GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NCHAR) ? TSDB_NCHAR_SIZE : 1; + inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor; } else if (pInput[i].numOfRows == 1) { inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; } else { @@ -429,7 +430,6 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p char *output = outputBuf; for (int32_t k = 0; k < numOfRows; ++k) { - char *sep = pInputData[0]->pData; if (colDataIsNull_s(pInputData[0], k)) { colDataAppendNULL(pOutputData, k); continue; @@ -437,7 +437,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int16_t dataLen = 0; for (int32_t i = 1; i < inputNum; ++i) { - if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) { + if (colDataIsNull_s(pInputData[i], k)) { continue; } @@ -449,6 +449,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p if (i < inputNum - 1) { //insert the separator + char *sep = pInputData[0]->pData; memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep)); dataLen += varDataLen(sep); }