Merge pull request #20272 from taosdata/fix/TD-22927
fix:incorrectly judged that the stream was killed
This commit is contained in:
commit
aeb46416bb
|
@ -881,6 +881,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
|
|||
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
|
||||
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
|
||||
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
|
||||
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -93,6 +93,17 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void clearStreamBlock(SOperatorInfo* pOperator) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 1) {
|
||||
return clearStreamBlock(pOperator->pDownstream[0]);
|
||||
}
|
||||
} else {
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
doClearBufferedBlocks(pInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
|
@ -607,7 +618,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
pTaskInfo->cost.start = taosGetTimestampUs();
|
||||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) {
|
||||
clearStreamBlock(pTaskInfo->pRoot);
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -945,7 +945,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
pInfo->validBlockIndex = 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue