fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-27 11:02:35 +08:00
parent e3415199f5
commit 8876b2602d
3 changed files with 16 additions and 6 deletions

View File

@ -1244,7 +1244,11 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
}
*pResBlock = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
return code;
if (*pResBlock == NULL && terrno != 0) {
return terrno;
} else {
return code;
}
}
bool compareVal(const char* v, const SStateKeys* pKey) {

View File

@ -430,7 +430,11 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
pOperator->pTaskInfo->code = doProjectOperation(pOperator, &pRes);
int32_t code = doProjectOperation(pOperator, &pRes);
if (code && pOperator->pTaskInfo->code == 0) {
pOperator->pTaskInfo->code = code;
}
return pRes;
}

View File

@ -1874,7 +1874,6 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// build message and send to mnode to fetch the content of system tables.
int32_t code = TSDB_CODE_SUCCESS;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
char dbName[TSDB_DB_NAME_LEN] = {0};
@ -1883,7 +1882,7 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
if (isTaskKilled(pOperator->pTaskInfo)) {
setOperatorCompleted(pOperator);
(*ppRes) = NULL;
return code;
return pTaskInfo->code;
}
blockDataCleanup(pInfo->pRes);
@ -1926,10 +1925,10 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
continue;
}
(*ppRes) = pBlock;
return code;
return pTaskInfo->code;
} else {
(*ppRes) = NULL;
return code;
return pTaskInfo->code;
}
}
}
@ -1937,6 +1936,9 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doSysTableScanNext(pOperator, &pRes);
if (code) {
terrno = code;
}
return pRes;
}