From 8876b2602d0a24b2bfe3ca12fe4ca9b4d291c7bd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jul 2024 11:02:35 +0800 Subject: [PATCH] fix(stream): check return value. --- source/libs/executor/src/executorInt.c | 6 +++++- source/libs/executor/src/projectoperator.c | 6 +++++- source/libs/executor/src/sysscanoperator.c | 10 ++++++---- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index db0132c12a..d56b288129 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1244,7 +1244,11 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera } *pResBlock = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); - return code; + if (*pResBlock == NULL && terrno != 0) { + return terrno; + } else { + return code; + } } bool compareVal(const char* v, const SStateKeys* pKey) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index e2dd842973..cbc3d77faf 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -430,7 +430,11 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - pOperator->pTaskInfo->code = doProjectOperation(pOperator, &pRes); + int32_t code = doProjectOperation(pOperator, &pRes); + if (code && pOperator->pTaskInfo->code == 0) { + pOperator->pTaskInfo->code = code; + } + return pRes; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index c6bd381769..dc3c8f8070 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1874,7 +1874,6 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) { static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { // build message and send to mnode to fetch the content of system tables. - int32_t code = TSDB_CODE_SUCCESS; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSysTableScanInfo* pInfo = pOperator->info; char dbName[TSDB_DB_NAME_LEN] = {0}; @@ -1883,7 +1882,7 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) if (isTaskKilled(pOperator->pTaskInfo)) { setOperatorCompleted(pOperator); (*ppRes) = NULL; - return code; + return pTaskInfo->code; } blockDataCleanup(pInfo->pRes); @@ -1926,10 +1925,10 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) continue; } (*ppRes) = pBlock; - return code; + return pTaskInfo->code; } else { (*ppRes) = NULL; - return code; + return pTaskInfo->code; } } } @@ -1937,6 +1936,9 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; int32_t code = doSysTableScanNext(pOperator, &pRes); + if (code) { + terrno = code; + } return pRes; }