From f3acf49e229c8015b8be50363a276f4adbe4f3ca Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 19 Nov 2024 16:23:46 +0800 Subject: [PATCH 1/7] enh: include --- source/libs/command/src/explain.c | 2 +- source/libs/executor/inc/dynqueryctrl.h | 1 + source/libs/executor/inc/groupcache.h | 1 + source/libs/executor/inc/hashjoin.h | 3 +++ source/libs/executor/inc/operator.h | 1 + source/libs/executor/inc/querytask.h | 2 ++ source/util/src/tcompare.c | 2 +- 7 files changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 24b43ac95b..13b75f8233 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -748,7 +748,7 @@ static int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "GroupAggragate" : "Aggragate")); + EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "GroupAggregate" : "Aggregate")); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index 793fbc0e61..3df0f6644c 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -19,6 +19,7 @@ extern "C" { #endif +#include "executorInt.h" typedef struct SDynQueryCtrlExecInfo { int64_t prevBlkNum; int64_t prevBlkRows; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 0244823e29..f50947ead7 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -18,6 +18,7 @@ #ifdef __cplusplus extern "C" { #endif +#include "executorInt.h" #define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600 #define GROUP_CACHE_MAX_FILE_FDS 10 diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 1085f2236c..542763ffd3 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -19,6 +19,9 @@ extern "C" { #endif +#include "executorInt.h" +#include "operator.h" + #define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760 #define HJOIN_DEFAULT_BLK_ROWS_NUM 4096 #define HJOIN_BLK_SIZE_LIMIT 10485760 diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 91aef93452..f2e542e7cd 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "executorInt.h" typedef struct SOperatorCostInfo { double openCost; double totalCost; diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index e3bb9a1361..f726e4300f 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "executorInt.h" + #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) enum { diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index b1f4ed0ed3..ebc379897f 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1294,7 +1294,7 @@ void DestroyRegexCache(){ uInfo("[regex cache] destory regex cache"); bool ret = taosTmrStopA(&sRegexCache.timer); if (!ret) { - uError("failed to stop regex cache timer"); + uInfo("stop regex cache timer may be failed"); } taosWLockLatch(&sRegexCache.mutex); sRegexCache.exit = true; From 04af5f4b944e9580b0bdfa73fdd7800d90682859 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 19 Nov 2024 19:38:40 +0800 Subject: [PATCH 2/7] enh: check param --- source/libs/executor/inc/operator.h | 8 ++++++++ source/libs/executor/src/aggregateoperator.c | 8 ++++++++ source/libs/executor/src/anomalywindowoperator.c | 5 +++++ source/libs/executor/src/executorInt.c | 1 + 4 files changed, 22 insertions(+) diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index f2e542e7cd..5ceedbe542 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -202,6 +202,14 @@ void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t i void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId); void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId); +#define CHECK_CONDITION_FAILED(c) \ + do { \ + if (!(c)) { \ + qError("function:%s condition failed, Line:%d", __FUNCTION__, __LINE__); \ + return TSDB_CODE_APP_ERROR; \ + } \ + } while (0) + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 829ca6da50..b71ed5ee26 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -184,6 +184,10 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; + if(!pAggInfo) { + qError("function:%s, pAggInfo is NULL", __func__); + return false; + } if (pOperator->blocking && pAggInfo->hasValidBlock) { return false; } @@ -333,6 +337,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { int32_t code = TSDB_CODE_SUCCESS; + if (pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) { + qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { // todo add a dummy function to avoid process check diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 94cc5d9129..b678030a1c 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -171,6 +171,10 @@ _error: } static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + CHECK_CONDITION_FAILED(pOperator != NULL); + CHECK_CONDITION_FAILED(ppRes != NULL); + CHECK_CONDITION_FAILED(pOperator->info != NULL); + CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = pOperator->info; @@ -181,6 +185,7 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe int64_t st = taosGetTimestampUs(); int32_t numOfBlocks = taosArrayGetSize(pSupp->blocks); + CHECK_CONDITION_FAILED(pRes != NULL); blockDataCleanup(pRes); while (1) { diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 1b823bf69d..4a1d26d875 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -255,6 +255,7 @@ static int32_t doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SqlFunctionCtx* pCtx = pExprSup->pCtx; + CHECK_CONDITION_FAILED(pExprSup->numOfExprs <= 0 || pCtx != NULL); for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].input.numOfRows = pBlock->info.rows; From 22f3c2097751722555013bed84f64cc21396e3b6 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 25 Nov 2024 11:33:02 +0800 Subject: [PATCH 3/7] enh: check param --- source/libs/executor/src/aggregateoperator.c | 2 +- source/libs/executor/src/anomalywindowoperator.c | 2 -- source/libs/executor/src/countwindowoperator.c | 3 +++ source/libs/executor/src/eventwindowoperator.c | 3 +++ source/libs/executor/src/filloperator.c | 8 ++++++++ source/libs/executor/src/groupoperator.c | 7 +++++++ source/libs/executor/src/hashjoinoperator.c | 10 +++++++--- source/libs/executor/src/projectoperator.c | 5 ++++- 8 files changed, 33 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index b71ed5ee26..94bf791ef8 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -337,7 +337,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { int32_t code = TSDB_CODE_SUCCESS; - if (pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) { + if (!pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) { qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__); return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index b678030a1c..71a38c739d 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -171,8 +171,6 @@ _error: } static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - CHECK_CONDITION_FAILED(pOperator != NULL); - CHECK_CONDITION_FAILED(ppRes != NULL); CHECK_CONDITION_FAILED(pOperator->info != NULL); CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 542a7c89a9..cb7459744f 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -225,6 +225,8 @@ _end: } static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + CHECK_CONDITION_FAILED(pOperator->info != NULL); + CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SCountWindowOperatorInfo* pInfo = pOperator->info; @@ -232,6 +234,7 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** SExprSupp* pExprSup = &pOperator->exprSupp; int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; + CHECK_CONDITION_FAILED(pRes != NULL); blockDataCleanup(pRes); diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index e68a91d97d..83b202fed6 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -182,6 +182,8 @@ void destroyEWindowOperatorInfo(void* param) { } static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + CHECK_CONDITION_FAILED(pOperator->info != NULL); + CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SEventWindowOperatorInfo* pInfo = pOperator->info; @@ -191,6 +193,7 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; + CHECK_CONDITION_FAILED(pRes != NULL); blockDataCleanup(pRes); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 1595c90419..d6a518ccc4 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -182,9 +182,17 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t lino = 0; SFillOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pInfo == NULL || pTaskInfo == NULL) { + qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__); + return NULL; + } SResultInfo* pResultInfo = &pOperator->resultInfo; SSDataBlock* pResBlock = pInfo->pFinalRes; + if (pResBlock == NULL) { + qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__); + return NULL; + } blockDataCleanup(pResBlock); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index fec35c3371..c832cfbb4e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -444,6 +444,8 @@ _end: } static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + CHECK_CONDITION_FAILED(pOperator->info != NULL); + CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1003,11 +1005,14 @@ static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) return TSDB_CODE_SUCCESS; } + CHECK_CONDITION_FAILED(pOperator->info != NULL); + CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SPartitionOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->binfo.pRes; + CHECK_CONDITION_FAILED(pRes != NULL); if (pOperator->status == OP_RES_TO_RETURN) { (*ppRes) = buildPartitionResult(pOperator); @@ -1459,6 +1464,8 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamPartitionOperatorInfo* pInfo = pOperator->info; + CHECK_CONDITION_FAILED(pInfo != NULL); + CHECK_CONDITION_FAILED(pTaskInfo != NULL); if (pOperator->status == OP_EXEC_DONE) { (*ppRes) = NULL; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 1f43a429b3..12f90097c5 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -904,9 +904,10 @@ static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) { SHJoinOperatorInfo* pJoin = pOperator->info; - SSDataBlock* pBlock = NULL; - int32_t code = TSDB_CODE_SUCCESS; - + SSDataBlock* pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + CHECK_CONDITION_FAILED(pJoin != NULL); + while (true) { pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx); if (NULL == pBlock) { @@ -990,12 +991,15 @@ void hJoinSetDone(struct SOperatorInfo* pOperator) { } static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) { + CHECK_CONDITION_FAILED(pOperator->info != NULL); + CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); SHJoinOperatorInfo* pJoin = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pRes = pJoin->finBlk; int64_t st = 0; + CHECK_CONDITION_FAILED(pRes != NULL); QRY_PARAM_CHECK(pResBlock); if (pOperator->cost.openCost == 0) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 5b9e531679..5030b8a148 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -564,7 +564,7 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) { int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); - + CHECK_CONDITION_FAILED(pOperator->info != NULL); SIndefOperatorInfo* pIndefInfo = pOperator->info; SOptrBasicInfo* pInfo = &pIndefInfo->binfo; SExprSupp* pSup = &pOperator->exprSupp; @@ -1178,5 +1178,8 @@ _exit: if(processByRowFunctionCtx) { taosArrayDestroy(processByRowFunctionCtx); } + if(code) { + qError("project apply functions failed at: %s:%d", __func__, lino); + } return code; } From c46278a863d1045bad5f222c21bf2a64e179b886 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 25 Nov 2024 13:56:14 +0800 Subject: [PATCH 4/7] fix: col type --- source/libs/function/src/functionMgt.c | 8 ++++++++ source/libs/planner/src/planOptimizer.c | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index a406b23c59..5dfb94ba6e 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -412,6 +412,13 @@ int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNo return code; } +static void resetOutputChangedFunc(SFunctionNode *pFunc, const SFunctionNode* pSrcFunc) { + if (funcMgtBuiltins[pFunc->funcId].type == FUNCTION_TYPE_LAST_MERGE) { + pFunc->node.resType = pSrcFunc->node.resType; + return; + } +} + int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** ppFunc) { int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc); if (NULL == *ppFunc) { @@ -430,6 +437,7 @@ int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFu *ppFunc = NULL; return code; } + resetOutputChangedFunc(*ppFunc, pSrcFunc); return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 39024731ed..0cc26dfce9 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -4258,7 +4258,7 @@ typedef struct SLastRowScanOptSetColDataTypeCxt { int32_t code; } SLastRowScanOptSetColDataTypeCxt; -static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { +static EDealRes lastRowScanOptGetColAndSetDataType(SNode* pNode, void* pContext, bool setType) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SLastRowScanOptSetColDataTypeCxt* pCxt = pContext; if (pCxt->doAgg) { @@ -4266,12 +4266,12 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { if (TSDB_CODE_SUCCESS != pCxt->code) { return DEAL_RES_ERROR; } - getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes); + if (setType) getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes); } else { SNode* pCol = NULL; FOREACH(pCol, pCxt->pLastCols) { if (nodesEqualNode(pCol, pNode)) { - getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes); + if (setType) getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes); break; } } @@ -4281,6 +4281,14 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } +static EDealRes lastRowScanOptGetLastCols(SNode* pNode, void* pContext) { + return lastRowScanOptGetColAndSetDataType(pNode, pContext, false); +} + +static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { + return lastRowScanOptGetColAndSetDataType(pNode, pContext, true); +} + static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase, int32_t pkBytes) { SNode* pTarget = NULL; WHERE_EACH(pTarget, pTargets) { @@ -4393,7 +4401,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic SNode* pParamNode = NULL; if (FUNCTION_TYPE_LAST == funcType) { (void)nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1)); - nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt); + nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptGetLastCols, &cxt); if (TSDB_CODE_SUCCESS != cxt.code) break; } FOREACH(pParamNode, pFunc->pParameterList) { From 9656ec4c80483cd6d0277fe79834933118dee24d Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 25 Nov 2024 19:53:57 +0800 Subject: [PATCH 5/7] fix: misspelling --- tests/script/tsim/query/cache_last.sim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/query/cache_last.sim b/tests/script/tsim/query/cache_last.sim index 0a30bbd325..64abdaef77 100644 --- a/tests/script/tsim/query/cache_last.sim +++ b/tests/script/tsim/query/cache_last.sim @@ -76,11 +76,11 @@ if $data00 != @-> Data Exchange 2:1 (width=296)@ then return -1 endi sql explain select count(*), last_row(f1), min(f1),t1 from sta partition by t1; -if $data00 != @-> Aggragate (functions=4 width=28 input_order=desc )@ then +if $data00 != @-> Aggregate (functions=4 width=28 input_order=desc )@ then return -1 endi sql explain select count(*), last_row(f1), min(f1),t1 from sta group by t1; -if $data00 != @-> Aggragate (functions=4 width=28 input_order=desc )@ then +if $data00 != @-> Aggregate (functions=4 width=28 input_order=desc )@ then return -1 endi sql explain select distinct count(*), last_row(f1), min(f1) from sta; From d50ee39daba968991d3d1b17cfa7c85e281b426a Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 26 Nov 2024 10:01:09 +0800 Subject: [PATCH 6/7] fix: basic1.sim case --- tests/script/tsim/db/basic1.sim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/script/tsim/db/basic1.sim b/tests/script/tsim/db/basic1.sim index 8eb6dce759..9a6d3001ef 100644 --- a/tests/script/tsim/db/basic1.sim +++ b/tests/script/tsim/db/basic1.sim @@ -108,6 +108,8 @@ if $data30 != 12 then endi print =============== show vnodes on dnode 1 +print =============== Wait for the synchronization status of vnode and Mnode, heartbeat for one second +sleep 1000 sql show vnodes on dnode 1 if $rows != 9 then return -1 From a2f8a822dabd49052654ecfb925685543faef928 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Wed, 27 Nov 2024 19:51:27 +0800 Subject: [PATCH 7/7] remove some check --- source/libs/executor/inc/operator.h | 8 -------- source/libs/executor/src/anomalywindowoperator.c | 3 --- source/libs/executor/src/countwindowoperator.c | 3 --- source/libs/executor/src/eventwindowoperator.c | 3 --- source/libs/executor/src/executorInt.c | 1 - source/libs/executor/src/groupoperator.c | 7 ------- source/libs/executor/src/hashjoinoperator.c | 4 ---- source/libs/executor/src/projectoperator.c | 1 - 8 files changed, 30 deletions(-) diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 5ceedbe542..f2e542e7cd 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -202,14 +202,6 @@ void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t i void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId); void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId); -#define CHECK_CONDITION_FAILED(c) \ - do { \ - if (!(c)) { \ - qError("function:%s condition failed, Line:%d", __FUNCTION__, __LINE__); \ - return TSDB_CODE_APP_ERROR; \ - } \ - } while (0) - #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 71a38c739d..94cc5d9129 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -171,8 +171,6 @@ _error: } static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - CHECK_CONDITION_FAILED(pOperator->info != NULL); - CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = pOperator->info; @@ -183,7 +181,6 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe int64_t st = taosGetTimestampUs(); int32_t numOfBlocks = taosArrayGetSize(pSupp->blocks); - CHECK_CONDITION_FAILED(pRes != NULL); blockDataCleanup(pRes); while (1) { diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index cb7459744f..542a7c89a9 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -225,8 +225,6 @@ _end: } static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - CHECK_CONDITION_FAILED(pOperator->info != NULL); - CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SCountWindowOperatorInfo* pInfo = pOperator->info; @@ -234,7 +232,6 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** SExprSupp* pExprSup = &pOperator->exprSupp; int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; - CHECK_CONDITION_FAILED(pRes != NULL); blockDataCleanup(pRes); diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 83b202fed6..e68a91d97d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -182,8 +182,6 @@ void destroyEWindowOperatorInfo(void* param) { } static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - CHECK_CONDITION_FAILED(pOperator->info != NULL); - CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SEventWindowOperatorInfo* pInfo = pOperator->info; @@ -193,7 +191,6 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; - CHECK_CONDITION_FAILED(pRes != NULL); blockDataCleanup(pRes); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 4a1d26d875..1b823bf69d 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -255,7 +255,6 @@ static int32_t doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SqlFunctionCtx* pCtx = pExprSup->pCtx; - CHECK_CONDITION_FAILED(pExprSup->numOfExprs <= 0 || pCtx != NULL); for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].input.numOfRows = pBlock->info.rows; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c832cfbb4e..fec35c3371 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -444,8 +444,6 @@ _end: } static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - CHECK_CONDITION_FAILED(pOperator->info != NULL); - CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1005,14 +1003,11 @@ static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) return TSDB_CODE_SUCCESS; } - CHECK_CONDITION_FAILED(pOperator->info != NULL); - CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SPartitionOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->binfo.pRes; - CHECK_CONDITION_FAILED(pRes != NULL); if (pOperator->status == OP_RES_TO_RETURN) { (*ppRes) = buildPartitionResult(pOperator); @@ -1464,8 +1459,6 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamPartitionOperatorInfo* pInfo = pOperator->info; - CHECK_CONDITION_FAILED(pInfo != NULL); - CHECK_CONDITION_FAILED(pTaskInfo != NULL); if (pOperator->status == OP_EXEC_DONE) { (*ppRes) = NULL; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 12f90097c5..06498d73e7 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -906,7 +906,6 @@ static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) SHJoinOperatorInfo* pJoin = pOperator->info; SSDataBlock* pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; - CHECK_CONDITION_FAILED(pJoin != NULL); while (true) { pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx); @@ -991,15 +990,12 @@ void hJoinSetDone(struct SOperatorInfo* pOperator) { } static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - CHECK_CONDITION_FAILED(pOperator->info != NULL); - CHECK_CONDITION_FAILED(pOperator->pTaskInfo != NULL); SHJoinOperatorInfo* pJoin = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pRes = pJoin->finBlk; int64_t st = 0; - CHECK_CONDITION_FAILED(pRes != NULL); QRY_PARAM_CHECK(pResBlock); if (pOperator->cost.openCost == 0) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 5030b8a148..226cde059b 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -564,7 +564,6 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) { int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); - CHECK_CONDITION_FAILED(pOperator->info != NULL); SIndefOperatorInfo* pIndefInfo = pOperator->info; SOptrBasicInfo* pInfo = &pIndefInfo->binfo; SExprSupp* pSup = &pOperator->exprSupp;