From 6eb6cfe279b214bc0e457bd5f2fbd24576746255 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 30 Mar 2022 14:03:54 +0800 Subject: [PATCH] [TD-14241]: concat_ws function adoption --- source/libs/function/src/builtins.c | 18 ++++++++--- source/libs/scalar/src/sclfunc.c | 47 ++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a19acb096d..4cbc91fa6a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -508,12 +508,19 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } - case FUNCTION_TYPE_CONCAT: { - int32_t paraTypeFirst, totalBytes = 0; - for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { + case FUNCTION_TYPE_CONCAT: + case FUNCTION_TYPE_CONCAT_WS: { + int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0; + int32_t firstParamIndex = 0; + if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) { + firstParamIndex = 1; + SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0); + sepBytes = pSep->node.resType.type; + } + for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); int32_t paraType = pParam->node.resType.type; - if (i == 0) { + if (i == firstParamIndex) { paraTypeFirst = paraType; } if (paraType != paraTypeFirst) { @@ -521,10 +528,11 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { } totalBytes += pParam->node.resType.bytes; } + //TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100. + totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100; pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst }; break; } - case FUNCTION_TYPE_CONCAT_WS: case FUNCTION_TYPE_LOWER: case FUNCTION_TYPE_UPPER: case FUNCTION_TYPE_LTRIM: diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 864b8acf75..f1eab5c05c 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -374,7 +374,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu offset += dataTLen; } - pOutput->numOfRows = pInput->numOfRows; + pOutput->numOfRows = numOfRows; taosMemoryFree(input); taosMemoryFree(pInputData); @@ -388,45 +388,70 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *)); SColumnInfoData *pOutputData = pOutput->columnData; + char **input = taosMemoryCalloc(inputNum, POINTER_BYTES); + char *output = NULL; + int32_t inputLen = 0; + int32_t numOfRows = pInput[1].numOfRows; for (int32_t i = 0; i < inputNum; ++i) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { return TSDB_CODE_FAILED; } pInputData[i] = pInput[i].columnData; + if (i == 0) { + // calculate required separator space + inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (inputNum - 2); + } else { + inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; + } + input[i] = pInputData[i]->pData; } - for (int32_t k = 0; k < pInput->numOfRows; ++k) { + //allocate output buf + if (pOutputData->pData == NULL) { + int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; + pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char)); + pOutputData->info.type = GET_PARAM_TYPE(pInput); + pOutputData->info.bytes = outputLen; + pOutputData->varmeta.length = outputLen; + pOutputData->varmeta.allocLen = outputLen; + } + output = pOutputData->pData; + + int32_t offset = 0; + for (int32_t k = 0; k < numOfRows; ++k) { char *sep = pInputData[0]->pData; if (colDataIsNull_f(pInputData[0]->nullbitmap, k)) { colDataSetNull_f(pOutputData->nullbitmap, k); continue; } - char *in = NULL; - char *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput); - int16_t dataLen = 0; for (int32_t i = 1; i < inputNum; ++i) { if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) { continue; } - in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]); - memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in)); - dataLen += varDataLen(in); + memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); + dataLen += varDataLen(input[i]); + input[i] += varDataTLen(input[i]); if (i < inputNum - 1) { //insert the separator - memcpy(varDataVal(out) + dataLen, varDataVal(sep), varDataLen(sep)); + memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep)); dataLen += varDataLen(sep); } } - varDataSetLen(out, dataLen); + varDataSetLen(output, dataLen); + int32_t dataTLen = varDataTLen(output); + output += dataTLen; + pOutputData->varmeta.offset[k] = offset; + offset += dataTLen; } - pOutput->numOfRows = pInput->numOfRows; + pOutput->numOfRows = numOfRows; + taosMemoryFree(input); taosMemoryFree(pInputData); return TSDB_CODE_SUCCESS;