[TD-14241]<feature>: concat/concat_ws support nchar column wtih constant
This commit is contained in:
parent
8bcdf8e200
commit
98cc544e5b
|
@ -530,10 +530,32 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
case FUNCTION_TYPE_CONCAT:
|
case FUNCTION_TYPE_CONCAT:
|
||||||
case FUNCTION_TYPE_CONCAT_WS: {
|
case FUNCTION_TYPE_CONCAT_WS: {
|
||||||
int32_t paraType, paraBytes = 0;
|
int32_t paraType, paraBytes = 0;
|
||||||
|
bool typeSet = false;
|
||||||
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
|
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
|
||||||
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
||||||
paraBytes += pParam->node.resType.bytes;
|
if (pParam->node.type == QUERY_NODE_COLUMN) {
|
||||||
paraType = pParam->node.resType.type;
|
if (typeSet == false) {
|
||||||
|
paraType = pParam->node.resType.type;
|
||||||
|
typeSet = true;
|
||||||
|
} else {
|
||||||
|
//columns have to be the same type
|
||||||
|
if (paraType != pParam->node.resType.type) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
paraBytes += pParam->node.resType.bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
|
||||||
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
||||||
|
if (pParam->node.type == QUERY_NODE_VALUE) {
|
||||||
|
if (paraType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
paraBytes += pParam->node.resType.bytes * TSDB_NCHAR_SIZE;
|
||||||
|
} else {
|
||||||
|
paraBytes += pParam->node.resType.bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType };
|
pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType };
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -12,7 +12,7 @@ typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
|
||||||
typedef int16_t (*_len_fn)(char *, int32_t);
|
typedef int16_t (*_len_fn)(char *, int32_t);
|
||||||
|
|
||||||
/** Math functions **/
|
/** Math functions **/
|
||||||
double tlog(double v, double base) {
|
static double tlog(double v, double base) {
|
||||||
return log(v) / log(base);
|
return log(v) / log(base);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
|
static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -138,7 +138,7 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
|
static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
|
||||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
|
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,7 @@ int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarP
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
|
static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -215,11 +215,11 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p
|
||||||
}
|
}
|
||||||
|
|
||||||
/** String functions **/
|
/** String functions **/
|
||||||
int16_t tlength(char *input, int32_t type) {
|
static int16_t tlength(char *input, int32_t type) {
|
||||||
return varDataLen(input);
|
return varDataLen(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t tcharlength(char *input, int32_t type) {
|
static int16_t tcharlength(char *input, int32_t type) {
|
||||||
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
return varDataLen(input);
|
return varDataLen(input);
|
||||||
} else { //NCHAR
|
} else { //NCHAR
|
||||||
|
@ -227,7 +227,7 @@ int16_t tcharlength(char *input, int32_t type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
|
static void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
|
||||||
int32_t numOfSpaces = 0;
|
int32_t numOfSpaces = 0;
|
||||||
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
for (int32_t i = 0; i < charLen; ++i) {
|
for (int32_t i = 0; i < charLen; ++i) {
|
||||||
|
@ -257,7 +257,7 @@ void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
|
||||||
varDataSetLen(output, resLen);
|
varDataSetLen(output, resLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
|
static void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
|
||||||
int32_t numOfSpaces = 0;
|
int32_t numOfSpaces = 0;
|
||||||
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
for (int32_t i = charLen - 1; i >= 0; --i) {
|
for (int32_t i = charLen - 1; i >= 0; --i) {
|
||||||
|
@ -286,7 +286,7 @@ void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
|
||||||
varDataSetLen(output, resLen);
|
varDataSetLen(output, resLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
|
static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -312,12 +312,22 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) {
|
static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCol, int32_t type, int16_t *dataLen) {
|
||||||
pOutputData->pData = taosMemoryCalloc(len, sizeof(char));
|
if (hasNcharCol && type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
pOutputData->info.type = type;
|
TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1);
|
||||||
pOutputData->info.bytes = len;
|
bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL);
|
||||||
pOutputData->varmeta.length = len;
|
if (!ret) {
|
||||||
pOutputData->varmeta.allocLen = len;
|
taosMemoryFree(newBuf);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE);
|
||||||
|
*dataLen += varDataLen(input) * TSDB_NCHAR_SIZE;
|
||||||
|
taosMemoryFree(newBuf);
|
||||||
|
} else {
|
||||||
|
memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input));
|
||||||
|
*dataLen += varDataLen(input);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
@ -332,11 +342,15 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
|
|
||||||
int32_t inputLen = 0;
|
int32_t inputLen = 0;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
bool hasNcharCol = false;
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
int32_t type = GET_PARAM_TYPE(&pInput[i]);
|
||||||
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
if (!IS_VAR_DATA_TYPE(type)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
hasNcharCol = true;
|
||||||
|
}
|
||||||
if (pInput[i].numOfRows > numOfRows) {
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
numOfRows = pInput[i].numOfRows;
|
numOfRows = pInput[i].numOfRows;
|
||||||
}
|
}
|
||||||
|
@ -344,8 +358,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
pInputData[i] = pInput[i].columnData;
|
pInputData[i] = pInput[i].columnData;
|
||||||
input[i] = pInputData[i]->pData;
|
input[i] = pInputData[i]->pData;
|
||||||
|
int32_t factor = 1;
|
||||||
|
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
|
||||||
|
factor = TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
if (pInput[i].numOfRows == 1) {
|
if (pInput[i].numOfRows == 1) {
|
||||||
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
|
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * numOfRows;
|
||||||
} else {
|
} else {
|
||||||
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
}
|
}
|
||||||
|
@ -371,8 +389,10 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
|
|
||||||
int16_t dataLen = 0;
|
int16_t dataLen = 0;
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
|
||||||
dataLen += varDataLen(input[i]);
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
if (pInput[i].numOfRows != 1) {
|
if (pInput[i].numOfRows != 1) {
|
||||||
input[i] += varDataTLen(input[i]);
|
input[i] += varDataTLen(input[i]);
|
||||||
}
|
}
|
||||||
|
@ -390,6 +410,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
|
if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -402,27 +423,34 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
|
|
||||||
int32_t inputLen = 0;
|
int32_t inputLen = 0;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
bool hasNcharCol = false;
|
||||||
for (int32_t i = 1; i < inputNum; ++i) {
|
for (int32_t i = 1; i < inputNum; ++i) {
|
||||||
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
int32_t type = GET_PARAM_TYPE(&pInput[i]);
|
||||||
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[1])) {
|
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i]))) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
hasNcharCol = true;
|
||||||
|
}
|
||||||
if (pInput[i].numOfRows > numOfRows) {
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
numOfRows = pInput[i].numOfRows;
|
numOfRows = pInput[i].numOfRows;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
pInputData[i] = pInput[i].columnData;
|
pInputData[i] = pInput[i].columnData;
|
||||||
|
input[i] = pInputData[i]->pData;
|
||||||
|
int32_t factor = 1;
|
||||||
|
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
|
||||||
|
factor = TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
// calculate required separator space
|
// calculate required separator space
|
||||||
int32_t factor = (GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NCHAR) ? TSDB_NCHAR_SIZE : 1;
|
|
||||||
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
|
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
|
||||||
} else if (pInput[i].numOfRows == 1) {
|
} else if (pInput[i].numOfRows == 1) {
|
||||||
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
|
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * factor;
|
||||||
} else {
|
} else {
|
||||||
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
}
|
}
|
||||||
input[i] = pInputData[i]->pData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
@ -441,8 +469,11 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
|
||||||
dataLen += varDataLen(input[i]);
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
if (pInput[i].numOfRows != 1) {
|
if (pInput[i].numOfRows != 1) {
|
||||||
input[i] += varDataTLen(input[i]);
|
input[i] += varDataTLen(input[i]);
|
||||||
}
|
}
|
||||||
|
@ -450,8 +481,10 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
if (i < inputNum - 1) {
|
if (i < inputNum - 1) {
|
||||||
//insert the separator
|
//insert the separator
|
||||||
char *sep = pInputData[0]->pData;
|
char *sep = pInputData[0]->pData;
|
||||||
memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep));
|
int32_t ret = concatCopyHelper(sep, output, hasNcharCol, GET_PARAM_TYPE(&pInput[0]), &dataLen);
|
||||||
dataLen += varDataLen(sep);
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
varDataSetLen(output, dataLen);
|
varDataSetLen(output, dataLen);
|
||||||
|
@ -467,7 +500,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
|
static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -512,7 +545,7 @@ int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
|
static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
Loading…
Reference in New Issue