[TD-14241]: concat/concat_ws fix input has both constant and column
issue
This commit is contained in:
parent
6eb6cfe279
commit
dddf621aa7
|
@ -526,6 +526,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
if (paraType != paraTypeFirst) {
|
if (paraType != paraTypeFirst) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
//TODO: for constants also needs numOfRows
|
||||||
totalBytes += pParam->node.resType.bytes;
|
totalBytes += pParam->node.resType.bytes;
|
||||||
}
|
}
|
||||||
//TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100.
|
//TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100.
|
||||||
|
|
|
@ -310,7 +310,13 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
pOutput->numOfRows = pInput->numOfRows;
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
void allocateOutputBuf() {
|
|
||||||
|
static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) {
|
||||||
|
pOutputData->pData = taosMemoryCalloc(len, sizeof(char));
|
||||||
|
pOutputData->info.type = type;
|
||||||
|
pOutputData->info.bytes = len;
|
||||||
|
pOutputData->varmeta.length = len;
|
||||||
|
pOutputData->varmeta.allocLen = len;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
@ -324,25 +330,30 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
char *output = NULL;
|
char *output = NULL;
|
||||||
|
|
||||||
int32_t inputLen = 0;
|
int32_t inputLen = 0;
|
||||||
int32_t numOfRows = pInput->numOfRows;
|
int32_t numOfRows = 0;
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
|
numOfRows = pInput[i].numOfRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
||||||
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
pInputData[i] = pInput[i].columnData;
|
pInputData[i] = pInput[i].columnData;
|
||||||
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
|
||||||
input[i] = pInputData[i]->pData;
|
input[i] = pInputData[i]->pData;
|
||||||
|
if (pInput[i].numOfRows == 1) {
|
||||||
|
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
|
||||||
|
} else {
|
||||||
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//allocate output buf
|
//allocate output buf
|
||||||
if (pOutputData->pData == NULL) {
|
if (pOutputData->pData == NULL) {
|
||||||
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
||||||
pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char));
|
setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(pInput));
|
||||||
pOutputData->info.type = GET_PARAM_TYPE(pInput);
|
|
||||||
pOutputData->info.bytes = outputLen;
|
|
||||||
pOutputData->varmeta.length = outputLen;
|
|
||||||
pOutputData->varmeta.allocLen = outputLen;
|
|
||||||
}
|
}
|
||||||
output = pOutputData->pData;
|
output = pOutputData->pData;
|
||||||
|
|
||||||
|
@ -365,8 +376,10 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
||||||
dataLen += varDataLen(input[i]);
|
dataLen += varDataLen(input[i]);
|
||||||
|
if (pInput[i].numOfRows != 1) {
|
||||||
input[i] += varDataTLen(input[i]);
|
input[i] += varDataTLen(input[i]);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
varDataSetLen(output, dataLen);
|
varDataSetLen(output, dataLen);
|
||||||
int32_t dataTLen = varDataTLen(output);
|
int32_t dataTLen = varDataTLen(output);
|
||||||
output += dataTLen;
|
output += dataTLen;
|
||||||
|
@ -392,7 +405,12 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
char *output = NULL;
|
char *output = NULL;
|
||||||
|
|
||||||
int32_t inputLen = 0;
|
int32_t inputLen = 0;
|
||||||
int32_t numOfRows = pInput[1].numOfRows;
|
int32_t numOfRows = 0;
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
|
numOfRows = pInput[i].numOfRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
||||||
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
||||||
|
@ -401,7 +419,9 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
pInputData[i] = pInput[i].columnData;
|
pInputData[i] = pInput[i].columnData;
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
// calculate required separator space
|
// calculate required separator space
|
||||||
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (inputNum - 2);
|
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2);
|
||||||
|
} else if (pInput[i].numOfRows == 1) {
|
||||||
|
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
|
||||||
} else {
|
} else {
|
||||||
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
}
|
}
|
||||||
|
@ -411,11 +431,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
//allocate output buf
|
//allocate output buf
|
||||||
if (pOutputData->pData == NULL) {
|
if (pOutputData->pData == NULL) {
|
||||||
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
||||||
pOutputData->pData = taosMemoryCalloc(outputLen, sizeof(char));
|
setVarTypeOutputBuf(pOutputData, outputLen, GET_PARAM_TYPE(&pInput[1]));
|
||||||
pOutputData->info.type = GET_PARAM_TYPE(pInput);
|
|
||||||
pOutputData->info.bytes = outputLen;
|
|
||||||
pOutputData->varmeta.length = outputLen;
|
|
||||||
pOutputData->varmeta.allocLen = outputLen;
|
|
||||||
}
|
}
|
||||||
output = pOutputData->pData;
|
output = pOutputData->pData;
|
||||||
|
|
||||||
|
@ -435,7 +451,9 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
|
|
||||||
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
||||||
dataLen += varDataLen(input[i]);
|
dataLen += varDataLen(input[i]);
|
||||||
|
if (pInput[i].numOfRows != 1) {
|
||||||
input[i] += varDataTLen(input[i]);
|
input[i] += varDataTLen(input[i]);
|
||||||
|
}
|
||||||
|
|
||||||
if (i < inputNum - 1) {
|
if (i < inputNum - 1) {
|
||||||
//insert the separator
|
//insert the separator
|
||||||
|
|
Loading…
Reference in New Issue