math functions working with shell
This commit is contained in:
parent
a0ba1eb1f2
commit
0c243e658e
|
@ -197,6 +197,11 @@ typedef struct SGroupbyExpr {
|
|||
bool groupbyTag; // group by tag or column
|
||||
} SGroupbyExpr;
|
||||
|
||||
enum {
|
||||
FUNC_PARAM_TYPE_VALUE = 0,
|
||||
FUNC_PARAM_TYPE_COLUMN,
|
||||
};
|
||||
|
||||
typedef struct SFunctParam {
|
||||
int32_t type;
|
||||
SColumn* pCol;
|
||||
|
|
|
@ -234,7 +234,6 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
|
|||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
}
|
||||
}
|
||||
|
||||
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
|
||||
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
|
||||
|
||||
|
@ -1186,8 +1185,19 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
|
|||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].currentStage = MAIN_SCAN;
|
||||
|
||||
SExprInfo expr = pOperator->pExpr[i];
|
||||
for (int32_t j = 0; j < expr.base.numOfParams; ++j) {
|
||||
SFunctParam *pFuncParam = &expr.base.pParam[j];
|
||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||
int32_t slotId = pFuncParam->pCol->slotId;
|
||||
pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
pCtx[i].input.totalRows = pBlock->info.rows;
|
||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
pCtx[i].input.startRowIndex = 0;
|
||||
ASSERT(pCtx[i].input.pData[j] != NULL);
|
||||
}
|
||||
}
|
||||
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
|
||||
int32_t slotId = pOperator->pExpr[i].base.pParam[0].pCol->slotId;
|
||||
|
||||
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
|
||||
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
|
||||
|
@ -1205,12 +1215,11 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
|
|||
// }
|
||||
|
||||
// in case of the block distribution query, the inputBytes is not a constant value.
|
||||
pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
pCtx[i].input.totalRows = pBlock->info.rows;
|
||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
pCtx[i].input.startRowIndex = 0;
|
||||
//pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
//pCtx[i].input.totalRows = pBlock->info.rows;
|
||||
//pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
//pCtx[i].input.startRowIndex = 0;
|
||||
|
||||
ASSERT(pCtx[i].input.pData[0] != NULL);
|
||||
|
||||
// uint32_t status = aAggs[pCtx[i].functionId].status;
|
||||
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
|
||||
|
@ -1267,15 +1276,17 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
|
|||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||
ASSERT(!fmIsAggFunc(pCtx->functionId));
|
||||
|
||||
SScalarParam p = {.numOfRows = pSrcBlock->info.rows};
|
||||
int32_t slotId = pExpr[k].base.pParam[0].pCol->slotId;
|
||||
p.columnData = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||
taosArrayPush(pBlockList, &pSrcBlock);
|
||||
|
||||
SScalarParam dest = {0};
|
||||
dest.columnData = taosArrayGet(pResult->pDataBlock, k);
|
||||
pCtx[k].sfp.process(&p, 1, &dest);
|
||||
|
||||
scalarCalculate((SNode *)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||
pResult->info.rows = dest.numOfRows;
|
||||
|
||||
taosArrayDestroy(pBlockList);
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -8451,19 +8462,20 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
pExp->pExpr->_function.num = 1;
|
||||
pExp->pExpr->_function.functionId = -1;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
|
||||
// it is a project query, or group by column
|
||||
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
|
||||
SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
|
||||
SDataType* pType = &pColNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
|
||||
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
pCol->slotId = pColNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
|
@ -8482,26 +8494,46 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
// TODO: value parameter needs to be handled
|
||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = numOfParam;
|
||||
|
||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
|
||||
if (p1->type == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
|
||||
|
||||
pCol->slotId = pcn->slotId;
|
||||
pCol->bytes = pcn->node.resType.bytes;
|
||||
pCol->type = pcn->node.resType.type;
|
||||
pCol->scale = pcn->node.resType.scale;
|
||||
pCol->precision = pcn->node.resType.precision;
|
||||
pCol->dataBlockId = pcn->dataBlockId;
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
SColumn* pCol = pExp->base.pParam[j].pCol;
|
||||
|
||||
pCol->slotId = pcn->slotId;
|
||||
pCol->bytes = pcn->node.resType.bytes;
|
||||
pCol->type = pcn->node.resType.type;
|
||||
pCol->scale = pcn->node.resType.scale;
|
||||
pCol->precision = pcn->node.resType.precision;
|
||||
pCol->dataBlockId = pcn->dataBlockId;
|
||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||
SValueNode* pvn = (SValueNode*)p1;
|
||||
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||
}
|
||||
}
|
||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||
SOperatorNode* pNode = (SOperatorNode*) pTargetNode->pExpr;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
|
||||
SDataType* pType = &pNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
|
||||
|
||||
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
||||
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
pCol->slotId = pTargetNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
|
|
|
@ -107,96 +107,14 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
char **input = NULL, *output = NULL;
|
||||
bool hasNullInput = false;
|
||||
input = taosMemoryCalloc(inputNum, sizeof(char *));
|
||||
for (int32_t i = 0; i < pOutput->num; ++i) {
|
||||
for (int32_t j = 0; j < inputNum; ++j) {
|
||||
if (pInput[j].num == 1) {
|
||||
input[j] = pInput[j].data;
|
||||
} else {
|
||||
input[j] = pInput[j].data + i * pInput[j].bytes;
|
||||
}
|
||||
if (isNull(input[j], pInput[j].type)) {
|
||||
hasNullInput = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
output = pOutput->data + i * pOutput->bytes;
|
||||
|
||||
if (hasNullInput) {
|
||||
setNull(output, pOutput->type, pOutput->bytes);
|
||||
continue;
|
||||
}
|
||||
|
||||
double base;
|
||||
GET_TYPED_DATA(base, double, pInput[1].type, input[1]);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, pInput[0].type, input[0]);
|
||||
double result = log(v) / log(base);
|
||||
SET_TYPED_DATA(output, pOutput->type, result);
|
||||
}
|
||||
|
||||
taosMemoryFree(input);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pOutput->type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOutput->bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
|
||||
char **input = NULL, *output = NULL;
|
||||
bool hasNullInput = false;
|
||||
input = taosMemoryCalloc(inputNum, sizeof(char *));
|
||||
for (int32_t i = 0; i < pOutput->num; ++i) {
|
||||
for (int32_t j = 0; j < inputNum; ++j) {
|
||||
if (pInput[j].num == 1) {
|
||||
input[j] = pInput[j].data;
|
||||
} else {
|
||||
input[j] = pInput[j].data + i * pInput[j].bytes;
|
||||
}
|
||||
if (isNull(input[j], pInput[j].type)) {
|
||||
hasNullInput = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
output = pOutput->data + i * pOutput->bytes;
|
||||
|
||||
if (hasNullInput) {
|
||||
setNull(output, pOutput->type, pOutput->bytes);
|
||||
continue;
|
||||
}
|
||||
|
||||
double base;
|
||||
GET_TYPED_DATA(base, double, pInput[1].type, input[1]);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, pInput[0].type, input[0]);
|
||||
double result = pow(v, base);
|
||||
SET_TYPED_DATA(output, pOutput->type, result);
|
||||
}
|
||||
|
||||
taosMemoryFree(input);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
typedef float (*_float_fn)(float);
|
||||
typedef double (*_double_fn)(double);
|
||||
typedef double (*_double_fn_2)(double, double);
|
||||
|
||||
double tlog(double v, double base) {
|
||||
return log(v) / log(base);
|
||||
}
|
||||
|
||||
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
|
||||
int32_t type = GET_PARAM_TYPE(pInput);
|
||||
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
|
||||
|
@ -222,10 +140,6 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
double tlog(double v, double base) {
|
||||
return log(v) / log(base);
|
||||
}
|
||||
|
||||
int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
|
||||
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
Loading…
Reference in New Issue