From 7137bf9d3c1987d2f094b103692d495a46b1c0e4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 21 Mar 2022 18:31:31 +0800 Subject: [PATCH] [td-13039] fix bug. --- source/libs/executor/inc/executorimpl.h | 9 +++--- source/libs/executor/src/executorimpl.c | 42 ++++++++++++------------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc3313e23e..e995365b69 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -307,11 +307,10 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_IN_EXECUTING = 1, - OP_RES_TO_RETURN = 2, - OP_EXEC_DONE = 3, - OP_OPENED = 4, - OP_NOT_OPENED = 5 + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, + OP_RES_TO_RETURN = 0x5, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index aa6681bed5..cb673bda7f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5429,10 +5429,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; + pOperator->getNextFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -5453,7 +5453,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->pRuntimeEnv = pRuntimeEnv; @@ -5478,7 +5478,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt pOperator->name = "TableBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->getNextFn = doBlockInfoScan; @@ -5511,7 +5511,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; pOperator->getNextFn = doStreamBlockScan; @@ -5758,10 +5758,10 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->getNextFn = doSysTableScan; + pOperator->getNextFn = doSysTableScan; pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; @@ -6242,7 +6242,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = num; pOperator->pExpr = pExprInfo; @@ -6337,7 +6337,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->name = "Order"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -7334,7 +7334,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; @@ -7476,7 +7476,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI pOperator->name = "MultiTableAggregate"; // pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; @@ -7514,7 +7514,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = num; @@ -7661,7 +7661,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->name = "AllTimeIntervalAggOperator"; // pOperator->operatorType = OP_AllTimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -7685,7 +7685,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->name = "StateWindowOperator"; // pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -7750,7 +7750,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->name = "MultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -7774,7 +7774,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->name = "AllMultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -7806,7 +7806,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -7845,7 +7845,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->name = "FillOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Fill; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -7894,7 +7894,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "SLimitOperator"; // pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->exec = doSLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; @@ -8050,7 +8050,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo pOperator->name = "SeqTableTagScan"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->getNextFn = doTagScan; pOperator->pExpr = pExpr; @@ -8187,7 +8187,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Distinct; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput;