fix(query):check return code

This commit is contained in:
Haojun Liao 2024-08-06 17:32:20 +08:00
parent 59d190508d
commit e0fc36ce97
8 changed files with 45 additions and 19 deletions

View File

@ -24,10 +24,10 @@
#include "tpagedbuf.h"
#include "tsimplehash.h"
#define T_LONG_JMP(_obj, _c) \
do { \
ASSERT((_obj) != 0); \
longjmp((_obj), (_c)); \
#define T_LONG_JMP(_obj, _c) \
do { \
ASSERT((_obj)->__jmpbuf[0] != 0); \
longjmp((_obj), (_c)); \
} while (0)
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \

View File

@ -180,11 +180,11 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pOperator->blocking && pAggInfo->hasValidBlock) return false;
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (pOperator->blocking && pAggInfo->hasValidBlock) {
return false;
}
SExprSupp* pSup = &pOperator->exprSupp;
int64_t st = taosGetTimestampUs();
int32_t order = pAggInfo->binfo.inputTsOrder;
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
@ -458,7 +458,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
if (pResultRow == NULL || pResultRow->pageId == -1) {
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);

View File

@ -838,6 +838,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
atomic_store_64(&pTaskInfo->owner, 0);
memset(pTaskInfo->env->__jmpbuf, 0, tListLen(pTaskInfo->env->__jmpbuf));
return pTaskInfo->code;
}

View File

@ -158,8 +158,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (isIntervalQuery) {
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
if (pResult == NULL) {
pTaskInfo->code = terrno;
return NULL;
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
@ -171,7 +172,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// todo
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
pTaskInfo->code = terrno;
return NULL;
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
@ -184,7 +186,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
pTaskInfo->code = terrno;
return NULL;
}
releaseBufPage(pResultBuf, pPage);
}
@ -193,7 +196,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (pResult == NULL) {
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
pTaskInfo->code = terrno;
return NULL;
}
// add a new result set for a new group
@ -202,7 +206,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
sizeof(SResultRowPosition));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
pTaskInfo->code = code;
return NULL;
}
}
@ -212,7 +217,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
return NULL;
}
return pResult;

View File

@ -1258,6 +1258,9 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo,
false, pAggSup, false);
if (pResultRow == NULL || pTaskInfo->code != 0) {
return pTaskInfo->code;
}
return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
}

View File

@ -703,6 +703,9 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
int64_t groupId = 0;
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
pTaskInfo, false, pSup, true);
if (pRow == NULL || pTaskInfo->code != 0) {
return pTaskInfo->code;
}
for (int32_t i = 0; i < numOfExprs; ++i) {
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);

View File

@ -130,7 +130,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
pGroupIdCalc->lastKeysLen = 0;
pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
if (!pGroupIdCalc->keyBuf) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
}
}
}
@ -370,8 +370,13 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
if (ps == NULL) {
return terrno;
}
ps->param = pOperator->pDownstream[0];
ps->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, ps);
if (code) {
taosMemoryFree(ps);
@ -464,6 +469,9 @@ void destroySortOperatorInfo(void* param) {
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
if (pInfo == NULL) {
return terrno;
}
SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
@ -638,6 +646,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
if (ps == NULL || param == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo;
ps->param = param;

View File

@ -75,9 +75,9 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
if (pResultRow == NULL) {
if (pResultRow == NULL || pTaskInfo->code != 0) {
*pResult = NULL;
return TSDB_CODE_SUCCESS;
return pTaskInfo->code;
}
// set time window for current result