fix(query)[TD-32258]. Fix error handling in operator interface functions

- Contain errors within individual operators, preventing error
  propagation to upper-level operators
- Use longjmp to jump directly to the outermost error handler
  for unresolvable issues, avoiding unnecessary error code
  returns through multiple layers
- Simplify error-handling logic for better maintainability
This commit is contained in:
Jinqing Kuang 2024-09-23 19:54:33 +08:00
parent 5639fd0baf
commit 83f1a590b0
16 changed files with 201 additions and 165 deletions

View File

@ -347,11 +347,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
(*ppRes) = NULL;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
pInfo->indexOfBufferedRes += 1;
@ -414,11 +410,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
pInfo->pRes->info.rows, pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
(*ppRes) = NULL;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
}

View File

@ -938,7 +938,9 @@ _return:
}
if (code) {
qError("%s failed since %s", __func__, tstrerror(code));
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
} else {
seqStableJoinComposeRes(pStbJoin, *pRes);
}

View File

@ -1094,6 +1094,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
(pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
return TSDB_CODE_SUCCESS;
@ -1101,23 +1102,26 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
if (pExchangeInfo->dynamicOp) {
code = addDynamicExchangeSource(pOperator);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
int64_t st = taosGetTimestampUs();
if (!pExchangeInfo->seqLoadData) {
int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = prepareConcurrentlyLoad(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
pExchangeInfo->openedTs = taosGetTimestampUs();
}
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -441,7 +441,7 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
return TSDB_CODE_SUCCESS;
return code;
}
if (pOperator->status == OP_RES_TO_RETURN) {
@ -493,6 +493,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} else {
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
}
@ -1522,8 +1523,9 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock**
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;

View File

@ -993,6 +993,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
SHJoinOperatorInfo* pJoin = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pRes = pJoin->finBlk;
int64_t st = 0;
@ -1003,7 +1004,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
if (pOperator->status == OP_EXEC_DONE) {
pRes->info.rows = 0;
goto _return;
goto _end;
}
if (!pJoin->keyHashBuilt) {
@ -1011,13 +1012,10 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
bool queryDone = false;
code = hJoinBuildHash(pOperator, &queryDone);
if (code) {
pTaskInfo->code = code;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (queryDone) {
goto _return;
goto _end;
}
}
@ -1025,17 +1023,11 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
if (pJoin->ctx.rowRemains) {
code = (*pJoin->joinFp)(pOperator);
if (code) {
pTaskInfo->code = code;
return pTaskInfo->code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
code = doFilter(pRes, pJoin->pFinFilter, NULL);
if (code) {
pTaskInfo->code = code;
return pTaskInfo->code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
if (pRes->info.rows > 0) {
@ -1055,10 +1047,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
pJoin->execInfo.probeBlkRows += pBlock->info.rows;
code = hJoinPrepareStart(pOperator, pBlock);
if (code) {
pTaskInfo->code = code;
return pTaskInfo->code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
continue;
@ -1066,10 +1055,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
code = doFilter(pRes, pJoin->pFinFilter, NULL);
if (code) {
pTaskInfo->code = code;
return pTaskInfo->code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
if (pRes->info.rows > 0) {
@ -1077,11 +1063,15 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
}
}
_return:
_end:
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows > 0) {
*pResBlock = pRes;
}

View File

@ -1731,6 +1731,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc
if (pJoin->pFinFilter != NULL) {
code = doFilter(pBlock, pJoin->pFinFilter, NULL);
if (code) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pJoin->errCode = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
}

View File

@ -492,6 +492,8 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, terrno);
}
@ -501,6 +503,8 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pOperator->status == OP_EXEC_DONE) {
return 0;
@ -509,18 +513,12 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return code;
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock);
if (code) {
pTaskInfo->code = code;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
if ((*pResBlock) != NULL) {
@ -530,6 +528,12 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
setOperatorCompleted(pOperator);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}

View File

@ -883,14 +883,17 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
QRY_PARAM_CHECK(pRes);
int32_t lino = 0;
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
if (TSDB_CODE_SUCCESS != code) {
QUERY_CHECK_CODE(code, lino, _end);
code = pOperator->fpSet.getNextFn(pOperator, pRes);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pOperator->pTaskInfo->code = code;
} else {
code = pOperator->fpSet.getNextFn(pOperator, pRes);
if (code) {
pOperator->pTaskInfo->code = code;
}
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
return code;

View File

@ -270,6 +270,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
SSDataBlock* pRes = pInfo->pRes;
SSDataBlock* pFinalRes = pProjectInfo->pFinalRes;
int32_t code = 0;
int32_t lino = 0;
int64_t st = 0;
int32_t order = pInfo->inputTsOrder;
int32_t scanFlag = 0;
@ -290,9 +291,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if (downstream == NULL) {
code = doGenerateSourceData(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
if (pProjectInfo->outputIgnoreGroup) {
pRes->info.id.groupId = 0;
@ -348,20 +347,14 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
}
code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
if (code) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
pProjectInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
if (status == PROJECT_RETRIEVE_CONTINUE) {
@ -377,11 +370,8 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
pFinalRes->info.version = pRes->info.version;
// continue merge data, ignore the group id
int32_t ret = blockDataMerge(pFinalRes, pRes);
if (ret < 0) {
pTaskInfo->code = code;
return code;
}
code = blockDataMerge(pFinalRes, pRes);
QUERY_CHECK_CODE(code, lino, _end);
if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
continue;
@ -390,10 +380,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
// do apply filter
code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
if (code) {
pTaskInfo->code = code;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
@ -404,10 +391,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
// do apply filter
if (pRes->info.rows > 0) {
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (code) {
pTaskInfo->code = code;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows == 0) {
continue;
@ -436,6 +420,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
}
*pResBlock = (p->info.rows > 0)? p:NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
@ -578,14 +569,15 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
int64_t st = 0;
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return 0;
return code;
}
if (pOperator->cost.openCost == 0) {
@ -637,10 +629,7 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo
}
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (code) {
pTaskInfo->code = code;
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
size_t rows = pInfo->pRes->info.rows;
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
@ -658,6 +647,13 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo
}
*pResBlock = (rows > 0) ? pInfo->pRes : NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}

View File

@ -951,7 +951,8 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
return pTaskInfo->code;
code = pTaskInfo->code;
goto _end;
}
if (pOperator->status == OP_EXEC_DONE) {
@ -996,6 +997,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
@ -1416,6 +1418,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
@ -2944,8 +2947,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pResult && pResult->info.rows > 0) {
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
code = processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("tmqsnap doQueueScan get data utid:%" PRId64 "", pResult->info.id.uid);
if (pResult->info.rows > 0 || code != TSDB_CODE_SUCCESS) {
if (pResult->info.rows > 0) {
(*ppRes) = pResult;
return code;
}
@ -3009,8 +3013,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
@ -3340,9 +3345,7 @@ FETCH_NEXT_BLOCK:
if (pBlock->info.parTbName[0]) {
code =
pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
QUERY_CHECK_CODE(code, lino, _end);
}
// TODO move into scan
@ -3658,8 +3661,9 @@ FETCH_NEXT_BLOCK:
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
@ -3730,6 +3734,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pBlock && pBlock->info.rows > 0) {
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
code = processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
(*ppRes) = pBlock;
return code;
@ -3741,7 +3746,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
QUERY_CHECK_CODE(code, lino, _end);
if (code != 0) {
tDeleteSchemaWrapper(mtInfo.schema);
goto _end;
QUERY_CHECK_CODE(code, lino, _end);
}
STqOffsetVal offset = {0};
if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal
@ -3831,6 +3836,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
@ -4677,6 +4683,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
@ -4684,6 +4691,7 @@ _end:
static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
@ -4699,10 +4707,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
int32_t size = 0;
code = tableListGetSize(pInfo->pTableListInfo, &size);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
@ -4716,11 +4721,11 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
if (code == TSDB_CODE_OUT_OF_MEMORY) {
break;
} else {
if (code != TSDB_CODE_OUT_OF_MEMORY) {
// ignore other error
code = TSDB_CODE_SUCCESS;
}
QUERY_CHECK_CODE(code, lino, _end);
++count;
if (++pInfo->curPos >= size) {
@ -4744,6 +4749,13 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
pOperator->resultInfo.totalRows += pRes->info.rows;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
@ -5429,6 +5441,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} else {
(*ppRes) = pBlock;
}
@ -5945,6 +5958,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} else {
(*ppRes) = pBlock;
}
@ -6460,7 +6474,12 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
}
code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
if ((pRes->info.rows > 0) && (code == 0)) {
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed since %s", __func__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows > 0) {
*ppRes = pRes;
}

View File

@ -349,82 +349,84 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
SSortOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSortSource* pSource =NULL;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
return code;
}
pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = NULL;
int32_t code =
code =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
return terrno;
}
pSource = taosMemoryCalloc(1, sizeof(SSortSource));
QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
pSource->param = pOperator->pDownstream[0];
pSource->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, pSource);
if (code) {
taosMemoryFree(pSource);
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
pSource = NULL;
code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
} else {
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN;
OPTR_SET_OPENED(pOperator);
}
QUERY_CHECK_CODE(code, lino, _end);
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN;
OPTR_SET_OPENED(pOperator);
_end:
if (pSource) {
taosMemoryFree(pSource);
}
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pOperator->status == OP_EXEC_DONE) {
return 0;
return code;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
// multi-group case not handle here
SSDataBlock* pBlock = NULL;
while (1) {
if (tsortIsClosed(pInfo->pSortHandle)) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pTaskInfo->env, code);
QUERY_CHECK_CODE(code, lino, _end);
}
code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo, &pBlock);
if (pBlock == NULL || code != 0) {
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
return code;
}
code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
if (code) {
break;
}
QUERY_CHECK_CODE(code, lino, _end);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
@ -443,6 +445,12 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
}
*pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
@ -692,16 +700,16 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupSortOperatorInfo* pInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pOperator->status == OP_EXEC_DONE) {
return 0;
}
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
@ -714,30 +722,25 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
code = beginSortGroup(pOperator);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
SSDataBlock* pBlock = NULL;
while (pInfo->pCurrSortHandle != NULL) {
if (tsortIsClosed(pInfo->pCurrSortHandle)) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pTaskInfo->env, code);
QUERY_CHECK_CODE(code, lino, _end);
}
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo, &pBlock);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock != NULL) {
pBlock->info.id.groupId = pInfo->currGroupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
@ -748,9 +751,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
(void) finishSortGroup(pOperator);
pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
code = beginSortGroup(pOperator);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
} else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
(void) finishSortGroup(pOperator);
setOperatorCompleted(pOperator);
@ -759,6 +760,12 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}

View File

@ -741,8 +741,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;

View File

@ -731,8 +731,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;

View File

@ -1155,8 +1155,9 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo);

View File

@ -1798,8 +1798,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
@ -3563,8 +3564,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
@ -4067,8 +4069,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
clearFunctionContext(&pOperator->exprSupp);
@ -4803,8 +4806,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
@ -5202,6 +5206,7 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
code = TSDB_CODE_SUCCESS;
break;
}
QUERY_CHECK_CODE(code, lino, _end);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
}
@ -5244,8 +5249,9 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
@ -5779,8 +5785,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;

View File

@ -2049,7 +2049,7 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
if (isTaskKilled(pOperator->pTaskInfo)) {
setOperatorCompleted(pOperator);
(*ppRes) = NULL;
return pTaskInfo->code;
break;
}
blockDataCleanup(pInfo->pRes);
@ -2092,12 +2092,18 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
continue;
}
(*ppRes) = pBlock;
return pTaskInfo->code;
} else {
(*ppRes) = NULL;
return pTaskInfo->code;
}
break;
}
_end:
if (pTaskInfo->code) {
qError("%s failed since %s", __func__, tstrerror(pTaskInfo->code));
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
return pTaskInfo->code;
}
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,