From 23db83f5f4414ca319ebc24037bbc0dc67f572af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Mar 2022 22:13:14 +0800 Subject: [PATCH] [td-13039] support arithmetic query. --- include/common/tcommon.h | 1 + include/libs/function/function.h | 4 + source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 173 +++++------------- source/libs/scalar/src/scalar.c | 6 +- .../libs/scalar/test/scalar/scalarTests.cpp | 5 - 6 files changed, 58 insertions(+), 133 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 27a0962e15..ebea35e2a8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -215,6 +215,7 @@ typedef struct SResSchame { // TODO move away to executor.h typedef struct SExprBasicInfo { + int32_t type; SResSchema resSchema; int16_t numOfParams; // argument value of each function SFunctParam* pParam; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index f6dfc6f65c..e2057c6384 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -227,6 +227,10 @@ typedef struct tExprNode { // operator and is kept in the attribute of _node. struct tExprNode **pChild; } _function; + + struct { + struct SNode* pRootNode; + } _optrRoot; }; } tExprNode; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c582873315..6de9c5ebcd 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -466,7 +466,7 @@ typedef struct SOptrBasicInfo { int32_t* rowCellInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; - int32_t capacity; + int32_t capacity; // TODO remove it } SOptrBasicInfo; //TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cea9b3ce11..fd6e344ebc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1241,17 +1241,25 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction } } -static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, int32_t numOfOutput) { +static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx *pCtx, int32_t numOfOutput) { for (int32_t k = 0; k < numOfOutput; ++k) { - if (pCtx[k].fpSet.init == NULL) { // it is a project query + if (pExpr[k].base.type == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows); - } else { // TODO: arithmetic and other process. + + pResult->info.rows = pCtx[0].input.numOfRows; + } else if (pExpr[k].base.type == QUERY_NODE_OPERATOR) { + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); + + SScalarParam p = {.numOfRows = pSrcBlock->info.rows}; + p.columnData = taosArrayGet(pResult->pDataBlock, k); + scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &p); + pResult->info.rows = p.numOfRows; + } else { ASSERT(0); } } - - pResult->info.rows = pCtx[0].input.numOfRows; } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, @@ -2008,107 +2016,6 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx *pCtx, int32_t numOfOutput) { return TSDB_CODE_SUCCESS; } -static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t** rowCellInfoOffset) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx)); - if (pFuncCtx == NULL) { - return NULL; - } - - *rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t)); - if (*rowCellInfoOffset == 0) { - tfree(pFuncCtx); - return NULL; - } - - for (int32_t i = 0; i < numOfOutput; ++i) { - SExprBasicInfo *pFunct = &pExpr[i].base; - SqlFunctionCtx* pCtx = &pFuncCtx[i]; -#if 0 - SColIndex *pIndex = &pFunct->colInfo; - - if (TSDB_COL_REQ_NULL(pIndex->flag)) { - pCtx->requireNull = true; - pIndex->flag &= ~(TSDB_COL_NULL); - } else { - pCtx->requireNull = false; - } -#endif -// pCtx->inputBytes = pFunct->colBytes; -// pCtx->inputType = pFunct->colType; - - pCtx->ptsOutputBuf = NULL; - - pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; - pCtx->resDataInfo.type = pFunct->resSchema.type; - - pCtx->order = pQueryAttr->order.order; -// pCtx->functionId = pFunct->functionId; - pCtx->stableQuery = pQueryAttr->stableQuery; -// pCtx->resDataInfo.interBufSize = pFunct->interBytes; - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; - - pCtx->numOfParams = pFunct->numOfParams; - for (int32_t j = 0; j < pCtx->numOfParams; ++j) { - int16_t type = pFunct->pParam[j].param.nType; - int16_t bytes = pFunct->pParam[j].param.nType; - - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { -// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); - } else { -// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); - } - } - - // set the order information for top/bottom query - int32_t functionId = pCtx->functionId; - - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - int32_t f = getExprFunctionId(&pExpr[0]); - assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); - - pCtx->param[2].i = pQueryAttr->order.order; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[3].i = functionId; - pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - - pCtx->param[1].i = pQueryAttr->order.col.colId; - } else if (functionId == FUNCTION_INTERP) { - pCtx->param[2].i = (int8_t)pQueryAttr->fillType; - if (pQueryAttr->fillVal != NULL) { - if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { - pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; - } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value - if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { - taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); - } - } - } - } else if (functionId == FUNCTION_TS_COMP) { - pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client - pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT; - } else if (functionId == FUNCTION_TWA) { - pCtx->param[1].i = pQueryAttr->window.skey; - pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[2].i = pQueryAttr->window.ekey; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - } else if (functionId == FUNCTION_ARITHM) { -// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); - } - } - -// for(int32_t i = 1; i < numOfOutput; ++i) { -// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes); -// } - - setCtxTagColumnInfo(pFuncCtx, numOfOutput); - - return pFuncCtx; -} - static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) { SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx)); if (pFuncCtx == NULL) { @@ -2127,15 +2034,18 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num SExprBasicInfo *pFunct = &pExpr->base; SqlFunctionCtx* pCtx = &pFuncCtx[i]; - if (pExpr->pExpr->_function.pFunctNode != NULL) { + pCtx->functionId = -1; + if (pExpr->base.type == QUERY_NODE_FUNCTION) { SFuncExecEnv env = {0}; pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); pCtx->resDataInfo.interBufSize = env.calcMemSize; - } else { - pCtx->functionId = -1; + } else if (pExpr->base.type == QUERY_NODE_COLUMN) { + + } else if (pExpr->base.type == QUERY_NODE_OPERATOR) { + } pCtx->input.numOfInputCols = pFunct->numOfParams; @@ -6560,7 +6470,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) blockDataCleanup(pRes); if (pProjectInfo->existDataBlock) { // TODO refactor -// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; SSDataBlock* pBlock = pProjectInfo->existDataBlock; pProjectInfo->existDataBlock = NULL; *newgroup = true; @@ -6574,9 +6483,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); - projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput); - - pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); if (pRes->info.rows >= pProjectInfo->binfo.capacity*0.8) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); @@ -6619,15 +6526,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); - updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); - - projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput); - if (pRes->info.rows >= pProjectInfo->threshold) { + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; } } - copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); +// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } @@ -7717,7 +7622,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); +// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); @@ -7742,7 +7647,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; pInfo->reptScan = false; - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); +// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); @@ -7807,7 +7712,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); +// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); @@ -7831,7 +7736,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); +// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); @@ -8439,16 +8344,18 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* // it is a project query, or group by column if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { + pExp->base.type = QUERY_NODE_COLUMN; SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr; SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); - pCol->slotId = pColNode->slotId; + pCol->slotId = pColNode->slotId; // TODO refactor pCol->bytes = pType->bytes; pCol->type = pType->type; pCol->scale = pType->scale; pCol->precision = pType->precision; - } else { + } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { + pExp->base.type = QUERY_NODE_FUNCTION; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SDataType* pType = &pFuncNode->node.resType; @@ -8462,7 +8369,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); for (int32_t j = 0; j < numOfParam; ++j) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); - SColumnNode* pcn = (SColumnNode*)p1; + SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor pCol->slotId = pcn->slotId; pCol->bytes = pcn->node.resType.bytes; @@ -8471,6 +8378,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pCol->precision = pcn->node.resType.precision; pCol->dataBlockId = pcn->dataBlockId; } + } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) { + pExp->base.type = QUERY_NODE_OPERATOR; + SOperatorNode* pNode = (SOperatorNode*) pTargetNode->pExpr; + + SDataType* pType = &pNode->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); + + pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; + + pCol->slotId = pTargetNode->slotId; // TODO refactor + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; + pCol->precision = pType->precision; + } else { + ASSERT(0); } } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 152d54be04..6c701b024e 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -711,8 +711,10 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } -// sclMoveParamListData(res, 1, 0); - *pDst = *res; + colDataAssign(pDst->columnData, res->columnData, res->numOfRows); + pDst->numOfRows = res->numOfRows; + +// *pDst = *res; taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 6ece674846..9e691ba10d 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -332,7 +332,6 @@ TEST(constantTest, int_or_binary) { nodesDestroyNode(res); } - TEST(constantTest, int_greater_double) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; scltMakeValueNode(&pLeft, TSDB_DATA_TYPE_INT, &scltLeftV); @@ -841,7 +840,6 @@ TEST(constantTest, int_add_int_is_true2) { nodesDestroyNode(res); } - TEST(constantTest, int_greater_int_is_true1) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; int32_t leftv = 1, rightv = 1; @@ -903,8 +901,6 @@ TEST(constantTest, greater_and_lower) { nodesDestroyNode(res); } - - TEST(columnTest, smallint_value_add_int_column) { scltInitLogFile(); @@ -1260,7 +1256,6 @@ TEST(columnTest, binary_column_like_binary) { nodesDestroyNode(opNode); } - TEST(columnTest, binary_column_is_true) { SNode *pLeft = NULL, *opNode = NULL; char leftv[5][5]= {0};