[TD-14241]: concat_ws function adoption
This commit is contained in:
parent
6f6d0e4a09
commit
6eb6cfe279
|
@ -508,12 +508,19 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case FUNCTION_TYPE_CONCAT: {
|
case FUNCTION_TYPE_CONCAT:
|
||||||
int32_t paraTypeFirst, totalBytes = 0;
|
case FUNCTION_TYPE_CONCAT_WS: {
|
||||||
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
|
int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0;
|
||||||
|
int32_t firstParamIndex = 0;
|
||||||
|
if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) {
|
||||||
|
firstParamIndex = 1;
|
||||||
|
SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
sepBytes = pSep->node.resType.type;
|
||||||
|
}
|
||||||
|
for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) {
|
||||||
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
||||||
int32_t paraType = pParam->node.resType.type;
|
int32_t paraType = pParam->node.resType.type;
|
||||||
if (i == 0) {
|
if (i == firstParamIndex) {
|
||||||
paraTypeFirst = paraType;
|
paraTypeFirst = paraType;
|
||||||
}
|
}
|
||||||
if (paraType != paraTypeFirst) {
|
if (paraType != paraTypeFirst) {
|
||||||
|
@ -521,10 +528,11 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
}
|
}
|
||||||
totalBytes += pParam->node.resType.bytes;
|
totalBytes += pParam->node.resType.bytes;
|
||||||
}
|
}
|
||||||
|
//TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100.
|
||||||
|
totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100;
|
||||||
pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst };
|
pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst };
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case FUNCTION_TYPE_CONCAT_WS:
|
|
||||||
case FUNCTION_TYPE_LOWER:
|
case FUNCTION_TYPE_LOWER:
|
||||||
case FUNCTION_TYPE_UPPER:
|
case FUNCTION_TYPE_UPPER:
|
||||||
case FUNCTION_TYPE_LTRIM:
|
case FUNCTION_TYPE_LTRIM:
|
||||||
|
|
|
@ -374,7 +374,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
offset += dataTLen;
|
offset += dataTLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->numOfRows = pInput->numOfRows;
|
pOutput->numOfRows = numOfRows;
|
||||||
taosMemoryFree(input);
|
taosMemoryFree(input);
|
||||||
taosMemoryFree(pInputData);
|
taosMemoryFree(pInputData);
|
||||||
|
|
||||||
|
@ -388,45 +388,70 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
|
|
||||||
SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
|
SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
|
||||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
|
||||||
|
char *output = NULL;
|
||||||
|
|
||||||
|
int32_t inputLen = 0;
|
||||||
|
int32_t numOfRows = pInput[1].numOfRows;
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
||||||
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
pInputData[i] = pInput[i].columnData;
|
pInputData[i] = pInput[i].columnData;
|
||||||
|
if (i == 0) {
|
||||||
|
// calculate required separator space
|
||||||
|
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (inputNum - 2);
|
||||||
|
} else {
|
||||||
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
}
|
||||||
|
input[i] = pInputData[i]->pData;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < pInput->numOfRows; ++k) {
|
//allocate output buf
|
||||||
|
if (pOutputData->pData == NULL) {
|
||||||
|
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char));
|
||||||
|
pOutputData->info.type = GET_PARAM_TYPE(pInput);
|
||||||
|
pOutputData->info.bytes = outputLen;
|
||||||
|
pOutputData->varmeta.length = outputLen;
|
||||||
|
pOutputData->varmeta.allocLen = outputLen;
|
||||||
|
}
|
||||||
|
output = pOutputData->pData;
|
||||||
|
|
||||||
|
int32_t offset = 0;
|
||||||
|
for (int32_t k = 0; k < numOfRows; ++k) {
|
||||||
char *sep = pInputData[0]->pData;
|
char *sep = pInputData[0]->pData;
|
||||||
if (colDataIsNull_f(pInputData[0]->nullbitmap, k)) {
|
if (colDataIsNull_f(pInputData[0]->nullbitmap, k)) {
|
||||||
colDataSetNull_f(pOutputData->nullbitmap, k);
|
colDataSetNull_f(pOutputData->nullbitmap, k);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *in = NULL;
|
|
||||||
char *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput);
|
|
||||||
|
|
||||||
int16_t dataLen = 0;
|
int16_t dataLen = 0;
|
||||||
for (int32_t i = 1; i < inputNum; ++i) {
|
for (int32_t i = 1; i < inputNum; ++i) {
|
||||||
if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) {
|
if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]);
|
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
||||||
memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in));
|
dataLen += varDataLen(input[i]);
|
||||||
dataLen += varDataLen(in);
|
input[i] += varDataTLen(input[i]);
|
||||||
|
|
||||||
if (i < inputNum - 1) {
|
if (i < inputNum - 1) {
|
||||||
//insert the separator
|
//insert the separator
|
||||||
memcpy(varDataVal(out) + dataLen, varDataVal(sep), varDataLen(sep));
|
memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep));
|
||||||
dataLen += varDataLen(sep);
|
dataLen += varDataLen(sep);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
varDataSetLen(out, dataLen);
|
varDataSetLen(output, dataLen);
|
||||||
|
int32_t dataTLen = varDataTLen(output);
|
||||||
|
output += dataTLen;
|
||||||
|
pOutputData->varmeta.offset[k] = offset;
|
||||||
|
offset += dataTLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->numOfRows = pInput->numOfRows;
|
pOutput->numOfRows = numOfRows;
|
||||||
|
taosMemoryFree(input);
|
||||||
taosMemoryFree(pInputData);
|
taosMemoryFree(pInputData);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue