Merge pull request #13472 from taosdata/feature/3.0_bug16060
feat: add physical plannode of indefinite rows func
This commit is contained in:
commit
6de91932ea
|
@ -195,6 +195,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_LOGIC_PLAN_FILL,
|
||||
QUERY_NODE_LOGIC_PLAN_SORT,
|
||||
QUERY_NODE_LOGIC_PLAN_PARTITION,
|
||||
QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC,
|
||||
QUERY_NODE_LOGIC_SUBPLAN,
|
||||
QUERY_NODE_LOGIC_PLAN,
|
||||
|
||||
|
@ -222,6 +223,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
|
||||
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC,
|
||||
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
|
||||
QUERY_NODE_PHYSICAL_PLAN_INSERT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_DELETE,
|
||||
|
|
|
@ -87,6 +87,11 @@ typedef struct SProjectLogicNode {
|
|||
int64_t soffset;
|
||||
} SProjectLogicNode;
|
||||
|
||||
typedef struct SIndefRowsFuncLogicNode {
|
||||
SLogicNode node;
|
||||
SNodeList* pVectorFuncs;
|
||||
} SIndefRowsFuncLogicNode;
|
||||
|
||||
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
|
||||
|
||||
typedef struct SVnodeModifyLogicNode {
|
||||
|
@ -267,6 +272,12 @@ typedef struct SProjectPhysiNode {
|
|||
int64_t soffset;
|
||||
} SProjectPhysiNode;
|
||||
|
||||
typedef struct SIndefRowsFuncPhysiNode {
|
||||
SPhysiNode node;
|
||||
SNodeList* pExprs;
|
||||
SNodeList* pVectorFuncs;
|
||||
} SIndefRowsFuncPhysiNode;
|
||||
|
||||
typedef struct SJoinPhysiNode {
|
||||
SPhysiNode node;
|
||||
EJoinType joinType;
|
||||
|
|
|
@ -524,6 +524,17 @@ typedef struct SProjectOperatorInfo {
|
|||
int64_t curOutput;
|
||||
} SProjectOperatorInfo;
|
||||
|
||||
typedef struct SIndefOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SAggSupporter aggSup;
|
||||
SArray* pPseudoColInfo;
|
||||
|
||||
SExprInfo* pScalarExpr;
|
||||
int32_t numOfScalarExpr;
|
||||
SqlFunctionCtx* pScalarCtx;
|
||||
int32_t* rowCellInfoOffset;
|
||||
} SIndefOperatorInfo;
|
||||
|
||||
typedef struct SFillOperatorInfo {
|
||||
struct SFillInfo* pFillInfo;
|
||||
SSDataBlock* pRes;
|
||||
|
@ -770,6 +781,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||
|
|
|
@ -108,8 +108,6 @@ static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutpu
|
|||
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
|
||||
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
|
||||
|
||||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
|
||||
|
@ -3636,18 +3634,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// Return result of the previous group in the firstly.
|
||||
if (false) {
|
||||
if (pRes->info.rows > 0) {
|
||||
pProjectInfo->existDataBlock = pBlock;
|
||||
break;
|
||||
} else { // init output buffer for a new group data
|
||||
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3793,6 +3779,17 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SExprInfo* pExprInfo = &pExpr[i];
|
||||
if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
||||
taosMemoryFree(pExprInfo->base.pParam[0].pCol);
|
||||
}
|
||||
taosMemoryFree(pExprInfo->base.pParam);
|
||||
taosMemoryFree(pExprInfo->pExpr);
|
||||
}
|
||||
}
|
||||
|
||||
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||
if (pOperator == NULL) {
|
||||
return;
|
||||
|
@ -3812,14 +3809,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pOperator->pExpr != NULL) {
|
||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[i];
|
||||
if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
||||
taosMemoryFree(pExprInfo->base.pParam[0].pCol);
|
||||
}
|
||||
taosMemoryFree(pExprInfo->base.pParam);
|
||||
taosMemoryFree(pExprInfo->pExpr);
|
||||
}
|
||||
destroyExprInfo(pOperator->pExpr, pOperator->numOfExprs);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pOperator->pExpr);
|
||||
|
@ -4008,6 +3998,19 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosArrayDestroy(pInfo->pPseudoColInfo);
|
||||
}
|
||||
|
||||
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
|
||||
taosArrayDestroy(pInfo->pPseudoColInfo);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
|
||||
destroySqlFunctionCtx(pInfo->pScalarCtx, numOfOutput);
|
||||
destroyExprInfo(pInfo->pScalarExpr, pInfo->numOfScalarExpr);
|
||||
|
||||
taosMemoryFree(pInfo->rowCellInfoOffset);
|
||||
}
|
||||
|
||||
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
|
||||
taosArrayDestroy(pExInfo->pSources);
|
||||
|
@ -4085,6 +4088,136 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int64_t st = 0;
|
||||
int32_t order = 0;
|
||||
int32_t scanFlag = 0;
|
||||
|
||||
if (pOperator->cost.openCost == 0) {
|
||||
st = taosGetTimestampUs();
|
||||
}
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||
if (pIndefInfo->pScalarExpr != NULL) {
|
||||
code = projectApplyFunctions(pIndefInfo->pScalarExpr, pBlock, pBlock, pIndefInfo->pScalarCtx,
|
||||
pIndefInfo->numOfScalarExpr, pIndefInfo->pPseudoColInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||
|
||||
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
||||
pIndefInfo->pPseudoColInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
size_t rows = pInfo->pRes->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
if (pOperator->cost.openCost == 0) {
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
}
|
||||
|
||||
return (rows > 0) ? pInfo->pRes : NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||
|
||||
int32_t numOfExpr = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pVectorFuncs, NULL, &numOfExpr);
|
||||
|
||||
int32_t numOfScalarExpr = 0;
|
||||
if (pPhyNode->pExprs != NULL) {
|
||||
pInfo->pScalarExpr = createExprInfo(pPhyNode->pExprs, NULL, &numOfScalarExpr);
|
||||
pInfo->pScalarCtx = createSqlFunctionCtx(pInfo->pScalarExpr, numOfScalarExpr, &pInfo->rowCellInfoOffset);
|
||||
}
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
|
||||
;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
// Make sure the size of SSDataBlock will never exceed the size of 2MB.
|
||||
int32_t TWOMB = 2 * 1024 * 1024;
|
||||
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
||||
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||
}
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->numOfScalarExpr = numOfScalarExpr;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr);
|
||||
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfExprs = numOfExpr;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
|
||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
|
||||
|
@ -4589,6 +4722,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval;
|
||||
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode,
|
||||
(SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
||||
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -177,13 +177,6 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of PERCENTILE function can only be column");
|
||||
}
|
||||
|
||||
// param1
|
||||
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
|
||||
|
@ -218,13 +211,6 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of APERCENTILE function can only be column");
|
||||
}
|
||||
|
||||
// param1
|
||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||
|
@ -284,13 +270,6 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of TOP/BOTTOM function can only be column");
|
||||
}
|
||||
|
||||
// param1
|
||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||
|
@ -338,13 +317,6 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of ELAPSED function can only be column");
|
||||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -410,13 +382,6 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of HISTOGRAM function can only be column");
|
||||
}
|
||||
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -449,12 +414,6 @@ static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The input parameter of HYPERLOGLOG function can only be column");
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -474,12 +433,6 @@ static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The input parameter of STATECOUNT function can only be column");
|
||||
}
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -520,12 +473,6 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The input parameter of STATEDURATION function can only be column");
|
||||
}
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -573,12 +520,6 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The input parameter of CSUM function can only be column");
|
||||
}
|
||||
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
uint8_t resType;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
|
@ -604,13 +545,6 @@ static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of MAVG function can only be column");
|
||||
}
|
||||
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
|
||||
// param1
|
||||
|
@ -640,13 +574,6 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParamNode0)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of SAMPLE function can only be column");
|
||||
}
|
||||
|
||||
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
uint8_t colType = pCol->resType.type;
|
||||
|
||||
|
@ -684,12 +611,6 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of TAIL function can only be column");
|
||||
}
|
||||
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
uint8_t colType = pCol->resType.type;
|
||||
|
||||
|
@ -766,13 +687,6 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param0
|
||||
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The first parameter of DIFF function can only be column");
|
||||
}
|
||||
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
|
|
@ -434,6 +434,12 @@ static SNode* logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLogi
|
|||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
static SNode* logicIndefRowsFuncCopy(const SIndefRowsFuncLogicNode* pSrc, SIndefRowsFuncLogicNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
CLONE_NODE_LIST_FIELD(pVectorFuncs);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
|
||||
COPY_OBJECT_FIELD(id, sizeof(SSubplanId));
|
||||
CLONE_NODE_FIELD(pNode);
|
||||
|
@ -565,6 +571,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
|
|||
return logicSortCopy((const SSortLogicNode*)pNode, (SSortLogicNode*)pDst);
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
return logicPartitionCopy((const SPartitionLogicNode*)pNode, (SPartitionLogicNode*)pDst);
|
||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
|
||||
return logicIndefRowsFuncCopy((const SIndefRowsFuncLogicNode*)pNode, (SIndefRowsFuncLogicNode*)pDst);
|
||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||
return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst);
|
||||
default:
|
||||
|
|
|
@ -202,6 +202,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "LogicSort";
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
return "LogicPartition";
|
||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
|
||||
return "LogicIndefRowsFunc";
|
||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||
return "LogicSubplan";
|
||||
case QUERY_NODE_LOGIC_PLAN:
|
||||
|
@ -250,6 +252,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiStreamStateWindow";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
return "PhysiPartition";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
|
||||
return "PhysiIndefRowsFunc";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return "PhysiDispatch";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
|
@ -913,6 +917,30 @@ static int32_t jsonToLogicPartitionNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkIndefRowsFuncLogicPlanVectorFuncs = "VectorFuncs";
|
||||
|
||||
static int32_t logicIndefRowsFuncNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SIndefRowsFuncLogicNode* pNode = (const SIndefRowsFuncLogicNode*)pObj;
|
||||
|
||||
int32_t code = logicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkIndefRowsFuncLogicPlanVectorFuncs, pNode->pVectorFuncs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToLogicIndefRowsFuncNode(const SJson* pJson, void* pObj) {
|
||||
SIndefRowsFuncLogicNode* pNode = (SIndefRowsFuncLogicNode*)pObj;
|
||||
|
||||
int32_t code = jsonToLogicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkIndefRowsFuncLogicPlanVectorFuncs, &pNode->pVectorFuncs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkSubplanIdQueryId = "QueryId";
|
||||
static const char* jkSubplanIdGroupId = "GroupId";
|
||||
static const char* jkSubplanIdSubplanId = "SubplanId";
|
||||
|
@ -1995,6 +2023,37 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkIndefRowsFuncPhysiPlanExprs = "Exprs";
|
||||
static const char* jkIndefRowsFuncPhysiPlanVectorFuncs = "VectorFuncs";
|
||||
|
||||
static int32_t physiIndefRowsFuncNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SIndefRowsFuncPhysiNode* pNode = (const SIndefRowsFuncPhysiNode*)pObj;
|
||||
|
||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkIndefRowsFuncPhysiPlanExprs, pNode->pExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkIndefRowsFuncPhysiPlanVectorFuncs, pNode->pVectorFuncs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToPhysiIndefRowsFuncNode(const SJson* pJson, void* pObj) {
|
||||
SIndefRowsFuncPhysiNode* pNode = (SIndefRowsFuncPhysiNode*)pObj;
|
||||
|
||||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkIndefRowsFuncPhysiPlanExprs, &pNode->pExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkIndefRowsFuncPhysiPlanVectorFuncs, &pNode->pVectorFuncs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
|
||||
|
||||
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -3800,6 +3859,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return logicSortNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
return logicPartitionNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
|
||||
return logicIndefRowsFuncNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||
return logicSubplanToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN:
|
||||
|
@ -3840,6 +3901,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return physiStateWindowNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
return physiPartitionNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
|
||||
return physiIndefRowsFuncNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return physiDispatchNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
|
@ -3929,6 +3992,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToLogicSortNode(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
return jsonToLogicPartitionNode(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
|
||||
return jsonToLogicIndefRowsFuncNode(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||
return jsonToLogicSubplan(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN:
|
||||
|
@ -3969,6 +4034,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToPhysiStateWindowNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
return jsonToPhysiPartitionNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
|
||||
return jsonToPhysiIndefRowsFuncNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return jsonToPhysiDispatchNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
|
||||
|
|
|
@ -350,6 +350,7 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa
|
|||
case SQL_CLAUSE_GROUP_BY:
|
||||
nodesWalkExpr(pSelect->pHaving, walker, pContext);
|
||||
case SQL_CLAUSE_HAVING:
|
||||
case SQL_CLAUSE_SELECT:
|
||||
case SQL_CLAUSE_DISTINCT:
|
||||
nodesWalkExprs(pSelect->pOrderByList, walker, pContext);
|
||||
case SQL_CLAUSE_ORDER_BY:
|
||||
|
@ -382,6 +383,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
|
|||
case SQL_CLAUSE_GROUP_BY:
|
||||
nodesRewriteExpr(&(pSelect->pHaving), rewriter, pContext);
|
||||
case SQL_CLAUSE_HAVING:
|
||||
case SQL_CLAUSE_SELECT:
|
||||
case SQL_CLAUSE_DISTINCT:
|
||||
nodesRewriteExprs(pSelect->pOrderByList, rewriter, pContext);
|
||||
case SQL_CLAUSE_ORDER_BY:
|
||||
|
|
|
@ -232,6 +232,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SSortLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
return makeNode(type, sizeof(SPartitionLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
|
||||
return makeNode(type, sizeof(SIndefRowsFuncLogicNode));
|
||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||
return makeNode(type, sizeof(SLogicSubplan));
|
||||
case QUERY_NODE_LOGIC_PLAN:
|
||||
|
@ -280,6 +282,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
return makeNode(type, sizeof(SPartitionPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
|
||||
return makeNode(type, sizeof(SIndefRowsFuncPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return makeNode(type, sizeof(SDataDispatcherNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
|
|
|
@ -456,6 +456,37 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
// top/bottom are both an aggregate function and a indefinite rows function
|
||||
if (!pSelect->hasIndefiniteRowsFunc || pSelect->hasAggFuncs || NULL != pSelect->pWindow) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SIndefRowsFuncLogicNode* pIdfRowsFunc =
|
||||
(SIndefRowsFuncLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC);
|
||||
if (NULL == pIdfRowsFunc) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pVectorFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprForSelect(pIdfRowsFunc->pVectorFuncs, pSelect, SQL_CLAUSE_SELECT);
|
||||
}
|
||||
|
||||
// set the output
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExps(pIdfRowsFunc->pVectorFuncs, &pIdfRowsFunc->node.pTargets);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pIdfRowsFunc;
|
||||
} else {
|
||||
nodesDestroyNode(pIdfRowsFunc);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow,
|
||||
SLogicNode** pLogicNode) {
|
||||
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs);
|
||||
|
@ -772,6 +803,9 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createSelectRootLogicNode(pCxt, pSelect, createAggLogicNode, &pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createSelectRootLogicNode(pCxt, pSelect, createIndefRowsFuncLogicNode, &pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createSelectRootLogicNode(pCxt, pSelect, createDistinctLogicNode, &pRoot);
|
||||
}
|
||||
|
|
|
@ -791,6 +791,43 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||
SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
|
||||
SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
|
||||
pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
|
||||
if (NULL == pIdfRowsFunc) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SNodeList* pPrecalcExprs = NULL;
|
||||
SNodeList* pVectorFuncs = NULL;
|
||||
int32_t code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pVectorFuncs, &pPrecalcExprs, &pVectorFuncs);
|
||||
|
||||
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
||||
// push down expression to pOutputDataBlockDesc of child node
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
|
||||
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pVectorFuncs) {
|
||||
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pVectorFuncs, &pIdfRowsFunc->pVectorFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlots(pCxt, pIdfRowsFunc->pVectorFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pPhyNode = (SPhysiNode*)pIdfRowsFunc;
|
||||
} else {
|
||||
nodesDestroyNode(pIdfRowsFunc);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||
SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
|
||||
SProjectPhysiNode* pProject =
|
||||
|
@ -1225,6 +1262,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
|
|||
return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_FILL:
|
||||
return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
|
||||
return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_MERGE:
|
||||
return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
|
||||
default:
|
||||
|
|
|
@ -50,6 +50,8 @@ TEST_F(PlanIntervalTest, selectFunc) {
|
|||
run("SELECT MAX(c1), MIN(c1) FROM t1 INTERVAL(10s)");
|
||||
// select function along with the columns of select row, and with INTERVAL clause
|
||||
run("SELECT MAX(c1), c2 FROM t1 INTERVAL(10s)");
|
||||
|
||||
run("SELECT TOP(c1, 1) FROM t1 INTERVAL(10s) ORDER BY c1");
|
||||
}
|
||||
|
||||
TEST_F(PlanIntervalTest, stable) {
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "planTestUtil.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
class PlanProjectTest : public PlannerTestBase {};
|
||||
|
||||
TEST_F(PlanProjectTest, basic) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT CEIL(c1) FROM t1");
|
||||
}
|
||||
|
||||
TEST_F(PlanProjectTest, indefiniteRowsFunc) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT MAVG(c1, 10) FROM t1");
|
||||
|
||||
run("SELECT MAVG(CEIL(c1), 20) + 2 FROM t1");
|
||||
}
|
|
@ -81,6 +81,8 @@ int32_t getLogLevel() { return g_logLevel; }
|
|||
|
||||
class PlannerTestBaseImpl {
|
||||
public:
|
||||
PlannerTestBaseImpl() : sqlNo_(0) {}
|
||||
|
||||
void useDb(const string& acctId, const string& db) {
|
||||
caseEnv_.acctId_ = acctId;
|
||||
caseEnv_.db_ = db;
|
||||
|
@ -88,6 +90,7 @@ class PlannerTestBaseImpl {
|
|||
}
|
||||
|
||||
void run(const string& sql) {
|
||||
++sqlNo_;
|
||||
if (caseEnv_.nsql_ > 0) {
|
||||
--(caseEnv_.nsql_);
|
||||
return;
|
||||
|
@ -187,6 +190,8 @@ class PlannerTestBaseImpl {
|
|||
string acctId_;
|
||||
string db_;
|
||||
int32_t nsql_;
|
||||
|
||||
caseEnv() : nsql_(0) {}
|
||||
};
|
||||
|
||||
struct stmtEnv {
|
||||
|
@ -194,6 +199,7 @@ class PlannerTestBaseImpl {
|
|||
array<char, 1024> msgBuf_;
|
||||
SQuery* pQuery_;
|
||||
|
||||
stmtEnv() : pQuery_(nullptr) {}
|
||||
~stmtEnv() { qDestroyQuery(pQuery_); }
|
||||
};
|
||||
|
||||
|
@ -229,7 +235,7 @@ class PlannerTestBaseImpl {
|
|||
return;
|
||||
}
|
||||
|
||||
cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl;
|
||||
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
|
||||
|
||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
|
||||
if (res_.prepareAst_.empty()) {
|
||||
|
@ -382,6 +388,7 @@ class PlannerTestBaseImpl {
|
|||
caseEnv caseEnv_;
|
||||
stmtEnv stmtEnv_;
|
||||
stmtRes res_;
|
||||
int32_t sqlNo_;
|
||||
};
|
||||
|
||||
PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) {}
|
||||
|
|
|
@ -240,7 +240,7 @@ class TDTestCase:
|
|||
tdSql.error("select csum(c1) t1") # no from
|
||||
tdSql.error("select csum( c1 ) from ") # no table_expr
|
||||
# tdSql.error(self.csum_query_form(col="st1")) # tag col
|
||||
tdSql.error(self.csum_query_form(col=1)) # col is a value
|
||||
# tdSql.error(self.csum_query_form(col=1)) # col is a value
|
||||
tdSql.error(self.csum_query_form(col="'c1'")) # col is a string
|
||||
tdSql.error(self.csum_query_form(col=None)) # col is NULL 1
|
||||
tdSql.error(self.csum_query_form(col="NULL")) # col is NULL 2
|
||||
|
@ -407,6 +407,14 @@ class TDTestCase:
|
|||
tdDnodes.start(index)
|
||||
self.csum_current_query()
|
||||
self.csum_error_query()
|
||||
tdSql.query("select csum(1) from t1 ")
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkData(0,0,1)
|
||||
tdSql.checkData(1,0,2)
|
||||
tdSql.checkData(2,0,3)
|
||||
tdSql.checkData(3,0,4)
|
||||
tdSql.query("select csum(abs(c1))+2 from t1 ")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
def run(self):
|
||||
import traceback
|
||||
|
|
|
@ -141,7 +141,7 @@ class TDTestCase:
|
|||
tablenames = ["sub_table1_1","sub_table1_2","sub_table1_3","sub_table2_1","sub_table2_2","sub_table2_3","regular_table_1","regular_table_2","regular_table_3"]
|
||||
|
||||
abnormal_list = ["()","(NULL)","(*)","(abc)","( , )","(NULL,*)","( ,NULL)","(%)","(+)","(*,)","(*, /)","(ts,*)" "(ts,tbname*10)","(ts,tagname)",
|
||||
"(ts,2d+3m-2s,NULL)","(ts+1d,10s)","(ts+10d,NULL)" ,"(ts,now -1m%1d)","(ts+10d)","(ts+10d,_c0)","(ts+10d,)","(ts,%)","(ts, , m)","(ts,abc)","(ts,/)","(ts,*)","(ts,1s,100)",
|
||||
"(ts,2d+3m-2s,NULL)","(ts+10d,NULL)" ,"(ts,now -1m%1d)","(ts+10d,_c0)","(ts+10d,)","(ts,%)","(ts, , m)","(ts,abc)","(ts,/)","(ts,*)","(ts,1s,100)",
|
||||
"(ts,1s,abc)","(ts,1s,_c0)","(ts,1s,*)","(ts,1s,NULL)","(ts,,_c0)","(ts,tbname,ts)","(ts,0,tbname)","('2021-11-18 00:00:10')","('2021-11-18 00:00:10', 1s)",
|
||||
"('2021-11-18T00:00:10+0800', '1s')","('2021-11-18T00:00:10Z', '1s')","('2021-11-18T00:00:10+0800', 10000000d,)","('ts', ,2021-11-18T00:00:10+0800, )"]
|
||||
|
||||
|
|
|
@ -85,8 +85,8 @@ class TDTestCase:
|
|||
"select stateduration(c1 ,'GT','*',1s) from t1",
|
||||
"select stateduration(c1 ,'GT',ts,1s) from t1",
|
||||
"select stateduration(c1 ,'GT',max(c1),1s) from t1",
|
||||
"select stateduration(abs(c1) ,'GT',1,1s) from t1",
|
||||
"select stateduration(c1+2 ,'GT',1,1s) from t1",
|
||||
# "select stateduration(abs(c1) ,'GT',1,1s) from t1",
|
||||
# "select stateduration(c1+2 ,'GT',1,1s) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1u) from t1",
|
||||
"select stateduration(c1 ,'GT',1,now) from t1",
|
||||
"select stateduration(c1 ,'GT','1',1s) from t1",
|
||||
|
@ -323,6 +323,11 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, 0.000000000)
|
||||
tdSql.checkData(3, 0, -86404.000000000)
|
||||
|
||||
tdSql.query("select stateduration(abs(c1) ,'GT',1,1s) from t1")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.query("select stateduration(c1+2 ,'GT',1,1s) from t1")
|
||||
tdSql.checkRows(12)
|
||||
|
||||
|
||||
# bug for stable
|
||||
|
|
|
@ -223,7 +223,7 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("===step 0: err case, must return err")
|
||||
tdSql.error( "select hyperloglog() from ct1" )
|
||||
tdSql.error( "select hyperloglog(c1, c2) from ct2" )
|
||||
tdSql.error( "select hyperloglog(1) from ct2" )
|
||||
# tdSql.error( "select hyperloglog(1) from ct2" )
|
||||
tdSql.error( f"select hyperloglog({NUM_COL[0]}, {NUM_COL[1]}) from ct4" )
|
||||
tdSql.error( ''' select hyperloglog(['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'])
|
||||
from ct1
|
||||
|
|
|
@ -417,8 +417,8 @@ class TDTestCase:
|
|||
|
||||
# err9 = {"col": "st1"}
|
||||
# self.checkmavg(**err9) # col: tag
|
||||
err10 = {"col": 1}
|
||||
self.checkmavg(**err10) # col: value
|
||||
# err10 = {"col": 1}
|
||||
# self.checkmavg(**err10) # col: value
|
||||
err11 = {"col": "NULL"}
|
||||
self.checkmavg(**err11) # col: NULL
|
||||
err12 = {"col": "%_"}
|
||||
|
@ -660,6 +660,14 @@ class TDTestCase:
|
|||
tdDnodes.start(index)
|
||||
self.mavg_current_query()
|
||||
self.mavg_error_query()
|
||||
tdSql.query("select mavg(1,1) from t1")
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkData(0,0,1.000000000)
|
||||
tdSql.checkData(1,0,1.000000000)
|
||||
tdSql.checkData(5,0,1.000000000)
|
||||
|
||||
tdSql.query("select mavg(abs(c1),1) from t1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
def run(self):
|
||||
import traceback
|
||||
|
|
|
@ -427,10 +427,10 @@ class TDTestCase:
|
|||
# err9 = {"col": "st1"}
|
||||
# self.checksample(**err9) # col: tag
|
||||
tdSql.query(" select sample(st1 ,1) from t1 ")
|
||||
err10 = {"col": 1}
|
||||
self.checksample(**err10) # col: value
|
||||
err11 = {"col": "NULL"}
|
||||
self.checksample(**err11) # col: NULL
|
||||
# err10 = {"col": 1}
|
||||
# self.checksample(**err10) # col: value
|
||||
# err11 = {"col": "NULL"}
|
||||
# self.checksample(**err11) # col: NULL
|
||||
err12 = {"col": "%_"}
|
||||
self.checksample(**err12) # col: %_
|
||||
err13 = {"col": "c3"}
|
||||
|
@ -445,12 +445,12 @@ class TDTestCase:
|
|||
self.checksample(**err17) # nchar col
|
||||
err18 = {"col": "c6"}
|
||||
self.checksample(**err18) # bool col
|
||||
err19 = {"col": "'c1'"}
|
||||
self.checksample(**err19) # col: string
|
||||
# err19 = {"col": "'c1'"}
|
||||
# self.checksample(**err19) # col: string
|
||||
err20 = {"col": None}
|
||||
self.checksample(**err20) # col: None
|
||||
err21 = {"col": "''"}
|
||||
self.checksample(**err21) # col: ''
|
||||
# err21 = {"col": "''"}
|
||||
# self.checksample(**err21) # col: ''
|
||||
err22 = {"col": "tt1.c1"}
|
||||
self.checksample(**err22) # not table_expr col
|
||||
err23 = {"col": "t1"}
|
||||
|
@ -459,10 +459,10 @@ class TDTestCase:
|
|||
self.checksample(**err24) # stbname
|
||||
err25 = {"col": "db"}
|
||||
self.checksample(**err25) # datbasename
|
||||
err26 = {"col": "True"}
|
||||
self.checksample(**err26) # col: BOOL 1
|
||||
err27 = {"col": True}
|
||||
self.checksample(**err27) # col: BOOL 2
|
||||
# err26 = {"col": "True"}
|
||||
# self.checksample(**err26) # col: BOOL 1
|
||||
# err27 = {"col": True}
|
||||
# self.checksample(**err27) # col: BOOL 2
|
||||
err28 = {"col": "*"}
|
||||
self.checksample(**err28) # col: all col
|
||||
err29 = {"func": "sample[", "r_comm": "]"}
|
||||
|
@ -678,7 +678,7 @@ class TDTestCase:
|
|||
tdSql.error(" select sample(c1,tbname) from t1 ")
|
||||
tdSql.error(" select sample(c1,ts) from t1 ")
|
||||
tdSql.error(" select sample(c1,false) from t1 ")
|
||||
tdSql.error(" select sample(123,1) from t1 ")
|
||||
tdSql.query(" select sample(123,1) from t1 ")
|
||||
|
||||
tdSql.query(" select sample(c1,2) from t1 ")
|
||||
tdSql.checkRows(2)
|
||||
|
|
|
@ -85,8 +85,8 @@ class TDTestCase:
|
|||
"select statecount(c1 ,'GT','*') from t1",
|
||||
"select statecount(c1 ,'GT',ts) from t1",
|
||||
"select statecount(c1 ,'GT',max(c1)) from t1",
|
||||
"select statecount(abs(c1) ,'GT',1) from t1",
|
||||
"select statecount(c1+2 ,'GT',1) from t1",
|
||||
# "select statecount(abs(c1) ,'GT',1) from t1",
|
||||
# "select statecount(c1+2 ,'GT',1) from t1",
|
||||
"select statecount(c1 ,'GT',1,1u) from t1",
|
||||
"select statecount(c1 ,'GT',1,now) from t1",
|
||||
"select statecount(c1 ,'GT','1') from t1",
|
||||
|
|
Loading…
Reference in New Issue