From d8fb5914353dd007f7a9c85443e4b3af9f1a1422 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 6 Mar 2023 09:46:12 +0800 Subject: [PATCH] fix:incorrectly judged that the stream was killed --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executor.c | 14 +++++++++++++- source/libs/executor/src/scanoperator.c | 2 +- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index be79054c1b..0b55eb4a45 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -881,6 +881,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); +void doClearBufferedBlocks(SStreamScanInfo* pInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c2bf001c86..d8e7c25bbb 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -93,6 +93,17 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) { return 0; } +static void clearStreamBlock(SOperatorInfo* pOperator) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->numOfDownstream == 1) { + return clearStreamBlock(pOperator->pDownstream[0]); + } + } else { + SStreamScanInfo* pInfo = pOperator->info; + doClearBufferedBlocks(pInfo); + } +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -607,7 +618,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { pTaskInfo->cost.start = taosGetTimestampUs(); } - if (isTaskKilled(pTaskInfo)) { + if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) { + clearStreamBlock(pTaskInfo->pRoot); atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7961d5518a..38822a4565 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -945,7 +945,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* return pOperator; } -static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { +FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); pInfo->validBlockIndex = 0; }