Merge pull request #18464 from taosdata/fix/liao_cov
fix(query): check the error code, if the downstream is an exchange operator.
This commit is contained in:
commit
7eb5ac340b
|
@ -82,12 +82,17 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
|
||||||
|
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
|
|
||||||
static void destroyFillOperatorInfo(void* param);
|
static void destroyFillOperatorInfo(void* param);
|
||||||
static void destroyProjectOperatorInfo(void* param);
|
static void destroyAggOperatorInfo(void* param);
|
||||||
static void destroySortOperatorInfo(void* param);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void destroyAggOperatorInfo(void* param);
|
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||||
|
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||||
static void destroyIntervalOperatorInfo(void* param);
|
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||||
|
const char* pKey);
|
||||||
|
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
|
||||||
|
int32_t status);
|
||||||
|
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||||
|
bool createDummyCol);
|
||||||
|
|
||||||
void setOperatorCompleted(SOperatorInfo* pOperator) {
|
void setOperatorCompleted(SOperatorInfo* pOperator) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -129,9 +134,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
||||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
|
||||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
|
||||||
|
|
||||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
|
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
|
||||||
SFilePage* pData = NULL;
|
SFilePage* pData = NULL;
|
||||||
|
|
||||||
|
@ -362,9 +364,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
|
||||||
bool createDummyCol);
|
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
|
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
|
||||||
SqlFunctionCtx* pCtx = pExprSup->pCtx;
|
SqlFunctionCtx* pCtx = pExprSup->pCtx;
|
||||||
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
|
||||||
|
@ -996,9 +995,6 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
|
|
||||||
int32_t status);
|
|
||||||
|
|
||||||
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
|
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
|
||||||
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
|
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -1564,9 +1560,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
|
||||||
const char* pKey);
|
|
||||||
|
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
|
||||||
// todo add more information about exchange operation
|
// todo add more information about exchange operation
|
||||||
int32_t type = pOperator->operatorType;
|
int32_t type = pOperator->operatorType;
|
||||||
|
@ -1641,11 +1634,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the downstream operator may return with error code, so let's check the code before generating results.
|
||||||
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
|
@ -1684,7 +1682,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
return (rows == 0) ? NULL : pInfo->pRes;
|
return (rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
|
||||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||||
|
|
Loading…
Reference in New Issue