feature/qnode

This commit is contained in:
dapan1121 2022-02-25 19:15:23 +08:00
parent e9ff317d9b
commit 7129ba9cbd
4 changed files with 159 additions and 97 deletions

View File

@ -227,8 +227,10 @@ typedef struct SAggFunctionInfo {
} SAggFunctionInfo;
typedef struct SScalarParam {
void* data;
bool colData;
void *data;
SColumnInfoData *columnData;
char *bitmap;
bool dataInBlock;
int32_t num;
int32_t type;
int32_t bytes;

View File

@ -22,6 +22,7 @@ extern "C" {
#include "sclfunc.h"
typedef void (*_bufConverteFunc)(char *buf, SScalarParam* pOut, int32_t outType);
typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order);
_bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator);

View File

@ -87,6 +87,18 @@ _return:
SCL_RET(code);
}
bool sclIsNull(SScalarParam* param, int32_t idx) {
if (param->dataInBlock) {
return colDataIsNull(param->columnData, 0, idx, NULL);
}
return colDataIsNull_f(param->bitmap, idx);
}
void sclSetNull(SScalarParam* param, int32_t idx) {
colDataSetNull_f(param->bitmap, idx);
}
void sclFreeRes(SHashObj *res) {
SScalarParam *p = NULL;
@ -116,7 +128,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
param->num = 1;
param->type = valueNode->node.resType.type;
param->bytes = valueNode->node.resType.bytes;
param->colData = false;
param->dataInBlock = false;
break;
}
@ -130,7 +142,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(scalarGenerateSetFromList(&param->data, node, nodeList->dataType.type));
param->num = 1;
param->type = SCL_DATA_TYPE_DUMMY_HASH;
param->colData = false;
param->dataInBlock = false;
break;
}
@ -147,13 +159,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(ctx->pSrc->pDataBlock, ref->slotId);
if (IS_VAR_DATA_TYPE(columnData->info.type)) {
param->data = columnData;
param->colData = true;
} else {
param->data = columnData->pData;
param->colData = false;
}
param->data = NULL;
param->columnData = columnData;
param->dataInBlock = true;
param->num = ctx->pSrc->info.rows;
param->type = columnData->info.type;
@ -192,22 +200,26 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t sclParamMoveNext(SScalarParam *params, int32_t num) {
int32_t sclMoveParamListData(SScalarParam *params, int32_t listNum, int32_t idx) {
SScalarParam *param = NULL;
for (int32_t i = 0; i < num; ++i) {
for (int32_t i = 0; i < listNum; ++i) {
param = params + i;
if (1 == param->num) {
continue;
}
if (param->dataInBlock) {
param->data = colDataGet(param->columnData, idx);
} else if (idx) {
if (IS_VAR_DATA_TYPE(param->type)) {
param->data = (char *)(param->data) + varDataTLen(param->data);
} else {
param->data = (char *)(param->data) + tDataTypes[param->type].bytes;
}
}
}
return TSDB_CODE_SUCCESS;
}
@ -281,8 +293,7 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
SScalarFuncExecFuncs ffpSet = {0};
int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
if (code) {
sclError(
"fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
SCL_ERR_RET(code);
}
@ -298,15 +309,14 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
}
for (int32_t i = 0; i < rowNum; ++i) {
sclMoveParamListData(output, 1, i);
sclMoveParamListData(params, node->pParameterList->length, i);
code = (*ffpSet.process)(params, node->pParameterList->length, output);
if (code) {
sclError(
"scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
SCL_ERR_JRET(code);
}
sclParamMoveNext(output, 1);
sclParamMoveNext(params, node->pParameterList->length);
}
return TSDB_CODE_SUCCESS;
@ -354,6 +364,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
bool value = false;
for (int32_t i = 0; i < rowNum; ++i) {
sclMoveParamListData(output, 1, i);
sclMoveParamListData(params, node->pParameterList->length, i);
for (int32_t m = 0; m < node->pParameterList->length; ++m) {
GET_TYPED_DATA(value, bool, params[m].type, params[m].data);
@ -367,9 +380,6 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
}
*(bool *)output->data = value;
sclParamMoveNext(output, 1);
sclParamMoveNext(params, node->pParameterList->length);
}
output->data = data;

View File

@ -261,6 +261,60 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) {
return p;
}
static FORCE_INLINE void convertToSigned(char *buf, SScalarParam* pOut, int32_t outType) {
int64_t value = strtoll(buf, NULL, 10);
SET_TYPED_DATA(pOut->data, outType, value);
}
static FORCE_INLINE void convertToUnsigned(char *buf, SScalarParam* pOut, int32_t outType) {
uint64_t value = strtoull(buf, NULL, 10);
SET_TYPED_DATA(pOut->data, outType, value);
}
static FORCE_INLINE void convertToFloat(char *buf, SScalarParam* pOut, int32_t outType) {
double value = strtod(tmp, NULL);
SET_TYPED_DATA(output, outType, value);
}
int32_t vectorConvertFromVarData(SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
int32_t bufSize = 0;
char *tmp = NULL;
_bufConverteFunc func = NULL;
if (IS_SIGNED_NUMERIC_TYPE(outType) || TSDB_DATA_TYPE_TIMESTAMP == outType) {
func = convertToSigned;
} else if (IS_UNSIGNED_NUMERIC_TYPE(outType)) {
func = convertToUnsigned;
} else if (IS_FLOAT_TYPE(outType)) {
func = convertToFloat;
} else {
sclError("unknown outType:%d", outType);
return TSDB_CODE_QRY_APP_ERROR;
}
for (int32_t i = 0; i < pIn->num; ++i) {
sclMoveParamListData(pIn, 1, i);
sclMoveParamListData(pOut, 1, i);
if (sclIsNull(pIn, i)) {
sclSetNull(pOut, i);
continue;
}
if (varDataLen(pIn->data) >= bufSize) {
bufSize = varDataLen(pIn->data) + 1;
tmp = realloc(tmp, bufSize);
}
memcpy(tmp, varDataVal(pIn->data), varDataLen(pIn->data));
tmp[varDataLen(pIn->data)] = 0;
(*func)(tmp, pOut, outType);
}
tfree(tmp);
}
int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
int16_t inType = pIn->type;
@ -278,52 +332,50 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
if (inType == TSDB_DATA_TYPE_BINARY) {
int32_t bufSize = varDataLen(input) + 1;
char *tmp = malloc(bufSize);
if (NULL == tmp) {
sclError("malloc %d failed", bufSize);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
int32_t bufSize = 0;
char *tmp = NULL;
for (int32_t i = 0; i < pIn->num; ++i) {
if (isNull(input, inType)) {
assignVal(output, getNullValue(outType), 0, outType);
} else {
if (varDataLen(input) >= bufSize) {
bufSize = varDataLen(input) + 1;
sclMoveParamListData(pIn, 1, i);
sclMoveParamListData(pOut, 1, i);
if (sclIsNull(pIn, i)) {
sclSetNull(pOut, i);
continue;
}
if (varDataLen(pIn->data) >= bufSize) {
bufSize = varDataLen(pIn->data) + 1;
tmp = realloc(tmp, bufSize);
}
memcpy(tmp, varDataVal(input), varDataLen(input));
tmp[varDataLen(input)] = 0;
memcpy(tmp, varDataVal(pIn->data), varDataLen(pIn->data));
tmp[varDataLen(pIn->data)] = 0;
int64_t value = strtoll(tmp, NULL, 10);
SET_TYPED_DATA(output, outType, value);
}
input += varDataLen(input) + VARSTR_HEADER_SIZE;
output += tDataTypes[outType].bytes;
SET_TYPED_DATA(pOut->data, outType, value);
}
tfree(tmp);
} else if (inType == TSDB_DATA_TYPE_NCHAR) {
int32_t bufSize = varDataLen(input) * TSDB_NCHAR_SIZE + 1;
char *tmp = calloc(1, bufSize);
if (NULL == tmp) {
sclError("calloc %d failed", bufSize);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
int32_t bufSize = 0;
char *tmp = NULL;
for (int32_t i = 0; i < pIn->num; ++i) {
if (isNull(input, inType)) {
assignVal(output, getNullValue(outType), 0, outType);
} else {
if (varDataLen(input)* TSDB_NCHAR_SIZE >= bufSize) {
bufSize = varDataLen(input) * TSDB_NCHAR_SIZE + 1;
sclMoveParamListData(pIn, 1, i);
sclMoveParamListData(pOut, 1, i);
if (sclIsNull(pIn, i)) {
sclSetNull(pOut, i);
continue;
}
if (varDataLen(pIn->data) * TSDB_NCHAR_SIZE >= bufSize) {
bufSize = varDataLen(pIn->data) * TSDB_NCHAR_SIZE + 1;
tmp = realloc(tmp, bufSize);
}
int len = taosUcs4ToMbs(varDataVal(input), varDataLen(input), tmp);
int len = taosUcs4ToMbs(varDataVal(pIn->data), varDataLen(pIn->data), tmp);
if (len < 0){
sclError("castConvert taosUcs4ToMbs error 1");
tfree(tmp);
@ -331,12 +383,9 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
}
tmp[len] = 0;
int64_t value = strtoll(tmp, NULL, 10);
SET_TYPED_DATA(output, outType, value);
}
input += varDataLen(input) + VARSTR_HEADER_SIZE;
output += tDataTypes[outType].bytes;
int64_t value = strtoll(tmp, NULL, 10);
SET_TYPED_DATA(pOut->data, outType, value);
}
tfree(tmp);
@ -628,8 +677,8 @@ void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
SScalarParam leftParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pLeft->num};
SScalarParam rightParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pRight->num};
SScalarParam leftParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pLeft->num, .dataInBlock = false};
SScalarParam rightParam = {.type = TSDB_DATA_TYPE_DOUBLE, .num = pRight->num, .dataInBlock = false};
if (IS_VAR_DATA_TYPE(pLeft->type)) {
leftParam.data = calloc(leftParam.num, sizeof(double));
if (NULL == leftParam.data) {
@ -637,7 +686,7 @@ void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -655,7 +704,7 @@ void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -719,7 +768,7 @@ void vectorSub(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -737,7 +786,7 @@ void vectorSub(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _or
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -799,7 +848,7 @@ void vectorMultiply(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -817,7 +866,7 @@ void vectorMultiply(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -881,7 +930,7 @@ void vectorDivide(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -899,7 +948,7 @@ void vectorDivide(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -970,7 +1019,7 @@ void vectorRemainder(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -988,7 +1037,7 @@ void vectorRemainder(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -1136,7 +1185,7 @@ void vectorBitAnd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -1154,7 +1203,7 @@ void vectorBitAnd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -1218,7 +1267,7 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _
return;
}
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}
@ -1236,7 +1285,7 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _
return;
}
if (pRight->colData) {
if (pRight->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pRight->data;
pRight->data = colInfo->pData;
}
@ -1298,13 +1347,13 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int
_getValueAddr_fn_t getVectorValueAddrFnLeft = NULL;
_getValueAddr_fn_t getVectorValueAddrFnRight = NULL;
if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->colData) {
if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->dataInBlock) {
getVectorValueAddrFnLeft = getVectorValueAddr_default;
} else {
getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type);
}
if (IS_VAR_DATA_TYPE(pRight->type) && !pRight->colData) {
if (IS_VAR_DATA_TYPE(pRight->type) && !pRight->dataInBlock) {
getVectorValueAddrFnRight = getVectorValueAddr_default;
} else {
getVectorValueAddrFnRight = getVectorValueAddrFn(pRight->type);
@ -1436,7 +1485,7 @@ void vectorIsNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
bool *output=(bool *)out;
_getValueAddr_fn_t getVectorValueAddrFnLeft = NULL;
if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->colData) {
if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->dataInBlock) {
getVectorValueAddrFnLeft = getVectorValueAddr_default;
} else {
getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type);
@ -1462,7 +1511,7 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
bool *output = (bool *)out;
_getValueAddr_fn_t getVectorValueAddrFnLeft = NULL;
if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->colData) {
if (IS_VAR_DATA_TYPE(pLeft->type) && !pLeft->dataInBlock) {
getVectorValueAddrFnLeft = getVectorValueAddr_default;
} else {
getVectorValueAddrFnLeft = getVectorValueAddrFn(pLeft->type);
@ -1483,7 +1532,7 @@ void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t
void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
SScalarParam output = {.data = out, .num = pLeft->num, .type = TSDB_DATA_TYPE_BOOL};
if (pLeft->colData) {
if (pLeft->dataInBlock) {
SColumnInfoData *colInfo = (SColumnInfoData *)pLeft->data;
pLeft->data = colInfo->pData;
}