diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e970a0d693..2b03fbd933 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -227,11 +227,13 @@ typedef struct SAggFunctionInfo { } SAggFunctionInfo; typedef struct SScalarParam { - void* data; - bool colData; - int32_t num; - int32_t type; - int32_t bytes; + void *data; + SColumnInfoData *columnData; + char *bitmap; + bool dataInBlock; + int32_t num; + int32_t type; + int32_t bytes; } SScalarParam; typedef struct SScalarFunctionInfo { diff --git a/source/libs/scalar/inc/sclvector.h b/source/libs/scalar/inc/sclvector.h index 55c4828745..fd0f8b896f 100644 --- a/source/libs/scalar/inc/sclvector.h +++ b/source/libs/scalar/inc/sclvector.h @@ -22,6 +22,7 @@ extern "C" { #include "sclfunc.h" +typedef void (*_bufConverteFunc)(char *buf, SScalarParam* pOut, int32_t outType); typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order); _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 742c7fd706..05af5f63b4 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -87,6 +87,18 @@ _return: SCL_RET(code); } +bool sclIsNull(SScalarParam* param, int32_t idx) { + if (param->dataInBlock) { + return colDataIsNull(param->columnData, 0, idx, NULL); + } + + return colDataIsNull_f(param->bitmap, idx); +} + +void sclSetNull(SScalarParam* param, int32_t idx) { + colDataSetNull_f(param->bitmap, idx); +} + void sclFreeRes(SHashObj *res) { SScalarParam *p = NULL; @@ -116,7 +128,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t param->num = 1; param->type = valueNode->node.resType.type; param->bytes = valueNode->node.resType.bytes; - param->colData = false; + param->dataInBlock = false; break; } @@ -130,7 +142,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, 
SScalarCtx *ctx, int32_t SCL_ERR_RET(scalarGenerateSetFromList(¶m->data, node, nodeList->dataType.type)); param->num = 1; param->type = SCL_DATA_TYPE_DUMMY_HASH; - param->colData = false; + param->dataInBlock = false; break; } @@ -147,13 +159,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t } SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(ctx->pSrc->pDataBlock, ref->slotId); - if (IS_VAR_DATA_TYPE(columnData->info.type)) { - param->data = columnData; - param->colData = true; - } else { - param->data = columnData->pData; - param->colData = false; - } + param->data = NULL; + param->columnData = columnData; + param->dataInBlock = true; param->num = ctx->pSrc->info.rows; param->type = columnData->info.type; @@ -192,20 +200,24 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_SUCCESS; } -int32_t sclParamMoveNext(SScalarParam *params, int32_t num) { +int32_t sclMoveParamListData(SScalarParam *params, int32_t listNum, int32_t idx) { SScalarParam *param = NULL; - for (int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < listNum; ++i) { param = params + i; if (1 == param->num) { continue; } - if (IS_VAR_DATA_TYPE(param->type)) { - param->data = (char *)(param->data) + varDataTLen(param->data); - } else { - param->data = (char *)(param->data) + tDataTypes[param->type].bytes; + if (param->dataInBlock) { + param->data = colDataGet(param->columnData, idx); + } else if (idx) { + if (IS_VAR_DATA_TYPE(param->type)) { + param->data = (char *)(param->data) + varDataTLen(param->data); + } else { + param->data = (char *)(param->data) + tDataTypes[param->type].bytes; + } } } @@ -281,8 +293,7 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu SScalarFuncExecFuncs ffpSet = {0}; int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); if (code) { - sclError( -"fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + 
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); SCL_ERR_RET(code); } @@ -298,15 +309,14 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu } for (int32_t i = 0; i < rowNum; ++i) { + sclMoveParamListData(output, 1, i); + sclMoveParamListData(params, node->pParameterList->length, i); + code = (*ffpSet.process)(params, node->pParameterList->length, output); if (code) { - sclError( -"scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); SCL_ERR_JRET(code); } - - sclParamMoveNext(output, 1); - sclParamMoveNext(params, node->pParameterList->length); } return TSDB_CODE_SUCCESS; @@ -354,6 +364,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o bool value = false; for (int32_t i = 0; i < rowNum; ++i) { + sclMoveParamListData(output, 1, i); + sclMoveParamListData(params, node->pParameterList->length, i); + for (int32_t m = 0; m < node->pParameterList->length; ++m) { GET_TYPED_DATA(value, bool, params[m].type, params[m].data); @@ -367,9 +380,6 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o } *(bool *)output->data = value; - - sclParamMoveNext(output, 1); - sclParamMoveNext(params, node->pParameterList->length); } output->data = data; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 85af663313..57899f0e82 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -261,6 +261,60 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) { return p; } +static FORCE_INLINE void convertToSigned(char *buf, SScalarParam* pOut, int32_t outType) { + int64_t value = strtoll(buf, NULL, 10); + SET_TYPED_DATA(pOut->data, outType, value); +} + +static FORCE_INLINE void convertToUnsigned(char *buf, SScalarParam* pOut, int32_t outType) { + uint64_t 
value = strtoull(buf, NULL, 10); + SET_TYPED_DATA(pOut->data, outType, value); +} + +static FORCE_INLINE void convertToFloat(char *buf, SScalarParam* pOut, int32_t outType) { + double value = strtod(buf, NULL); /* buf: NUL-terminated text prepared by vectorConvertFromVarData */ + SET_TYPED_DATA(pOut->data, outType, value); +} + + +int32_t vectorConvertFromVarData(SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) { + int32_t bufSize = 0; + char *tmp = NULL; + _bufConverteFunc func = NULL; + + if (IS_SIGNED_NUMERIC_TYPE(outType) || TSDB_DATA_TYPE_TIMESTAMP == outType) { + func = convertToSigned; + } else if (IS_UNSIGNED_NUMERIC_TYPE(outType)) { + func = convertToUnsigned; + } else if (IS_FLOAT_TYPE(outType)) { + func = convertToFloat; + } else { + sclError("unknown outType:%d", outType); + return TSDB_CODE_QRY_APP_ERROR; + } + + for (int32_t i = 0; i < pIn->num; ++i) { + sclMoveParamListData(pIn, 1, i); + sclMoveParamListData(pOut, 1, i); + + if (sclIsNull(pIn, i)) { + sclSetNull(pOut, i); + continue; + } + + if (varDataLen(pIn->data) >= bufSize) { + bufSize = varDataLen(pIn->data) + 1; + tmp = realloc(tmp, bufSize); + } + + memcpy(tmp, varDataVal(pIn->data), varDataLen(pIn->data)); + tmp[varDataLen(pIn->data)] = 0; + + (*func)(tmp, pOut, outType); + } + + tfree(tmp); return TSDB_CODE_SUCCESS; /* declared int32_t: every path must return a status */ +} int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { int16_t inType = pIn->type; @@ -278,65 +332,60 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: if (inType == TSDB_DATA_TYPE_BINARY) { - int32_t bufSize = varDataLen(input) + 1; - char *tmp = malloc(bufSize); - if (NULL == tmp) { - sclError("malloc %d failed", bufSize); - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } + int32_t bufSize = 0; + char *tmp = NULL; for (int32_t i = 0; i < pIn->num; ++i) { - if (isNull(input, inType)) { - assignVal(output, getNullValue(outType), 0, outType); - } else { - if (varDataLen(input) >= bufSize) { - bufSize = varDataLen(input) + 1; - tmp = realloc(tmp, bufSize); - } - - 
memcpy(tmp, varDataVal(input), varDataLen(input)); - tmp[varDataLen(input)] = 0; - - int64_t value = strtoll(tmp, NULL, 10); - SET_TYPED_DATA(output, outType, value); + sclMoveParamListData(pIn, 1, i); + sclMoveParamListData(pOut, 1, i); + + if (sclIsNull(pIn, i)) { + sclSetNull(pOut, i); + continue; + } + + if (varDataLen(pIn->data) >= bufSize) { + bufSize = varDataLen(pIn->data) + 1; + tmp = realloc(tmp, bufSize); } - input += varDataLen(input) + VARSTR_HEADER_SIZE; - output += tDataTypes[outType].bytes; + memcpy(tmp, varDataVal(pIn->data), varDataLen(pIn->data)); + tmp[varDataLen(pIn->data)] = 0; + + int64_t value = strtoll(tmp, NULL, 10); + SET_TYPED_DATA(pOut->data, outType, value); } tfree(tmp); } else if (inType == TSDB_DATA_TYPE_NCHAR) { - int32_t bufSize = varDataLen(input) * TSDB_NCHAR_SIZE + 1; - char *tmp = calloc(1, bufSize); - if (NULL == tmp) { - sclError("calloc %d failed", bufSize); - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } + int32_t bufSize = 0; + char *tmp = NULL; for (int32_t i = 0; i < pIn->num; ++i) { - if (isNull(input, inType)) { - assignVal(output, getNullValue(outType), 0, outType); - } else { - if (varDataLen(input)* TSDB_NCHAR_SIZE >= bufSize) { - bufSize = varDataLen(input) * TSDB_NCHAR_SIZE + 1; - tmp = realloc(tmp, bufSize); - } + sclMoveParamListData(pIn, 1, i); + sclMoveParamListData(pOut, 1, i); - int len = taosUcs4ToMbs(varDataVal(input), varDataLen(input), tmp); - if (len < 0){ - sclError("castConvert taosUcs4ToMbs error 1"); - tfree(tmp); - return TSDB_CODE_QRY_APP_ERROR; - } - - tmp[len] = 0; - int64_t value = strtoll(tmp, NULL, 10); - SET_TYPED_DATA(output, outType, value); + if (sclIsNull(pIn, i)) { + sclSetNull(pOut, i); + continue; } - input += varDataLen(input) + VARSTR_HEADER_SIZE; - output += tDataTypes[outType].bytes; + if (varDataLen(pIn->data) * TSDB_NCHAR_SIZE >= bufSize) { + bufSize = varDataLen(pIn->data) * TSDB_NCHAR_SIZE + 1; + tmp = realloc(tmp, bufSize); + } + + int len = taosUcs4ToMbs(varDataVal(pIn->data), 
varDataLen(pIn->data), tmp); + if (len < 0){ + sclError("castConvert taosUcs4ToMbs error 1"); + tfree(tmp); + return TSDB_CODE_QRY_APP_ERROR; + } + + tmp[len] = 0; + + int64_t value = strtoll(tmp, NULL, 10); + SET_TYPED_DATA(pOut->data, outType, value); } tfree(tmp); @@ -628,8 +677,8 @@ void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - SScalarParam leftParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pLeft->num}; - SScalarParam rightParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pRight->num}; + SScalarParam leftParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pLeft->num, .dataInBlock = false}; + SScalarParam rightParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pRight->num, .dataInBlock = false}; if (IS_VAR_DATA_TYPE(pLeft->type)) { leftParam.data = calloc(leftParam.num, sizeof(double)); if (NULL == leftParam.data) { @@ -637,7 +686,7 @@ void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -655,7 +704,7 @@ void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data; pRight->data = colInfo->pData; } @@ -719,7 +768,7 @@ void vectorSub(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -737,7 +786,7 @@ void vectorSub(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data; pRight->data = 
colInfo->pData; } @@ -799,7 +848,7 @@ void vectorMultiply(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_ return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -817,7 +866,7 @@ void vectorMultiply(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_ return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data; pRight->data = colInfo->pData; } @@ -881,7 +930,7 @@ void vectorDivide(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -899,7 +948,7 @@ void vectorDivide(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data; pRight->data = colInfo->pData; } @@ -970,7 +1019,7 @@ void vectorRemainder(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32 return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -988,7 +1037,7 @@ void vectorRemainder(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32 return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data; pRight->data = colInfo->pData; } @@ -1136,7 +1185,7 @@ void vectorBitAnd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -1154,7 +1203,7 @@ void vectorBitAnd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo 
= (SColumnInfoData *)pRight->data; pRight->data = colInfo->pData; } @@ -1218,7 +1267,7 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ return; } - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; } @@ -1236,7 +1285,7 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ return; } - if (pRight->colData) { + if (pRight->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data; pRight->data = colInfo->pData; } @@ -1298,13 +1347,13 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int _getValueAddr_fn_t getVectorValueAddrFnLeft = NULL; _getValueAddr_fn_t getVectorValueAddrFnRight = NULL; - if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->colData) { + if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->dataInBlock) { getVectorValueAddrFnLeft = getVectorValueAddr_default; } else { getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type); } - if (IS_VAR_DATA_TYPE(pRight->type) && !pRight->colData) { + if (IS_VAR_DATA_TYPE(pRight->type) && !pRight->dataInBlock) { getVectorValueAddrFnRight = getVectorValueAddr_default; } else { getVectorValueAddrFnRight = getVectorValueAddrFn(pRight->type); @@ -1436,7 +1485,7 @@ void vectorIsNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t bool *output=(bool *)out; _getValueAddr_fn_t getVectorValueAddrFnLeft = NULL; - if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->colData) { + if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->dataInBlock) { getVectorValueAddrFnLeft = getVectorValueAddr_default; } else { getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type); @@ -1462,7 +1511,7 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t bool *output = (bool *)out; _getValueAddr_fn_t getVectorValueAddrFnLeft = NULL; - if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->colData) { + if (IS_VAR_DATA_TYPE(pLeft->type) 
&& !pLeft->dataInBlock) { getVectorValueAddrFnLeft = getVectorValueAddr_default; } else { getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type); @@ -1483,7 +1532,7 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { SScalarParam output = {.data = out, .num = pLeft->num, .type = TSDB_DATA_TYPE_BOOL}; - if (pLeft->colData) { + if (pLeft->dataInBlock) { SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data; pLeft->data = colInfo->pData; }