From c118df0ec4425e603883e11e1b61ded6328db3c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Aug 2024 17:04:44 +0800 Subject: [PATCH] fix(query): fix memory leak. --- source/libs/executor/inc/operator.h | 5 +- source/libs/executor/src/aggregateoperator.c | 7 +- source/libs/executor/src/cachescanoperator.c | 12 +- .../libs/executor/src/countwindowoperator.c | 11 +- .../libs/executor/src/dynqueryctrloperator.c | 25 ++-- .../libs/executor/src/eventwindowoperator.c | 14 +- source/libs/executor/src/exchangeoperator.c | 8 +- source/libs/executor/src/executor.c | 28 +++- source/libs/executor/src/executorInt.c | 8 +- source/libs/executor/src/filloperator.c | 8 +- source/libs/executor/src/groupcacheoperator.c | 29 ++-- source/libs/executor/src/groupoperator.c | 24 +-- source/libs/executor/src/hashjoinoperator.c | 45 +++--- source/libs/executor/src/mergejoinoperator.c | 9 +- source/libs/executor/src/mergeoperator.c | 54 +++---- source/libs/executor/src/operator.c | 7 +- source/libs/executor/src/projectoperator.c | 14 +- source/libs/executor/src/scanoperator.c | 140 ++++++++++-------- source/libs/executor/src/sortoperator.c | 48 +++--- .../executor/src/streamcountwindowoperator.c | 14 +- .../executor/src/streameventwindowoperator.c | 15 +- source/libs/executor/src/streamfilloperator.c | 8 +- .../executor/src/streamtimewindowoperator.c | 57 +++---- source/libs/executor/src/sysscanoperator.c | 13 +- source/libs/executor/src/timesliceoperator.c | 8 +- source/libs/executor/src/timewindowoperator.c | 40 +---- 26 files changed, 286 insertions(+), 365 deletions(-) diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index af8532b5b3..7d09be3300 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -27,11 +27,8 @@ typedef struct SOperatorCostInfo { struct SOperatorInfo; -//typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); -//typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); - typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); -typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); +typedef int32_t (*__optr_fn_t)(struct SOperatorInfo* pOptr, SSDataBlock** pResBlock); typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index d7b60b2bcd..b5a3f2f484 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -56,9 +56,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); -static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); -static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator); - +static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); +static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); @@ -123,7 +122,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResult, NULL, destroyAggOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index a219a5b5f0..c26d193a06 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -50,7 +50,7 @@ typedef struct SCacheRowsScanInfo { SColumnInfo pkCol; } SCacheRowsScanInfo; -static SSDataBlock* doScanCache(SOperatorInfo* pOperator); +static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); static void destroyCacheScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds, int32_t** pDstSlotIds); @@ -235,7 +235,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; @@ -259,7 +259,7 @@ _error: return code; } -int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pOperator->status == OP_EXEC_DONE) { @@ -445,12 +445,6 @@ _end: return code; } -static SSDataBlock* doScanCache(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doScanCacheNext(pOperator, &pRes); - return pRes; -} - void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index ba07e666a0..1d72b0bb58 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -64,6 +64,8 @@ void destroyCountWindowOperatorInfo(void* param) { taosMemoryFreeClear(param); } +static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); + static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; } static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) { @@ -227,7 +229,6 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** SExprSupp* pExprSup = &pOperator->exprSupp; int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; - SOperatorInfo* downstream = pOperator->pDownstream[0]; blockDataCleanup(pRes); @@ -292,12 +293,6 @@ _end: return code; } -static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = countWindowAggregateNext(pOperator, &pRes); - return pRes; -} - int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -374,7 +369,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregate, NULL, destroyCountWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregateNext, NULL, destroyCountWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 6268fa0268..f0efa76639 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -878,20 +878,18 @@ static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** return TSDB_CODE_SUCCESS; } -static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { +static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { pBlock->info.id.blockId = pStbJoin->outputBlkId; - return pBlock; } - -SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { - int32_t code = TSDB_CODE_SUCCESS; +int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) { + int32_t code = TSDB_CODE_SUCCESS; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - SSDataBlock* pRes = NULL; + QRY_OPTR_CHECK(pRes); if (pOperator->status == OP_EXEC_DONE) { - return pRes; + return code; } int64_t st = 0; @@ -907,25 +905,24 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { } } - QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, &pRes)); + QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes)); if (pRes) { goto _return; } - - QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, &pRes)); + + QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes)); _return: - if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } if (code) { pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } else { + seqStableJoinComposeRes(pStbJoin, *pRes); } - - return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL; + return code; } int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 7218291f8c..0b5fd074b0 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -39,9 +39,9 @@ typedef struct SEventWindowOperatorInfo { SSDataBlock* pPreDataBlock; } SEventWindowOperatorInfo; -static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); -static void destroyEWindowOperatorInfo(void* param); -static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); +static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes); +static void destroyEWindowOperatorInfo(void* param); +static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); // todo : move to util static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, @@ -131,7 +131,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregateNext, NULL, destroyEWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -250,12 +250,6 @@ _end: return code; } -static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = eventWindowAggregateNext(pOperator, &pRes); - return pRes; -} - static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, SExprSupp* pExprSup, SAggSupporter* pAggSup) { if (*pResult == NULL) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index f354c9a1cc..507fc5b154 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -305,12 +305,6 @@ _end: return code; } -static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = loadRemoteDataNext(pOperator, &pRes); - return pRes; -} - static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL) { @@ -447,7 +441,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); - pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f908ef5984..3dfa4ec757 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -692,9 +692,11 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo pTaskInfo->paramSet = true; code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes); } else { - pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); } + QUERY_CHECK_CODE(code, lino, _end); + if (pRes == NULL) { st = taosGetTimestampUs(); } @@ -718,6 +720,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } else { void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + p = *(SSDataBlock**)tmp; code = copyDataBlock(p, pRes); QUERY_CHECK_CODE(code, lino, _end); @@ -735,8 +738,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo break; } - pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); + QUERY_CHECK_CODE(code, lino, _end); } + if (pTaskInfo->pSubplan->dynamicRowThreshold) { pTaskInfo->pSubplan->rowsThreshold -= current; } @@ -750,7 +755,6 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } _end: - (void)cleanUpUdfs(); uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; @@ -758,6 +762,11 @@ _end: GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0); atomic_store_64(&pTaskInfo->owner, 0); + if (code) { + pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return pTaskInfo->code; } @@ -778,9 +787,10 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + int32_t lino = 0; + int64_t curOwner = 0; *pRes = NULL; - int64_t curOwner = 0; // todo extract method taosRLockLatch(&pTaskInfo->lock); @@ -822,7 +832,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { int64_t st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes); + if (code) { + pTaskInfo->code = code; + qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; @@ -830,8 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { *useconds = pTaskInfo->cost.elapsedTime; } - int32_t tmpRes = cleanUpUdfs(); - qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); + (void) cleanUpUdfs(); int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 966bda382b..42e7fbee1f 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1291,12 +1291,8 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera return code; } - *pResBlock = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); - if (*pResBlock == NULL && terrno != 0) { - return terrno; - } else { - return code; - } + code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock); + return code; } bool compareVal(const char* v, const SStateKeys* pKey) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 63f7667890..dfd57ab45b 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -318,12 +318,6 @@ static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } -static SSDataBlock* doFill(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doFillNext(pOperator, &pRes); - return pRes; -} - void destroyFillOperatorInfo(void* param) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); @@ -513,7 +507,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index b899524988..9b213487ed 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -692,39 +692,48 @@ _return: return code; } -static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - SOperatorParam* pDownstreamParam = NULL; - SSDataBlock* pBlock = NULL; +static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, + SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SOperatorParam* pDownstreamParam = NULL; + SSDataBlock* pBlock = NULL; SGroupCacheOperatorInfo* pGCache = pOperator->info; + code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); if (code) { return code; } + SOperatorInfo* pDownstream = pOperator->pDownstream[downstreamIdx]; if (pDownstreamParam) { - code = pOperator->pDownstream[downstreamIdx]->fpSet.getNextExtFn(pOperator->pDownstream[downstreamIdx], pDownstreamParam, &pBlock); + code = pDownstream->fpSet.getNextExtFn(pDownstream, pDownstreamParam, &pBlock); } else { - pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]); + code = pDownstream->fpSet.getNextFn(pDownstream, &pBlock); + } + + if (code) { + qError("failed to get block from downstream, code:%s %s", tstrerror(code), GET_TASKID(pOperator->pTaskInfo)); + return code; } if (pBlock) { - qDebug("%s blk retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); - + qDebug("%s res block retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); + pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock); if (code) { return code; } - if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) { + + if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, + &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) { QRY_ERR_RET(terrno); } } } *ppRes = pBlock; - return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3f203e7a95..f0a4914c24 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -531,12 +531,6 @@ _end: return code; } -static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = hashGroupbyAggregateNext(pOperator, &pRes); - return pRes; -} - int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -600,7 +594,7 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); QUERY_CHECK_CODE(code, lino, _error); @@ -1103,12 +1097,6 @@ _end: return code; } -static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = hashPartitionNext(pOperator, &pRes); - return pRes; -} - static void destroyPartitionOperatorInfo(void* param) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1232,7 +1220,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartitionNext, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -1583,12 +1571,6 @@ _end: return code; } -static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamHashPartitionNext(pOperator, &pRes); - return pRes; -} - static void destroyStreamPartitionOperatorInfo(void* param) { SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1785,7 +1767,7 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, + createOperatorFpSet(optrDummyOpenFn, doStreamHashPartitionNext, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 15819cd94a..55620defba 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -989,13 +989,14 @@ void hJoinSetDone(struct SOperatorInfo* pOperator) { qDebug("hash Join done"); } -static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { +static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SHJoinOperatorInfo* pJoin = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pRes = pJoin->finBlk; - int64_t st = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pRes = pJoin->finBlk; + int64_t st = 0; + QRY_OPTR_CHECK(pResBlock); if (pOperator->cost.openCost == 0) { st = taosGetTimestampUs(); } @@ -1004,7 +1005,7 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { pRes->info.rows = 0; goto _return; } - + if (!pJoin->keyHashBuilt) { pJoin->keyHashBuilt = true; @@ -1012,7 +1013,7 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { code = hJoinBuildHash(pOperator, &queryDone); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return code; } if (queryDone) { @@ -1026,18 +1027,20 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { code = (*pJoin->joinFp)(pOperator); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } - + if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { code = doFilter(pRes, pJoin->pFinFilter, NULL); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } } + if (pRes->info.rows > 0) { - return pRes; + *pResBlock = pRes; + return code; } } @@ -1050,39 +1053,41 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { pJoin->execInfo.probeBlkNum++; pJoin->execInfo.probeBlkRows += pBlock->info.rows; - + code = hJoinPrepareStart(pOperator, pBlock); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) { continue; } - + if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { code = doFilter(pRes, pJoin->pFinFilter, NULL); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } } - + if (pRes->info.rows > 0) { break; } } _return: - if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - - return (pRes->info.rows > 0) ? pRes : NULL; -} + if (pRes->info.rows > 0) { + *pResBlock = pRes; + } + + return code; +} static void destroyHashJoinOperator(void* param) { SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index fced682312..808aac66c2 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1691,13 +1691,13 @@ void mJoinResetOperator(struct SOperatorInfo* pOperator) { pOperator->status = OP_OPENED; } -SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { +int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SMJoinOperatorInfo* pJoin = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo)); - return NULL; + return code; } else { mJoinResetOperator(pOperator); qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo)); @@ -1739,7 +1739,10 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { } pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0; - return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL; + if (pBlock && pBlock->info.rows > 0) { + *pResBlock = pBlock; + } + return code; } void destroyGrpArray(void* ppArray) { diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 3f85324f57..ae23b3c8d7 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -57,18 +57,16 @@ typedef struct SMultiwayMergeOperatorInfo { bool inputWithGroupId; } SMultiwayMergeOperatorInfo; -static SSDataBlock* doSortMerge1(SOperatorInfo* pOperator); -static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator); +static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); +static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator); -static SSDataBlock* doColsMerge1(SOperatorInfo* pOperator); static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); +static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock); int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - *ppBlock = pOperator->fpSet.getNextFn(pOperator); - return TSDB_CODE_SUCCESS; + int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); + return code; } int32_t openSortMergeOperator(SOperatorInfo* pOperator) { @@ -185,12 +183,6 @@ static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHand return code; } -SSDataBlock* doSortMerge1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doSortMerge(pOperator, &pBlock); - return pBlock; -} - int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; @@ -343,12 +335,6 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) { return 0; } -SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doNonSortMerge(pOperator, &pBlock); - return pBlock; -} - int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); @@ -432,12 +418,6 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock return code; } -SSDataBlock* doColsMerge1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doColsMerge(pOperator, &pBlock); - return pBlock; -} - int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); @@ -486,9 +466,9 @@ int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = { {0}, - {._openFn = openSortMergeOperator, .getNextFn = doSortMerge1, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo}, - {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge1, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo}, - {._openFn = openColsMergeOperator, .getNextFn = doColsMerge1, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo}, + {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo}, + {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo}, + {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo}, }; @@ -518,9 +498,11 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { return code; } -SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { +int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); + if (pOperator->status == OP_EXEC_DONE) { - return NULL; + return 0; } SSDataBlock* pBlock = NULL; @@ -529,19 +511,25 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + pTaskInfo->code = code; + return code; } if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) { - pBlock = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator); + code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, &pBlock); + if (code) { + pTaskInfo->code = code; + return code; + } } + if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { setOperatorCompleted(pOperator); } - return pBlock; + return code; } void destroyMultiwayMergeOperatorInfo(void* param) { diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 0ff6870405..fc52b97388 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -862,10 +862,13 @@ int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } else { + code = pOperator->fpSet.getNextFn(pOperator, pRes); + if (code) { + pOperator->pTaskInfo->code = code; + } } - *pRes = pOperator->fpSet.getNextFn(pOperator); return code; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index bf523af918..7e06c083ed 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -165,7 +165,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation1, NULL, destroyProjectOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState); @@ -445,16 +445,6 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { return code; } -SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doProjectOperation(pOperator, &pRes); - if (code && pOperator->pTaskInfo->code == 0) { - pOperator->pTaskInfo->code = code; - } - - return pRes; -} - int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -526,7 +516,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction1, NULL, destroyIndefinitOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 310992efed..18ea98808f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -68,6 +68,7 @@ typedef struct STableCountScanOperatorInfo { } STableCountScanOperatorInfo; static bool processBlockWithProbability(const SSampleExecInfo* pInfo); +static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { #if 0 @@ -1001,30 +1002,30 @@ _end: return code; } -static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableScanImplNext(pOperator, &pRes); - return pRes; -} - -static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { +static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + QRY_OPTR_CHECK(pBlock); + // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) { - return NULL; + return code; } // do the ascending order traverse in the first place. while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { - SSDataBlock* p = doTableScanImpl(pOperator); + SSDataBlock* p = NULL; + code = doTableScanImplNext(pOperator, &p); + QUERY_CHECK_CODE(code, lino, _end); + if (p != NULL) { markGroupProcessed(pTableScanInfo, p->info.id.groupId); - return p; + *pBlock = p; + return code; } pTableScanInfo->scanTimes += 1; @@ -1052,10 +1053,14 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { } while (pTableScanInfo->scanTimes < total) { - SSDataBlock* p = doTableScanImpl(pOperator); + SSDataBlock* p = NULL; + code = doTableScanImplNext(pOperator, &p); + QUERY_CHECK_CODE(code, lino, _end); + if (p != NULL) { markGroupProcessed(pTableScanInfo, p->info.id.groupId); - return p; + *pBlock = p; + return code; } pTableScanInfo->scanTimes += 1; @@ -1079,8 +1084,13 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED; STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex); - if (NULL == pStart) return NULL; - return getBlockForEmptyTable(pOperator, pStart); + + if (NULL == pStart) { + return code; + } + + *pBlock = getBlockForEmptyTable(pOperator, pStart); + return code; } } else { // group by tag + no sort int32_t numOfTables = 0; @@ -1096,7 +1106,9 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { STableKeyInfo info = {.uid = *(uint64_t*)pIte, .groupId = *pGroupId}; taosHashCancelIterate(pTableListInfo->remainGroups, pIte); markGroupProcessed(pTableScanInfo, *pGroupId); - return getBlockForEmptyTable(pOperator, &info); + *pBlock = getBlockForEmptyTable(pOperator, &info); + + return code; } } } @@ -1107,9 +1119,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return NULL; + + return code; } static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { @@ -1169,13 +1181,16 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { return code; } -static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { +static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResult) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfTables = 0; + + QRY_OPTR_CHECK(pResult); + code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables); QUERY_CHECK_CODE(code, lino, _end); @@ -1185,7 +1200,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { taosArrayClear(pInfo->base.pTableListInfo->pTableList); taosHashClear(pInfo->base.pTableListInfo->map); } - return NULL; + return code; } // reset value for the next group data output @@ -1204,21 +1219,22 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); pInfo->scanTimes = 0; - SSDataBlock* result = doGroupedTableScan(pOperator); - if (result != NULL) { + code = doGroupedTableScan(pOperator, pResult); + QUERY_CHECK_CODE(code, lino, _end); + + if (*pResult != NULL) { if (pOperator->dynamicTask) { - result->info.id.groupId = result->info.id.uid; + (*pResult)->info.id.groupId = (*pResult)->info.id.uid; } - return result; + return code; } _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return NULL; + return code; } static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { @@ -1257,7 +1273,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { } } - result = doGroupedTableScan(pOperator); + result = NULL; + code = doGroupedTableScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result != NULL) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; @@ -1266,7 +1285,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { } while (true) { - result = startNextGroupScan(pOperator); + code = startNextGroupScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result || pOperator->status == OP_EXEC_DONE) { return result; } @@ -1287,6 +1308,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + QRY_OPTR_CHECK(ppRes); if (pOperator->pOperatorGetParam) { pOperator->dynamicTask = true; @@ -1299,8 +1321,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pInfo->currentGroupId = -1; pOperator->status = OP_OPENED; SSDataBlock* result = NULL; + while (true) { - result = startNextGroupScan(pOperator); + code = startNextGroupScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result || pOperator->status == OP_EXEC_DONE) { (*ppRes) = result; return code; @@ -1316,7 +1341,10 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pInfo->countState = TABLE_COUNT_STATE_END; while (1) { - SSDataBlock* result = doGroupedTableScan(pOperator); + SSDataBlock* result = NULL; + code = doGroupedTableScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) { (*ppRes) = result; return code; @@ -1340,6 +1368,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { (*ppRes) = NULL; return code; } + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); if (!tmp) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); @@ -1368,16 +1397,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - (*ppRes) = NULL; - return code; -} -static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableScanNext(pOperator, &pRes); - return pRes; + return code; } static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { @@ -1497,7 +1519,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pInfo->filesetDelimited = pTableScanNode->filesetDelimited; taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanNext, NULL, destroyTableScanOperatorInfo, optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL); // for non-blocking operator, the open cost is always 0 @@ -1535,7 +1557,7 @@ int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskIn setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImplNext, NULL, NULL, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -1832,18 +1854,23 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex, SSDataBlock** ppRes) { qDebug("do stream range scan. windows index:%d", *pRowIndex); + int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; bool prepareRes = true; while (1) { SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pTableScanOp); + code = doTableScanNext(pInfo->pTableScanOp, &pResult); + QUERY_CHECK_CODE(code, lino, _end); + if (!pResult) { prepareRangeScan(pInfo, pSDB, pRowIndex, &prepareRes); // scan next window data - pResult = doTableScan(pInfo->pTableScanOp); + code = doTableScanNext(pInfo->pTableScanOp, &pResult); + QUERY_CHECK_CODE(code, lino, _end); } + if (!pResult) { if (prepareRes) { continue; @@ -2891,7 +2918,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { while (1) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + SSDataBlock* pResult = NULL; + code = doTableScanNext(pInfo->pTableScanOp, &pResult); + QUERY_CHECK_CODE(code, lino, _end); if (pResult && pResult->info.rows > 0) { bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); @@ -3231,7 +3260,9 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { break; } - pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp); + code = doTableScanNext(pInfo->pTableScanOp, &pInfo->pRecoverRes); + QUERY_CHECK_CODE(code, lino, _end); + if (pInfo->pRecoverRes != NULL) { code = calBlockTbName(pInfo, pInfo->pRecoverRes, 0); QUERY_CHECK_CODE(code, lino, _end); @@ -3784,12 +3815,6 @@ _end: return code; } -static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doRawScanNext(pOperator, &pRes); - return pRes; -} - static void destroyRawScanOperatorInfo(void* param) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader); @@ -3828,7 +3853,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, + pOperator->fpSet = createOperatorFpSet(NULL, doRawScanNext, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -4207,7 +4232,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; + __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScanNext : doQueueScanNext; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState); @@ -4779,7 +4804,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p } } //TODO wjm check pInfo->filterCtx.code - __optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry; + __optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdxNext : doTagScanFromMetaEntryNext; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; @@ -6044,7 +6069,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pOperator->exprSupp.numOfExprs = numOfCols; pOperator->fpSet = createOperatorFpSet( - optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL, + optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTablesNext : doTableMergeScanNext, NULL, destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; @@ -6070,7 +6095,6 @@ _error: // ==================================================================================================================== // TableCountScanOperator -static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator); static void destoryTableCountScanOperator(void* param); static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI); @@ -6212,7 +6236,7 @@ int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountSca setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScanNext, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -6381,12 +6405,6 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe return code; } -static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableCountScanNext(pOperator, &pRes); - return pRes; -} - static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index d9b1e40510..fb4b61c7a8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -43,11 +43,9 @@ typedef struct SSortOperatorInfo { SSortOpGroupIdCalc* pGroupIdCalc; } SSortOperatorInfo; -static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doSort1(SOperatorInfo* pOperator); -static int32_t doOpenSortOperator(SOperatorInfo* pOperator); -static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -static SSDataBlock* doGroupSort1(SOperatorInfo* pOperator); +static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock); +static int32_t doOpenSortOperator(SOperatorInfo* pOperator); +static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static void destroySortOperatorInfo(void* param); @@ -149,7 +147,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort1, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -342,8 +340,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - *ppBlock = pOperator->fpSet.getNextFn(pOperator); - return TSDB_CODE_SUCCESS; + return pOperator->fpSet.getNextFn(pOperator, ppBlock); } // todo refactor: merged with fetch fp @@ -404,12 +401,6 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { return code; } -SSDataBlock* doSort1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doSort(pOperator, &pBlock); - return pBlock; -} - int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); if (pOperator->status == OP_EXEC_DONE) { @@ -613,17 +604,23 @@ typedef struct SGroupSortSourceParam { } SGroupSortSourceParam; int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { - *ppBlock = NULL; - + int32_t code = 0; + int32_t lino = 0; SGroupSortSourceParam* source = param; SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; + SSDataBlock* block = NULL; + + QRY_OPTR_CHECK(ppBlock); + if (grpSortOpInfo->prefetchedSortInput) { - SSDataBlock* block = grpSortOpInfo->prefetchedSortInput; + block = grpSortOpInfo->prefetchedSortInput; grpSortOpInfo->prefetchedSortInput = NULL; *ppBlock = block; } else { SOperatorInfo* childOp = source->childOpInfo; - SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + code = childOp->fpSet.getNextFn(childOp, &block); + QUERY_CHECK_CODE(code, lino, _end); + if (block != NULL) { if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; @@ -637,7 +634,12 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { } } - return TSDB_CODE_SUCCESS; + return code; +_end: + if (code != 0) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t beginSortGroup(SOperatorInfo* pOperator) { @@ -695,12 +697,6 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* doGroupSort1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doGroupSort(pOperator, &pBlock); - return pBlock; -} - int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -836,7 +832,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort1, NULL, destroyGroupSortOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index b4f8d6837a..54ad12cff0 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -658,10 +658,14 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); setStreamOperatorState(&pInfo->basic, pBlock->info.type); @@ -745,12 +749,6 @@ _end: return code; } -static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamCountAggNext(pOperator, &pRes); - return pRes; -} - void streamCountReleaseState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -908,7 +906,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); taosMemoryFree(buff); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 38a813bf33..a8e14bce68 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -629,10 +629,15 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); setStreamOperatorState(&pInfo->basic, pBlock->info.type); @@ -734,12 +739,6 @@ _end: return code; } -static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamEventAggNext(pOperator, &pRes); - return pRes; -} - void streamEventReleaseState(SOperatorInfo* pOperator) { SStreamEventAggOperatorInfo* pInfo = pOperator->info; int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); @@ -966,7 +965,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAggNext, NULL, destroyStreamEventOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState); code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index fac1cf48c7..1f808f17d9 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1164,12 +1164,6 @@ _end: return code; } -static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamFillNext(pOperator, &pRes); - return pRes; -} - static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; for (int i = 0; i < pFillSup->numOfAllCols; i++) { @@ -1444,7 +1438,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 08caf71eca..1996f4c34c 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -60,7 +60,7 @@ typedef struct SPullWindowInfo { STimeWindow calWin; } SPullWindowInfo; -static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator); +static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index); @@ -1603,7 +1603,10 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc return code; } - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), @@ -1611,6 +1614,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc pInfo->numOfDatapack = 0; break; } + pInfo->numOfDatapack++; printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); setStreamOperatorState(&pInfo->basic, pBlock->info.type); @@ -1765,12 +1769,6 @@ _end: return code; } -static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes); - return pRes; -} - int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { if (pWinPhyNode->deleteMark <= 0) { return DEAULT_DELETE_MARK; @@ -1997,10 +1995,10 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pOperator->info = pInfo; if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { - pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); @@ -3397,7 +3395,10 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } @@ -3840,7 +3841,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode QUERY_CHECK_CODE(code, lino, _error); } } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); @@ -3966,7 +3967,10 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; break; @@ -4076,7 +4080,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, + createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAggNext, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); } @@ -4693,7 +4697,10 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } @@ -5014,7 +5021,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAggNext, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState); code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, @@ -5108,7 +5115,10 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), pInfo->numOfDatapack); @@ -5222,12 +5232,6 @@ _end: return code; } -static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamIntervalAggNext(pOperator, &pRes); - return pRes; -} - int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -5338,7 +5342,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); @@ -5603,7 +5607,10 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* return code; } - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 9eb9d60226..1b5a2bb58c 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2053,15 +2053,6 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) } } -static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doSysTableScanNext(pOperator, &pRes); - if (code) { - terrno = code; - } - return pRes; -} - static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; @@ -2252,7 +2243,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScanNext, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -2868,7 +2859,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScanNext, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7f23255257..1b8e6709d1 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1085,12 +1085,6 @@ _finished: return code; } -static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTimesliceNext(pOperator, &pRes); - return pRes; -} - static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) { SNode* pNode; FOREACH(pNode, pFuncs) { @@ -1189,7 +1183,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimesliceNext, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c9bb1e69e0..024e0393f0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1161,12 +1161,6 @@ _end: return code; } -static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStateWindowAggNext(pOperator, &pRes); - return pRes; -} - static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1211,12 +1205,6 @@ _end: return code; } -static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doBuildIntervalResultNext(pOperator, &pRes); - return pRes; -} - static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1431,7 +1419,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -1631,12 +1619,6 @@ _end: return code; } -static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doSessionWindowAggNext(pOperator, &pRes); - return pRes; -} - // todo make this as an non-blocking operator int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { @@ -1712,7 +1694,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -1816,7 +1798,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -2061,12 +2043,6 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock return code; } -static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = mergeAlignedIntervalAggNext(pOperator, &pRes); - return pRes; -} - int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -2139,7 +2115,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -2411,12 +2387,6 @@ _end: return code; } -static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doMergeIntervalAggNext(pOperator, &pRes); - return pRes; -} - int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -2483,7 +2453,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1);