Merge pull request #11221 from taosdata/feature/3.0_glzhao
[TD-14241]: add string functions
This commit is contained in:
commit
f74771cd3f
|
@ -72,10 +72,15 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_ATAN,
|
FUNCTION_TYPE_ATAN,
|
||||||
|
|
||||||
// string function
|
// string function
|
||||||
FUNCTION_TYPE_CHAR_LENGTH = 1500,
|
FUNCTION_TYPE_LENGTH = 1500,
|
||||||
|
FUNCTION_TYPE_CHAR_LENGTH,
|
||||||
FUNCTION_TYPE_CONCAT,
|
FUNCTION_TYPE_CONCAT,
|
||||||
FUNCTION_TYPE_CONCAT_WS,
|
FUNCTION_TYPE_CONCAT_WS,
|
||||||
FUNCTION_TYPE_LENGTH,
|
FUNCTION_TYPE_LOWER,
|
||||||
|
FUNCTION_TYPE_UPPER,
|
||||||
|
FUNCTION_TYPE_LTRIM,
|
||||||
|
FUNCTION_TYPE_RTRIM,
|
||||||
|
FUNCTION_TYPE_SUBSTR,
|
||||||
|
|
||||||
// conversion function
|
// conversion function
|
||||||
FUNCTION_TYPE_CAST = 2000,
|
FUNCTION_TYPE_CAST = 2000,
|
||||||
|
|
|
@ -42,6 +42,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
|
||||||
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
|
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
|
||||||
int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut);
|
int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut);
|
||||||
|
|
||||||
|
/* Math functions */
|
||||||
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
@ -58,6 +59,17 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
||||||
int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
|
/* String functions */
|
||||||
|
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
|
@ -282,6 +282,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.sprocessFunc = atanFunction,
|
.sprocessFunc = atanFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "length",
|
||||||
|
.type = FUNCTION_TYPE_LENGTH,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = lengthFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "char_length",
|
||||||
|
.type = FUNCTION_TYPE_CHAR_LENGTH,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = charLengthFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "concat",
|
.name = "concat",
|
||||||
.type = FUNCTION_TYPE_CONCAT,
|
.type = FUNCTION_TYPE_CONCAT,
|
||||||
|
@ -289,7 +309,67 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = NULL,
|
.sprocessFunc = concatFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "concat_ws",
|
||||||
|
.type = FUNCTION_TYPE_CONCAT_WS,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = concatWsFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "lower",
|
||||||
|
.type = FUNCTION_TYPE_LOWER,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = lowerFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "upper",
|
||||||
|
.type = FUNCTION_TYPE_UPPER,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = upperFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "ltrim",
|
||||||
|
.type = FUNCTION_TYPE_LTRIM,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = ltrimFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "rtrim",
|
||||||
|
.type = FUNCTION_TYPE_RTRIM,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = rtrimFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "substr",
|
||||||
|
.type = FUNCTION_TYPE_SUBSTR,
|
||||||
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.sprocessFunc = substrFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -409,12 +489,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType };
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType };
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case FUNCTION_TYPE_CONCAT:
|
|
||||||
case FUNCTION_TYPE_ROWTS:
|
|
||||||
case FUNCTION_TYPE_TBNAME: {
|
|
||||||
// todo
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case FUNCTION_TYPE_QENDTS:
|
case FUNCTION_TYPE_QENDTS:
|
||||||
case FUNCTION_TYPE_QSTARTTS:
|
case FUNCTION_TYPE_QSTARTTS:
|
||||||
|
@ -447,6 +521,65 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE };
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE };
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case FUNCTION_TYPE_LENGTH:
|
||||||
|
case FUNCTION_TYPE_CHAR_LENGTH: {
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_SMALLINT].bytes, .type = TSDB_DATA_TYPE_SMALLINT };
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case FUNCTION_TYPE_CONCAT:
|
||||||
|
case FUNCTION_TYPE_CONCAT_WS: {
|
||||||
|
int32_t paraType, paraBytes = 0;
|
||||||
|
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
|
||||||
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
||||||
|
paraBytes += pParam->node.resType.bytes;
|
||||||
|
paraType = pParam->node.resType.type;
|
||||||
|
}
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType };
|
||||||
|
break;
|
||||||
|
//int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0;
|
||||||
|
//int32_t firstParamIndex = 0;
|
||||||
|
//if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) {
|
||||||
|
// firstParamIndex = 1;
|
||||||
|
// SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
// sepBytes = pSep->node.resType.type;
|
||||||
|
//}
|
||||||
|
//for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) {
|
||||||
|
// SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
|
||||||
|
// int32_t paraType = pParam->node.resType.type;
|
||||||
|
// if (i == firstParamIndex) {
|
||||||
|
// paraTypeFirst = paraType;
|
||||||
|
// }
|
||||||
|
// if (paraType != paraTypeFirst) {
|
||||||
|
// return TSDB_CODE_FAILED;
|
||||||
|
// }
|
||||||
|
// //TODO: for constants also needs numOfRows
|
||||||
|
// totalBytes += pParam->node.resType.bytes;
|
||||||
|
//}
|
||||||
|
////TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100.
|
||||||
|
//totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100;
|
||||||
|
//pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst };
|
||||||
|
//break;
|
||||||
|
}
|
||||||
|
case FUNCTION_TYPE_LOWER:
|
||||||
|
case FUNCTION_TYPE_UPPER:
|
||||||
|
case FUNCTION_TYPE_LTRIM:
|
||||||
|
case FUNCTION_TYPE_RTRIM:
|
||||||
|
case FUNCTION_TYPE_SUBSTR: {
|
||||||
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
int32_t paraType = pParam->node.resType.type;
|
||||||
|
int32_t paraBytes = pParam->node.resType.bytes;
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType };
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case FUNCTION_TYPE_ROWTS:
|
||||||
|
case FUNCTION_TYPE_TBNAME: {
|
||||||
|
// todo
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case FUNCTION_TYPE_NOW:
|
case FUNCTION_TYPE_NOW:
|
||||||
// todo
|
// todo
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -47,7 +47,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
|
||||||
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
|
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
|
||||||
|
|
||||||
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
|
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
|
||||||
#define GET_PARAM_BYTES(_c) ((_c)->pColumnInfoData->info.bytes)
|
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
|
||||||
|
|
||||||
void sclFreeParam(SScalarParam *param);
|
void sclFreeParam(SScalarParam *param);
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,18 @@
|
||||||
#include "sclInt.h"
|
#include "sclInt.h"
|
||||||
#include "sclvector.h"
|
#include "sclvector.h"
|
||||||
|
|
||||||
|
typedef float (*_float_fn)(float);
|
||||||
|
typedef double (*_double_fn)(double);
|
||||||
|
typedef double (*_double_fn_2)(double, double);
|
||||||
|
typedef int (*_conv_fn)(int);
|
||||||
|
typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
|
||||||
|
typedef int16_t (*_len_fn)(char *, int32_t);
|
||||||
|
|
||||||
/** Math functions **/
|
/** Math functions **/
|
||||||
|
double tlog(double v, double base) {
|
||||||
|
return log(v) / log(base);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
SColumnInfoData *pInputData = pInput->columnData;
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
@ -102,14 +113,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef float (*_float_fn)(float);
|
|
||||||
typedef double (*_double_fn)(double);
|
|
||||||
typedef double (*_double_fn_2)(double, double);
|
|
||||||
|
|
||||||
double tlog(double v, double base) {
|
|
||||||
return log(v) / log(base);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
|
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
||||||
|
@ -211,6 +214,407 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** String functions **/
|
||||||
|
int16_t tlength(char *input, int32_t type) {
|
||||||
|
return varDataLen(input);
|
||||||
|
}
|
||||||
|
|
||||||
|
int16_t tcharlength(char *input, int32_t type) {
|
||||||
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
|
return varDataLen(input);
|
||||||
|
} else { //NCHAR
|
||||||
|
return varDataLen(input) / TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
|
||||||
|
int32_t numOfSpaces = 0;
|
||||||
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
|
for (int32_t i = 0; i < charLen; ++i) {
|
||||||
|
if (!isspace(*(varDataVal(input) + i))) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
numOfSpaces++;
|
||||||
|
}
|
||||||
|
} else { //NCHAR
|
||||||
|
for (int32_t i = 0; i < charLen; ++i) {
|
||||||
|
if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
numOfSpaces++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t resLen;
|
||||||
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
|
resLen = charLen - numOfSpaces;
|
||||||
|
memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen);
|
||||||
|
} else {
|
||||||
|
resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
|
||||||
|
memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
varDataSetLen(output, resLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
|
||||||
|
int32_t numOfSpaces = 0;
|
||||||
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
|
for (int32_t i = charLen - 1; i >= 0; --i) {
|
||||||
|
if (!isspace(*(varDataVal(input) + i))) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
numOfSpaces++;
|
||||||
|
}
|
||||||
|
} else { //NCHAR
|
||||||
|
for (int32_t i = charLen - 1; i < charLen; ++i) {
|
||||||
|
if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
numOfSpaces++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t resLen;
|
||||||
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
|
resLen = charLen - numOfSpaces;
|
||||||
|
} else {
|
||||||
|
resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
|
memcpy(varDataVal(output), varDataVal(input), resLen);
|
||||||
|
|
||||||
|
varDataSetLen(output, resLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
|
||||||
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
|
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
|
||||||
|
char *in = pInputData->pData;
|
||||||
|
int16_t *out = (int16_t *)pOutputData->pData;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
if (colDataIsNull_f(pInputData->nullbitmap, i)) {
|
||||||
|
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
out[i] = lenFn(in, type);
|
||||||
|
in += varDataTLen(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) {
|
||||||
|
pOutputData->pData = taosMemoryCalloc(len, sizeof(char));
|
||||||
|
pOutputData->info.type = type;
|
||||||
|
pOutputData->info.bytes = len;
|
||||||
|
pOutputData->varmeta.length = len;
|
||||||
|
pOutputData->varmeta.allocLen = len;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
|
||||||
|
char *outputBuf = NULL;
|
||||||
|
|
||||||
|
int32_t inputLen = 0;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
||||||
|
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
|
numOfRows = pInput[i].numOfRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
pInputData[i] = pInput[i].columnData;
|
||||||
|
input[i] = pInputData[i]->pData;
|
||||||
|
if (pInput[i].numOfRows == 1) {
|
||||||
|
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
|
||||||
|
} else {
|
||||||
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
outputBuf = taosMemoryCalloc(outputLen, 1);
|
||||||
|
char *output = outputBuf;
|
||||||
|
|
||||||
|
bool hasNull = false;
|
||||||
|
for (int32_t k = 0; k < numOfRows; ++k) {
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
if (colDataIsNull_s(pInputData[i], k)) {
|
||||||
|
colDataAppendNULL(pOutputData, k);
|
||||||
|
hasNull = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasNull) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int16_t dataLen = 0;
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
||||||
|
dataLen += varDataLen(input[i]);
|
||||||
|
if (pInput[i].numOfRows != 1) {
|
||||||
|
input[i] += varDataTLen(input[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
varDataSetLen(output, dataLen);
|
||||||
|
colDataAppend(pOutputData, k, output, false);
|
||||||
|
output += varDataTLen(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = numOfRows;
|
||||||
|
taosMemoryFree(input);
|
||||||
|
taosMemoryFree(outputBuf);
|
||||||
|
taosMemoryFree(pInputData);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
|
||||||
|
char *outputBuf = NULL;
|
||||||
|
|
||||||
|
int32_t inputLen = 0;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
for (int32_t i = 1; i < inputNum; ++i) {
|
||||||
|
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
|
||||||
|
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[1])) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
|
numOfRows = pInput[i].numOfRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
|
pInputData[i] = pInput[i].columnData;
|
||||||
|
if (i == 0) {
|
||||||
|
// calculate required separator space
|
||||||
|
int32_t factor = (GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NCHAR) ? TSDB_NCHAR_SIZE : 1;
|
||||||
|
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
|
||||||
|
} else if (pInput[i].numOfRows == 1) {
|
||||||
|
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
|
||||||
|
} else {
|
||||||
|
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
}
|
||||||
|
input[i] = pInputData[i]->pData;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
|
||||||
|
outputBuf = taosMemoryCalloc(outputLen, 1);
|
||||||
|
char *output = outputBuf;
|
||||||
|
|
||||||
|
for (int32_t k = 0; k < numOfRows; ++k) {
|
||||||
|
if (colDataIsNull_s(pInputData[0], k)) {
|
||||||
|
colDataAppendNULL(pOutputData, k);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int16_t dataLen = 0;
|
||||||
|
for (int32_t i = 1; i < inputNum; ++i) {
|
||||||
|
if (colDataIsNull_s(pInputData[i], k)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
|
||||||
|
dataLen += varDataLen(input[i]);
|
||||||
|
if (pInput[i].numOfRows != 1) {
|
||||||
|
input[i] += varDataTLen(input[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i < inputNum - 1) {
|
||||||
|
//insert the separator
|
||||||
|
char *sep = pInputData[0]->pData;
|
||||||
|
memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep));
|
||||||
|
dataLen += varDataLen(sep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
varDataSetLen(output, dataLen);
|
||||||
|
colDataAppend(pOutputData, k, output, false);
|
||||||
|
output += varDataTLen(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = numOfRows;
|
||||||
|
taosMemoryFree(input);
|
||||||
|
taosMemoryFree(outputBuf);
|
||||||
|
taosMemoryFree(pInputData);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
|
||||||
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
|
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
|
||||||
|
char *input = pInputData->pData;
|
||||||
|
char *output = NULL;
|
||||||
|
|
||||||
|
int32_t outputLen = pInputData->varmeta.length;
|
||||||
|
char *outputBuf = taosMemoryCalloc(outputLen, 1);
|
||||||
|
output = outputBuf;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
if (colDataIsNull_s(pInputData, i)) {
|
||||||
|
colDataAppendNULL(pOutputData, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = varDataLen(input);
|
||||||
|
if (type == TSDB_DATA_TYPE_VARCHAR) {
|
||||||
|
for (int32_t j = 0; j < len; ++j) {
|
||||||
|
*(varDataVal(output) + j) = convFn(*(varDataVal(input) + j));
|
||||||
|
}
|
||||||
|
} else { //NCHAR
|
||||||
|
for (int32_t j = 0; j < len / TSDB_NCHAR_SIZE; ++j) {
|
||||||
|
*((uint32_t *)varDataVal(output) + j) = convFn(*((uint32_t *)varDataVal(input) + j));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
varDataSetLen(output, len);
|
||||||
|
colDataAppend(pOutputData, i, output, false);
|
||||||
|
input += varDataTLen(input);
|
||||||
|
output += varDataTLen(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
|
taosMemoryFree(outputBuf);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
|
||||||
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
|
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
|
||||||
|
char *input = pInputData->pData;
|
||||||
|
char *output = NULL;
|
||||||
|
|
||||||
|
int32_t outputLen = pInputData->varmeta.length;
|
||||||
|
char *outputBuf = taosMemoryCalloc(outputLen, 1);
|
||||||
|
output = outputBuf;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
if (colDataIsNull_s(pInputData, i)) {
|
||||||
|
colDataAppendNULL(pOutputData, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = varDataLen(input);
|
||||||
|
int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
|
||||||
|
trimFn(input, output, type, charLen);
|
||||||
|
|
||||||
|
varDataSetLen(output, len);
|
||||||
|
colDataAppend(pOutputData, i, output, false);
|
||||||
|
input += varDataTLen(input);
|
||||||
|
output += varDataTLen(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
|
taosMemoryFree(outputBuf);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
if (inputNum != 2 && inputNum!= 3) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t subPos = 0;
|
||||||
|
GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
|
||||||
|
if (subPos == 0) { //subPos needs to be positive or negative values;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t subLen = INT16_MAX;
|
||||||
|
if (inputNum == 3) {
|
||||||
|
GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
|
||||||
|
if (subLen < 0) { //subLen cannot be negative
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
|
||||||
|
char *input = pInputData->pData;
|
||||||
|
char *output = NULL;
|
||||||
|
|
||||||
|
int32_t outputLen = pInputData->varmeta.length;
|
||||||
|
char *outputBuf = taosMemoryCalloc(outputLen, 1);
|
||||||
|
output = outputBuf;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
if (colDataIsNull_s(pInputData, i)) {
|
||||||
|
colDataAppendNULL(pOutputData, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = varDataLen(input);
|
||||||
|
int32_t startPosBytes;
|
||||||
|
|
||||||
|
if (subPos > 0) {
|
||||||
|
startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subPos - 1 : (subPos - 1) * TSDB_NCHAR_SIZE;
|
||||||
|
startPosBytes = MIN(startPosBytes, len);
|
||||||
|
} else {
|
||||||
|
startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? len + subPos : len + subPos * TSDB_NCHAR_SIZE;
|
||||||
|
startPosBytes = MAX(startPosBytes, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
subLen = MIN(subLen, len - startPosBytes);
|
||||||
|
if (subLen > 0) {
|
||||||
|
memcpy(varDataVal(output), varDataVal(input) + startPosBytes, subLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
varDataSetLen(output, subLen);
|
||||||
|
colDataAppend(pOutputData, i , output, false);
|
||||||
|
input += varDataTLen(input);
|
||||||
|
output += varDataTLen(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
|
taosMemoryFree(outputBuf);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
|
return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
|
||||||
}
|
}
|
||||||
|
@ -259,57 +663,28 @@ int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
|
||||||
return doScalarFunction(pInput, inputNum, pOutput, roundf, round);
|
return doScalarFunction(pInput, inputNum, pOutput, roundf, round);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tlength(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
|
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
assert(numOfInput == 1);
|
return doCaseConvFunction(pInput, inputNum, pOutput, tolower);
|
||||||
#if 0
|
|
||||||
int64_t* out = (int64_t*) pOutput->data;
|
|
||||||
char* s = pLeft->data;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < pLeft->num; ++i) {
|
|
||||||
out[i] = varDataLen(POINTER_SHIFT(s, i * pLeft->bytes));
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tconcat(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
|
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
assert(numOfInput > 0);
|
return doCaseConvFunction(pInput, inputNum, pOutput, toupper);
|
||||||
#if 0
|
|
||||||
int32_t rowLen = 0;
|
|
||||||
int32_t num = 1;
|
|
||||||
for(int32_t i = 0; i < numOfInput; ++i) {
|
|
||||||
rowLen += pLeft[i].bytes;
|
|
||||||
|
|
||||||
if (pLeft[i].num > 1) {
|
|
||||||
num = pLeft[i].num;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pOutput->data = taosMemoryRealloc(pOutput->data, rowLen * num);
|
|
||||||
assert(pOutput->data);
|
|
||||||
|
|
||||||
char* rstart = pOutput->data;
|
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
|
||||||
|
|
||||||
char* s = rstart;
|
|
||||||
varDataSetLen(s, 0);
|
|
||||||
for (int32_t j = 0; j < numOfInput; ++j) {
|
|
||||||
char* p1 = POINTER_SHIFT(pLeft[j].data, i * pLeft[j].bytes);
|
|
||||||
|
|
||||||
memcpy(varDataVal(s) + varDataLen(s), varDataVal(p1), varDataLen(p1));
|
|
||||||
varDataLen(s) += varDataLen(p1);
|
|
||||||
}
|
|
||||||
|
|
||||||
rstart += rowLen;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tltrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
|
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return doTrimFunction(pInput, inputNum, pOutput, tltrim);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void trtrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
|
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return doTrimFunction(pInput, inputNum, pOutput, trtrim);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return doLengthFunction(pInput, inputNum, pOutput, tlength);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
|
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
|
||||||
|
|
Loading…
Reference in New Issue