Merge pull request #28839 from taosdata/enh/TD-32179/operatorInputCheck
enh: operator input check
This commit is contained in:
commit
b1433deebe
|
@ -748,7 +748,7 @@ static int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
|
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
|
||||||
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
|
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
|
||||||
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "GroupAggragate" : "Aggragate"));
|
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "GroupAggregate" : "Aggregate"));
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||||
if (pResNode->pExecInfo) {
|
if (pResNode->pExecInfo) {
|
||||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "executorInt.h"
|
||||||
typedef struct SDynQueryCtrlExecInfo {
|
typedef struct SDynQueryCtrlExecInfo {
|
||||||
int64_t prevBlkNum;
|
int64_t prevBlkNum;
|
||||||
int64_t prevBlkRows;
|
int64_t prevBlkRows;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
#include "executorInt.h"
|
||||||
|
|
||||||
#define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600
|
#define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600
|
||||||
#define GROUP_CACHE_MAX_FILE_FDS 10
|
#define GROUP_CACHE_MAX_FILE_FDS 10
|
||||||
|
|
|
@ -19,6 +19,9 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "executorInt.h"
|
||||||
|
#include "operator.h"
|
||||||
|
|
||||||
#define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760
|
#define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760
|
||||||
#define HJOIN_DEFAULT_BLK_ROWS_NUM 4096
|
#define HJOIN_DEFAULT_BLK_ROWS_NUM 4096
|
||||||
#define HJOIN_BLK_SIZE_LIMIT 10485760
|
#define HJOIN_BLK_SIZE_LIMIT 10485760
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "executorInt.h"
|
||||||
typedef struct SOperatorCostInfo {
|
typedef struct SOperatorCostInfo {
|
||||||
double openCost;
|
double openCost;
|
||||||
double totalCost;
|
double totalCost;
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "executorInt.h"
|
||||||
|
|
||||||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
|
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
|
|
@ -184,6 +184,10 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
|
||||||
|
if(!pAggInfo) {
|
||||||
|
qError("function:%s, pAggInfo is NULL", __func__);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (pOperator->blocking && pAggInfo->hasValidBlock) {
|
if (pOperator->blocking && pAggInfo->hasValidBlock) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -333,6 +337,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
|
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (!pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) {
|
||||||
|
qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__);
|
||||||
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
|
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
|
||||||
if (functionNeedToExecute(&pCtx[k])) {
|
if (functionNeedToExecute(&pCtx[k])) {
|
||||||
// todo add a dummy function to avoid process check
|
// todo add a dummy function to avoid process check
|
||||||
|
|
|
@ -182,9 +182,17 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SFillOperatorInfo* pInfo = pOperator->info;
|
SFillOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
if (pInfo == NULL || pTaskInfo == NULL) {
|
||||||
|
qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||||
|
if (pResBlock == NULL) {
|
||||||
|
qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
blockDataCleanup(pResBlock);
|
blockDataCleanup(pResBlock);
|
||||||
|
|
||||||
|
|
|
@ -904,9 +904,9 @@ static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo*
|
||||||
|
|
||||||
static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) {
|
static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) {
|
||||||
SHJoinOperatorInfo* pJoin = pOperator->info;
|
SHJoinOperatorInfo* pJoin = pOperator->info;
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx);
|
pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx);
|
||||||
if (NULL == pBlock) {
|
if (NULL == pBlock) {
|
||||||
|
|
|
@ -564,7 +564,6 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
QRY_PARAM_CHECK(pResBlock);
|
QRY_PARAM_CHECK(pResBlock);
|
||||||
|
|
||||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
@ -1178,5 +1177,8 @@ _exit:
|
||||||
if(processByRowFunctionCtx) {
|
if(processByRowFunctionCtx) {
|
||||||
taosArrayDestroy(processByRowFunctionCtx);
|
taosArrayDestroy(processByRowFunctionCtx);
|
||||||
}
|
}
|
||||||
|
if(code) {
|
||||||
|
qError("project apply functions failed at: %s:%d", __func__, lino);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -412,6 +412,13 @@ int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void resetOutputChangedFunc(SFunctionNode *pFunc, const SFunctionNode* pSrcFunc) {
|
||||||
|
if (funcMgtBuiltins[pFunc->funcId].type == FUNCTION_TYPE_LAST_MERGE) {
|
||||||
|
pFunc->node.resType = pSrcFunc->node.resType;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** ppFunc) {
|
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** ppFunc) {
|
||||||
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc);
|
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc);
|
||||||
if (NULL == *ppFunc) {
|
if (NULL == *ppFunc) {
|
||||||
|
@ -430,6 +437,7 @@ int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFu
|
||||||
*ppFunc = NULL;
|
*ppFunc = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
resetOutputChangedFunc(*ppFunc, pSrcFunc);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4259,7 +4259,7 @@ typedef struct SLastRowScanOptSetColDataTypeCxt {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
} SLastRowScanOptSetColDataTypeCxt;
|
} SLastRowScanOptSetColDataTypeCxt;
|
||||||
|
|
||||||
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
static EDealRes lastRowScanOptGetColAndSetDataType(SNode* pNode, void* pContext, bool setType) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
SLastRowScanOptSetColDataTypeCxt* pCxt = pContext;
|
SLastRowScanOptSetColDataTypeCxt* pCxt = pContext;
|
||||||
if (pCxt->doAgg) {
|
if (pCxt->doAgg) {
|
||||||
|
@ -4267,12 +4267,12 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
||||||
if (TSDB_CODE_SUCCESS != pCxt->code) {
|
if (TSDB_CODE_SUCCESS != pCxt->code) {
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes);
|
if (setType) getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes);
|
||||||
} else {
|
} else {
|
||||||
SNode* pCol = NULL;
|
SNode* pCol = NULL;
|
||||||
FOREACH(pCol, pCxt->pLastCols) {
|
FOREACH(pCol, pCxt->pLastCols) {
|
||||||
if (nodesEqualNode(pCol, pNode)) {
|
if (nodesEqualNode(pCol, pNode)) {
|
||||||
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes);
|
if (setType) getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4282,6 +4282,14 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes lastRowScanOptGetLastCols(SNode* pNode, void* pContext) {
|
||||||
|
return lastRowScanOptGetColAndSetDataType(pNode, pContext, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
||||||
|
return lastRowScanOptGetColAndSetDataType(pNode, pContext, true);
|
||||||
|
}
|
||||||
|
|
||||||
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase, int32_t pkBytes) {
|
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase, int32_t pkBytes) {
|
||||||
SNode* pTarget = NULL;
|
SNode* pTarget = NULL;
|
||||||
WHERE_EACH(pTarget, pTargets) {
|
WHERE_EACH(pTarget, pTargets) {
|
||||||
|
@ -4394,7 +4402,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
SNode* pParamNode = NULL;
|
SNode* pParamNode = NULL;
|
||||||
if (FUNCTION_TYPE_LAST == funcType) {
|
if (FUNCTION_TYPE_LAST == funcType) {
|
||||||
(void)nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
(void)nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
||||||
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
|
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptGetLastCols, &cxt);
|
||||||
if (TSDB_CODE_SUCCESS != cxt.code) break;
|
if (TSDB_CODE_SUCCESS != cxt.code) break;
|
||||||
}
|
}
|
||||||
FOREACH(pParamNode, pFunc->pParameterList) {
|
FOREACH(pParamNode, pFunc->pParameterList) {
|
||||||
|
|
|
@ -1294,7 +1294,7 @@ void DestroyRegexCache(){
|
||||||
uInfo("[regex cache] destory regex cache");
|
uInfo("[regex cache] destory regex cache");
|
||||||
bool ret = taosTmrStopA(&sRegexCache.timer);
|
bool ret = taosTmrStopA(&sRegexCache.timer);
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
uError("failed to stop regex cache timer");
|
uInfo("stop regex cache timer may be failed");
|
||||||
}
|
}
|
||||||
taosWLockLatch(&sRegexCache.mutex);
|
taosWLockLatch(&sRegexCache.mutex);
|
||||||
sRegexCache.exit = true;
|
sRegexCache.exit = true;
|
||||||
|
|
|
@ -108,6 +108,8 @@ if $data30 != 12 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print =============== show vnodes on dnode 1
|
print =============== show vnodes on dnode 1
|
||||||
|
print =============== Wait for the synchronization status of vnode and Mnode, heartbeat for one second
|
||||||
|
sleep 1000
|
||||||
sql show vnodes on dnode 1
|
sql show vnodes on dnode 1
|
||||||
if $rows != 9 then
|
if $rows != 9 then
|
||||||
return -1
|
return -1
|
||||||
|
|
|
@ -76,11 +76,11 @@ if $data00 != @-> Data Exchange 2:1 (width=296)@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql explain select count(*), last_row(f1), min(f1),t1 from sta partition by t1;
|
sql explain select count(*), last_row(f1), min(f1),t1 from sta partition by t1;
|
||||||
if $data00 != @-> Aggragate (functions=4 width=28 input_order=desc )@ then
|
if $data00 != @-> Aggregate (functions=4 width=28 input_order=desc )@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql explain select count(*), last_row(f1), min(f1),t1 from sta group by t1;
|
sql explain select count(*), last_row(f1), min(f1),t1 from sta group by t1;
|
||||||
if $data00 != @-> Aggragate (functions=4 width=28 input_order=desc )@ then
|
if $data00 != @-> Aggregate (functions=4 width=28 input_order=desc )@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql explain select distinct count(*), last_row(f1), min(f1) from sta;
|
sql explain select distinct count(*), last_row(f1), min(f1) from sta;
|
||||||
|
|
Loading…
Reference in New Issue