diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index e1a62340f2..7e79a618a8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -1025,6 +1025,7 @@ int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fse if (code) { TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); + taosMemoryFree(fsetArr[0]); fsetArr[0] = NULL; } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 05ce1b23f5..de62ce63a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -576,12 +576,17 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs SSttLvl *lvl; code = tsdbSttLvlInitRef(pTsdb, lvl1, &lvl); if (code) { + taosMemoryFree(lvl); tsdbTFileSetClear(fset); return code; } code = TARRAY2_APPEND(fset[0]->lvlArr, lvl); - if (code) return code; + if (code) { + taosMemoryFree(lvl); + tsdbTFileSetClear(fset); + return code; + } } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index f89e35dbb1..fc2b873054 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -86,14 +86,19 @@ void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { pInfo->sttBlockIndex = -1; pInfo->pin = false; - if (pLoadInfo->info.pCount != NULL) { - taosArrayDestroy(pLoadInfo->info.pUid); - taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem); - taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem); - taosArrayDestroy(pLoadInfo->info.pCount); - taosArrayDestroy(pLoadInfo->info.pFirstTs); - taosArrayDestroy(pLoadInfo->info.pLastTs); - } + taosArrayDestroy(pLoadInfo->info.pUid); + taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem); + taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem); + taosArrayDestroy(pLoadInfo->info.pCount); + taosArrayDestroy(pLoadInfo->info.pFirstTs); + taosArrayDestroy(pLoadInfo->info.pLastTs); + + pLoadInfo->info.pUid = NULL; + pLoadInfo->info.pFirstKey = NULL; + pLoadInfo->info.pLastKey = NULL; + pLoadInfo->info.pCount = NULL; + pLoadInfo->info.pFirstTs = NULL; + pLoadInfo->info.pLastTs = NULL; taosArrayDestroy(pLoadInfo->aSttBlk); taosMemoryFree(pLoadInfo); @@ -834,7 +839,6 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) { int32_t lino = 0; *hasNext = false; - terrno = 0; // no qualified last file block in current file, no need to fetch row if (pIter->pSttBlk == NULL) { @@ -843,6 +847,7 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) { code = loadLastBlock(pIter, idStr, &pBlockData); if (pBlockData == NULL || code != TSDB_CODE_SUCCESS) { + lino = __LINE__; goto _exit; } @@ -888,6 +893,7 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) { if (iBlockL != pIter->iSttBlk) { code = loadLastBlock(pIter, idStr, &pBlockData); if ((pBlockData == NULL) || (code != 0)) { + lino = __LINE__; goto _exit; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2250431841..0e7607425d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -45,7 +45,7 @@ typedef struct { bool moreThanCapcity; } SDataBlockToLoadInfo; -static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo); +static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo, const char* idStr); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes); @@ -920,7 +920,7 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int return code; } -static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo) { +static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo, const char* id) { *pInfo = NULL; if (pBlockIter->blockList == NULL) { @@ -929,8 +929,10 @@ static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInf size_t num = TARRAY_SIZE(pBlockIter->blockList); if (num == 0) { - tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; + if (num != pBlockIter->numOfBlocks) { + tsdbError("tsdb read failed at: %s:%d %s", __func__, __LINE__, id); + } + return TSDB_CODE_FAILED; } *pInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); @@ -1206,7 +1208,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; - code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1434,7 +1436,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } } - code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2786,7 +2788,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); + code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return 0; } @@ -2835,7 +2837,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pDumpInfo->rowIndex += step; if (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0) { - code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); // NOTE: get the new block info + // NOTE: get the new block info + code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { goto _end; } @@ -3309,7 +3312,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t code = TSDB_CODE_SUCCESS; - code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3529,7 +3532,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) SReaderStatus* pStatus = &pReader->status; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; - int32_t code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + int32_t code = getCurrentBlockInfo(pBlockIter, &pBlockInfo, pReader->idStr); if (code == TSDB_CODE_SUCCESS) { pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; @@ -4113,7 +4116,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc CHECK_FILEBLOCK_STATE st; SFileDataBlockInfo* pFileBlockInfo = NULL; - code = getCurrentBlockInfo(&pReader->status.blockIter, &pFileBlockInfo); + code = getCurrentBlockInfo(&pReader->status.blockIter, &pFileBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5377,7 +5380,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, return TSDB_CODE_SUCCESS; } - code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); + code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5467,7 +5470,7 @@ static int32_t doRetrieveDataBlock(STsdbReader* pReader, SSDataBlock** pBlock) { SFileDataBlockInfo* pBlockInfo = NULL; *pBlock = NULL; - code = getCurrentBlockInfo(&pStatus->blockIter, &pBlockInfo); + code = getCurrentBlockInfo(&pStatus->blockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5683,7 +5686,7 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT while (true) { if (hasNext) { SFileDataBlockInfo* pBlockInfo = NULL; - code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { break; } diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index af8532b5b3..7d09be3300 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -27,11 +27,8 @@ typedef struct SOperatorCostInfo { struct SOperatorInfo; -//typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); -//typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); - typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); -typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); +typedef int32_t (*__optr_fn_t)(struct SOperatorInfo* pOptr, SSDataBlock** pResBlock); typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index d7b60b2bcd..b5a3f2f484 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -56,9 +56,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); -static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); -static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator); - +static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); +static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); @@ -123,7 +122,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResult, NULL, destroyAggOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index a219a5b5f0..c26d193a06 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -50,7 +50,7 @@ typedef struct SCacheRowsScanInfo { SColumnInfo pkCol; } SCacheRowsScanInfo; -static SSDataBlock* doScanCache(SOperatorInfo* pOperator); +static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); static void destroyCacheScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds, int32_t** pDstSlotIds); @@ -235,7 +235,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; @@ -259,7 +259,7 @@ _error: return code; } -int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pOperator->status == OP_EXEC_DONE) { @@ -445,12 +445,6 @@ _end: return code; } -static SSDataBlock* doScanCache(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doScanCacheNext(pOperator, &pRes); - return pRes; -} - void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index ba07e666a0..1d72b0bb58 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -64,6 +64,8 @@ void destroyCountWindowOperatorInfo(void* param) { taosMemoryFreeClear(param); } +static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); + static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; } static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) { @@ -227,7 +229,6 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** SExprSupp* pExprSup = &pOperator->exprSupp; int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; - SOperatorInfo* downstream = pOperator->pDownstream[0]; blockDataCleanup(pRes); @@ -292,12 +293,6 @@ _end: return code; } -static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = countWindowAggregateNext(pOperator, &pRes); - return pRes; -} - int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -374,7 +369,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregate, NULL, destroyCountWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregateNext, NULL, destroyCountWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 6268fa0268..a75bfb8f4b 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -878,20 +878,20 @@ static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** return TSDB_CODE_SUCCESS; } -static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { - pBlock->info.id.blockId = pStbJoin->outputBlkId; - return pBlock; +static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { + if (pBlock != NULL) { + pBlock->info.id.blockId = pStbJoin->outputBlkId; + } } - -SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { - int32_t code = TSDB_CODE_SUCCESS; +int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) { + int32_t code = TSDB_CODE_SUCCESS; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - SSDataBlock* pRes = NULL; + QRY_OPTR_CHECK(pRes); if (pOperator->status == OP_EXEC_DONE) { - return pRes; + return code; } int64_t st = 0; @@ -907,25 +907,24 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { } } - QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, &pRes)); - if (pRes) { + QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes)); + if (*pRes) { goto _return; } - - QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, &pRes)); + + QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes)); _return: - if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } if (code) { pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } else { + seqStableJoinComposeRes(pStbJoin, *pRes); } - - return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL; + return code; } int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 7218291f8c..0b5fd074b0 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -39,9 +39,9 @@ typedef struct SEventWindowOperatorInfo { SSDataBlock* pPreDataBlock; } SEventWindowOperatorInfo; -static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); -static void destroyEWindowOperatorInfo(void* param); -static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); +static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes); +static void destroyEWindowOperatorInfo(void* param); +static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); // todo : move to util static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, @@ -131,7 +131,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregateNext, NULL, destroyEWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -250,12 +250,6 @@ _end: return code; } -static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = eventWindowAggregateNext(pOperator, &pRes); - return pRes; -} - static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, SExprSupp* pExprSup, SAggSupporter* pAggSup) { if (*pResult == NULL) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index f354c9a1cc..4315624d97 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -305,12 +305,6 @@ _end: return code; } -static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = loadRemoteDataNext(pOperator, &pRes); - return pRes; -} - static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL) { @@ -447,7 +441,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); - pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -465,7 +459,6 @@ _error: pOperator->info = NULL; destroyOperator(pOperator); } - pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3ecaf61193..e641e151b7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -693,9 +693,11 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo pTaskInfo->paramSet = true; code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes); } else { - pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); } + QUERY_CHECK_CODE(code, lino, _end); + if (pRes == NULL) { st = taosGetTimestampUs(); } @@ -719,6 +721,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } else { void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + p = *(SSDataBlock**)tmp; code = copyDataBlock(p, pRes); QUERY_CHECK_CODE(code, lino, _end); @@ -736,8 +739,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo break; } - pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); + QUERY_CHECK_CODE(code, lino, _end); } + if (pTaskInfo->pSubplan->dynamicRowThreshold) { pTaskInfo->pSubplan->rowsThreshold -= current; } @@ -751,7 +756,6 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } _end: - (void)cleanUpUdfs(); uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; @@ -759,6 +763,11 @@ _end: GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0); atomic_store_64(&pTaskInfo->owner, 0); + if (code) { + pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return pTaskInfo->code; } @@ -779,9 +788,10 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + int32_t lino = 0; + int64_t curOwner = 0; *pRes = NULL; - int64_t curOwner = 0; // todo extract method taosRLockLatch(&pTaskInfo->lock); @@ -823,7 +833,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { int64_t st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes); + if (code) { + pTaskInfo->code = code; + qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; @@ -831,8 +846,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { *useconds = pTaskInfo->cost.elapsedTime; } - int32_t tmpRes = cleanUpUdfs(); - qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); + (void) cleanUpUdfs(); int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 966bda382b..42e7fbee1f 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1291,12 +1291,8 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera return code; } - *pResBlock = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); - if (*pResBlock == NULL && terrno != 0) { - return terrno; - } else { - return code; - } + code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock); + return code; } bool compareVal(const char* v, const SStateKeys* pKey) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 63f7667890..dfd57ab45b 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -318,12 +318,6 @@ static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } -static SSDataBlock* doFill(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doFillNext(pOperator, &pRes); - return pRes; -} - void destroyFillOperatorInfo(void* param) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); @@ -513,7 +507,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index b899524988..9b213487ed 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -692,39 +692,48 @@ _return: return code; } -static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - SOperatorParam* pDownstreamParam = NULL; - SSDataBlock* pBlock = NULL; +static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, + SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SOperatorParam* pDownstreamParam = NULL; + SSDataBlock* pBlock = NULL; SGroupCacheOperatorInfo* pGCache = pOperator->info; + code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); if (code) { return code; } + SOperatorInfo* pDownstream = pOperator->pDownstream[downstreamIdx]; if (pDownstreamParam) { - code = pOperator->pDownstream[downstreamIdx]->fpSet.getNextExtFn(pOperator->pDownstream[downstreamIdx], pDownstreamParam, &pBlock); + code = pDownstream->fpSet.getNextExtFn(pDownstream, pDownstreamParam, &pBlock); } else { - pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]); + code = pDownstream->fpSet.getNextFn(pDownstream, &pBlock); + } + + if (code) { + qError("failed to get block from downstream, code:%s %s", tstrerror(code), GET_TASKID(pOperator->pTaskInfo)); + return code; } if (pBlock) { - qDebug("%s blk retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); - + qDebug("%s res block retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); + pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock); if (code) { return code; } - if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) { + + if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, + &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) { QRY_ERR_RET(terrno); } } } *ppRes = pBlock; - return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3f203e7a95..f0a4914c24 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -531,12 +531,6 @@ _end: return code; } -static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = hashGroupbyAggregateNext(pOperator, &pRes); - return pRes; -} - int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -600,7 +594,7 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); QUERY_CHECK_CODE(code, lino, _error); @@ -1103,12 +1097,6 @@ _end: return code; } -static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = hashPartitionNext(pOperator, &pRes); - return pRes; -} - static void destroyPartitionOperatorInfo(void* param) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1232,7 +1220,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartitionNext, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -1583,12 +1571,6 @@ _end: return code; } -static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamHashPartitionNext(pOperator, &pRes); - return pRes; -} - static void destroyStreamPartitionOperatorInfo(void* param) { SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1785,7 +1767,7 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, + createOperatorFpSet(optrDummyOpenFn, doStreamHashPartitionNext, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 15819cd94a..55620defba 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -989,13 +989,14 @@ void hJoinSetDone(struct SOperatorInfo* pOperator) { qDebug("hash Join done"); } -static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { +static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SHJoinOperatorInfo* pJoin = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pRes = pJoin->finBlk; - int64_t st = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pRes = pJoin->finBlk; + int64_t st = 0; + QRY_OPTR_CHECK(pResBlock); if (pOperator->cost.openCost == 0) { st = taosGetTimestampUs(); } @@ -1004,7 +1005,7 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { pRes->info.rows = 0; goto _return; } - + if (!pJoin->keyHashBuilt) { pJoin->keyHashBuilt = true; @@ -1012,7 +1013,7 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { code = hJoinBuildHash(pOperator, &queryDone); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return code; } if (queryDone) { @@ -1026,18 +1027,20 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { code = (*pJoin->joinFp)(pOperator); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } - + if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { code = doFilter(pRes, pJoin->pFinFilter, NULL); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } } + if (pRes->info.rows > 0) { - return pRes; + *pResBlock = pRes; + return code; } } @@ -1050,39 +1053,41 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { pJoin->execInfo.probeBlkNum++; pJoin->execInfo.probeBlkRows += pBlock->info.rows; - + code = hJoinPrepareStart(pOperator, pBlock); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) { continue; } - + if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { code = doFilter(pRes, pJoin->pFinFilter, NULL); if (code) { pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + return pTaskInfo->code; } } - + if (pRes->info.rows > 0) { break; } } _return: - if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - - return (pRes->info.rows > 0) ? pRes : NULL; -} + if (pRes->info.rows > 0) { + *pResBlock = pRes; + } + + return code; +} static void destroyHashJoinOperator(void* param) { SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index fced682312..808aac66c2 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1691,13 +1691,13 @@ void mJoinResetOperator(struct SOperatorInfo* pOperator) { pOperator->status = OP_OPENED; } -SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { +int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SMJoinOperatorInfo* pJoin = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo)); - return NULL; + return code; } else { mJoinResetOperator(pOperator); qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo)); @@ -1739,7 +1739,10 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { } pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0; - return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL; + if (pBlock && pBlock->info.rows > 0) { + *pResBlock = pBlock; + } + return code; } void destroyGrpArray(void* ppArray) { diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 3f85324f57..9e0ad5f497 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -57,18 +57,16 @@ typedef struct SMultiwayMergeOperatorInfo { bool inputWithGroupId; } SMultiwayMergeOperatorInfo; -static SSDataBlock* doSortMerge1(SOperatorInfo* pOperator); -static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator); +static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); +static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator); -static SSDataBlock* doColsMerge1(SOperatorInfo* pOperator); static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); +static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock); int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - *ppBlock = pOperator->fpSet.getNextFn(pOperator); - return TSDB_CODE_SUCCESS; + int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); + return code; } int32_t openSortMergeOperator(SOperatorInfo* pOperator) { @@ -185,12 +183,6 @@ static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHand return code; } -SSDataBlock* doSortMerge1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doSortMerge(pOperator, &pBlock); - return pBlock; -} - int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; @@ -343,12 +335,6 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) { return 0; } -SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doNonSortMerge(pOperator, &pBlock); - return pBlock; -} - int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); @@ -432,12 +418,6 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock return code; } -SSDataBlock* doColsMerge1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doColsMerge(pOperator, &pBlock); - return pBlock; -} - int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); @@ -486,9 +466,9 @@ int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = { {0}, - {._openFn = openSortMergeOperator, .getNextFn = doSortMerge1, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo}, - {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge1, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo}, - {._openFn = openColsMergeOperator, .getNextFn = doColsMerge1, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo}, + {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo}, + {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo}, + {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo}, }; @@ -518,30 +498,37 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { return code; } -SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { +int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); + if (pOperator->status == OP_EXEC_DONE) { - return NULL; + return 0; } - SSDataBlock* pBlock = NULL; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + pTaskInfo->code = code; + return code; } if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) { - pBlock = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator); + code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock); + if (code) { + pTaskInfo->code = code; + return code; + } } - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; + + if ((*pResBlock) != NULL) { + pOperator->resultInfo.totalRows += (*pResBlock)->info.rows; } else { setOperatorCompleted(pOperator); } - return pBlock; + return code; } void destroyMultiwayMergeOperatorInfo(void* param) { @@ -599,6 +586,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno); + pSortMergeInfo->pInputBlock = pInputBlock; initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -611,7 +599,6 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); pSortMergeInfo->sortBufSize = pSortMergeInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. - pSortMergeInfo->pInputBlock = pInputBlock; code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pSortMergeInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 0ff6870405..fc52b97388 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -862,10 +862,13 @@ int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } else { + code = pOperator->fpSet.getNextFn(pOperator, pRes); + if (code) { + pOperator->pTaskInfo->code = code; + } } - *pRes = pOperator->fpSet.getNextFn(pOperator); return code; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index bf523af918..7e06c083ed 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -165,7 +165,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation1, NULL, destroyProjectOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState); @@ -445,16 +445,6 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { return code; } -SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doProjectOperation(pOperator, &pRes); - if (code && pOperator->pTaskInfo->code == 0) { - pOperator->pTaskInfo->code = code; - } - - return pRes; -} - int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -526,7 +516,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction1, NULL, destroyIndefinitOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce1a614613..557794a062 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -68,6 +68,7 @@ typedef struct STableCountScanOperatorInfo { } STableCountScanOperatorInfo; static bool processBlockWithProbability(const SSampleExecInfo* pInfo); +static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { #if 0 @@ -1004,30 +1005,30 @@ _end: return code; } -static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableScanImplNext(pOperator, &pRes); - return pRes; -} - -static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { +static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + QRY_OPTR_CHECK(pBlock); + // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) { - return NULL; + return code; } // do the ascending order traverse in the first place. while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { - SSDataBlock* p = doTableScanImpl(pOperator); + SSDataBlock* p = NULL; + code = doTableScanImplNext(pOperator, &p); + QUERY_CHECK_CODE(code, lino, _end); + if (p != NULL) { markGroupProcessed(pTableScanInfo, p->info.id.groupId); - return p; + *pBlock = p; + return code; } pTableScanInfo->scanTimes += 1; @@ -1055,10 +1056,14 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { } while (pTableScanInfo->scanTimes < total) { - SSDataBlock* p = doTableScanImpl(pOperator); + SSDataBlock* p = NULL; + code = doTableScanImplNext(pOperator, &p); + QUERY_CHECK_CODE(code, lino, _end); + if (p != NULL) { markGroupProcessed(pTableScanInfo, p->info.id.groupId); - return p; + *pBlock = p; + return code; } pTableScanInfo->scanTimes += 1; @@ -1082,8 +1087,13 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED; STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex); - if (NULL == pStart) return NULL; - return getBlockForEmptyTable(pOperator, pStart); + + if (NULL == pStart) { + return code; + } + + *pBlock = getBlockForEmptyTable(pOperator, pStart); + return code; } } else { // group by tag + no sort int32_t numOfTables = 0; @@ -1099,7 +1109,9 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { STableKeyInfo info = {.uid = *(uint64_t*)pIte, .groupId = *pGroupId}; taosHashCancelIterate(pTableListInfo->remainGroups, pIte); markGroupProcessed(pTableScanInfo, *pGroupId); - return getBlockForEmptyTable(pOperator, &info); + *pBlock = getBlockForEmptyTable(pOperator, &info); + + return code; } } } @@ -1110,9 +1122,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return NULL; + + return code; } static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { @@ -1172,13 +1184,16 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { return code; } -static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { +static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResult) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfTables = 0; + + QRY_OPTR_CHECK(pResult); + code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables); QUERY_CHECK_CODE(code, lino, _end); @@ -1188,7 +1203,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { taosArrayClear(pInfo->base.pTableListInfo->pTableList); taosHashClear(pInfo->base.pTableListInfo->map); } - return NULL; + return code; } // reset value for the next group data output @@ -1207,21 +1222,22 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); pInfo->scanTimes = 0; - SSDataBlock* result = doGroupedTableScan(pOperator); - if (result != NULL) { + code = doGroupedTableScan(pOperator, pResult); + QUERY_CHECK_CODE(code, lino, _end); + + if (*pResult != NULL) { if (pOperator->dynamicTask) { - result->info.id.groupId = result->info.id.uid; + (*pResult)->info.id.groupId = (*pResult)->info.id.uid; } - return result; + return code; } _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return NULL; + return code; } static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { @@ -1260,7 +1276,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { } } - result = doGroupedTableScan(pOperator); + result = NULL; + code = doGroupedTableScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result != NULL) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; @@ -1269,7 +1288,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { } while (true) { - result = startNextGroupScan(pOperator); + code = startNextGroupScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result || pOperator->status == OP_EXEC_DONE) { return result; } @@ -1290,6 +1311,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + QRY_OPTR_CHECK(ppRes); if (pOperator->pOperatorGetParam) { pOperator->dynamicTask = true; @@ -1302,8 +1324,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pInfo->currentGroupId = -1; pOperator->status = OP_OPENED; SSDataBlock* result = NULL; + while (true) { - result = startNextGroupScan(pOperator); + code = startNextGroupScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result || pOperator->status == OP_EXEC_DONE) { (*ppRes) = result; return code; @@ -1319,7 +1344,10 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pInfo->countState = TABLE_COUNT_STATE_END; while (1) { - SSDataBlock* result = doGroupedTableScan(pOperator); + SSDataBlock* result = NULL; + code = doGroupedTableScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); + if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) { (*ppRes) = result; return code; @@ -1343,6 +1371,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { (*ppRes) = NULL; return code; } + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); if (!tmp) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); @@ -1371,16 +1400,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - (*ppRes) = NULL; - return code; -} -static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableScanNext(pOperator, &pRes); - return pRes; + return code; } static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { @@ -1500,7 +1522,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pInfo->filesetDelimited = pTableScanNode->filesetDelimited; taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanNext, NULL, destroyTableScanOperatorInfo, optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL); // for non-blocking operator, the open cost is always 0 @@ -1538,7 +1560,7 @@ int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskIn setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImplNext, NULL, NULL, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -1835,18 +1857,23 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex, SSDataBlock** ppRes) { qDebug("do stream range scan. windows index:%d", *pRowIndex); + int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; bool prepareRes = true; while (1) { SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pTableScanOp); + code = doTableScanNext(pInfo->pTableScanOp, &pResult); + QUERY_CHECK_CODE(code, lino, _end); + if (!pResult) { prepareRangeScan(pInfo, pSDB, pRowIndex, &prepareRes); // scan next window data - pResult = doTableScan(pInfo->pTableScanOp); + code = doTableScanNext(pInfo->pTableScanOp, &pResult); + QUERY_CHECK_CODE(code, lino, _end); } + if (!pResult) { if (prepareRes) { continue; @@ -2894,7 +2921,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { while (1) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + SSDataBlock* pResult = NULL; + code = doTableScanNext(pInfo->pTableScanOp, &pResult); + QUERY_CHECK_CODE(code, lino, _end); if (pResult && pResult->info.rows > 0) { bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); @@ -3234,7 +3263,9 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { break; } - pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp); + code = doTableScanNext(pInfo->pTableScanOp, &pInfo->pRecoverRes); + QUERY_CHECK_CODE(code, lino, _end); + if (pInfo->pRecoverRes != NULL) { code = calBlockTbName(pInfo, pInfo->pRecoverRes, 0); QUERY_CHECK_CODE(code, lino, _end); @@ -3787,12 +3818,6 @@ _end: return code; } -static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doRawScanNext(pOperator, &pRes); - return pRes; -} - static void destroyRawScanOperatorInfo(void* param) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader); @@ -3831,7 +3856,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, + pOperator->fpSet = createOperatorFpSet(NULL, doRawScanNext, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -4210,7 +4235,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; + __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScanNext : doQueueScanNext; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState); @@ -4782,7 +4807,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p } } //TODO wjm check pInfo->filterCtx.code - __optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry; + __optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdxNext : doTagScanFromMetaEntryNext; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; @@ -6047,7 +6072,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pOperator->exprSupp.numOfExprs = numOfCols; pOperator->fpSet = createOperatorFpSet( - optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL, + optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTablesNext : doTableMergeScanNext, NULL, destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; @@ -6073,7 +6098,6 @@ _error: // ==================================================================================================================== // TableCountScanOperator -static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator); static void destoryTableCountScanOperator(void* param); static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI); @@ -6215,7 +6239,7 @@ int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountSca setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScanNext, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -6384,12 +6408,6 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe return code; } -static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableCountScanNext(pOperator, &pRes); - return pRes; -} - static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index d9b1e40510..fb4b61c7a8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -43,11 +43,9 @@ typedef struct SSortOperatorInfo { SSortOpGroupIdCalc* pGroupIdCalc; } SSortOperatorInfo; -static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doSort1(SOperatorInfo* pOperator); -static int32_t doOpenSortOperator(SOperatorInfo* pOperator); -static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -static SSDataBlock* doGroupSort1(SOperatorInfo* pOperator); +static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock); +static int32_t doOpenSortOperator(SOperatorInfo* pOperator); +static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static void destroySortOperatorInfo(void* param); @@ -149,7 +147,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort1, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -342,8 +340,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - *ppBlock = pOperator->fpSet.getNextFn(pOperator); - return TSDB_CODE_SUCCESS; + return pOperator->fpSet.getNextFn(pOperator, ppBlock); } // todo refactor: merged with fetch fp @@ -404,12 +401,6 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { return code; } -SSDataBlock* doSort1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doSort(pOperator, &pBlock); - return pBlock; -} - int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); if (pOperator->status == OP_EXEC_DONE) { @@ -613,17 +604,23 @@ typedef struct SGroupSortSourceParam { } SGroupSortSourceParam; int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { - *ppBlock = NULL; - + int32_t code = 0; + int32_t lino = 0; SGroupSortSourceParam* source = param; SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; + SSDataBlock* block = NULL; + + QRY_OPTR_CHECK(ppBlock); + if (grpSortOpInfo->prefetchedSortInput) { - SSDataBlock* block = grpSortOpInfo->prefetchedSortInput; + block = grpSortOpInfo->prefetchedSortInput; grpSortOpInfo->prefetchedSortInput = NULL; *ppBlock = block; } else { SOperatorInfo* childOp = source->childOpInfo; - SSDataBlock* block = childOp->fpSet.getNextFn(childOp); + code = childOp->fpSet.getNextFn(childOp, &block); + QUERY_CHECK_CODE(code, lino, _end); + if (block != NULL) { if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; @@ -637,7 +634,12 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { } } - return TSDB_CODE_SUCCESS; + return code; +_end: + if (code != 0) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t beginSortGroup(SOperatorInfo* pOperator) { @@ -695,12 +697,6 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* doGroupSort1(SOperatorInfo* pOperator) { - SSDataBlock* pBlock = NULL; - pOperator->pTaskInfo->code = doGroupSort(pOperator, &pBlock); - return pBlock; -} - int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_OPTR_CHECK(pResBlock); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -836,7 +832,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort1, NULL, destroyGroupSortOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index b4f8d6837a..54ad12cff0 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -658,10 +658,14 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); setStreamOperatorState(&pInfo->basic, pBlock->info.type); @@ -745,12 +749,6 @@ _end: return code; } -static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamCountAggNext(pOperator, &pRes); - return pRes; -} - void streamCountReleaseState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -908,7 +906,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); taosMemoryFree(buff); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 38a813bf33..a8e14bce68 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -629,10 +629,15 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); setStreamOperatorState(&pInfo->basic, pBlock->info.type); @@ -734,12 +739,6 @@ _end: return code; } -static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamEventAggNext(pOperator, &pRes); - return pRes; -} - void streamEventReleaseState(SOperatorInfo* pOperator) { SStreamEventAggOperatorInfo* pInfo = pOperator->info; int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); @@ -966,7 +965,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAggNext, NULL, destroyStreamEventOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState); code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 453e64a151..3a6d0c709c 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1164,12 +1164,6 @@ _end: return code; } -static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamFillNext(pOperator, &pRes); - return pRes; -} - static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; for (int i = 0; i < pFillSup->numOfAllCols; i++) { @@ -1449,7 +1443,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2d29bcd069..0651e2dbf6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -60,7 +60,7 @@ typedef struct SPullWindowInfo { STimeWindow calWin; } SPullWindowInfo; -static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator); +static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index); @@ -1606,7 +1606,10 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc return code; } - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), @@ -1614,6 +1617,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc pInfo->numOfDatapack = 0; break; } + pInfo->numOfDatapack++; printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); setStreamOperatorState(&pInfo->basic, pBlock->info.type); @@ -1768,12 +1772,6 @@ _end: return code; } -static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes); - return pRes; -} - int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { if (pWinPhyNode->deleteMark <= 0) { return DEAULT_DELETE_MARK; @@ -2000,10 +1998,10 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pOperator->info = pInfo; if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { - pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); @@ -3400,7 +3398,10 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } @@ -3843,7 +3844,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode QUERY_CHECK_CODE(code, lino, _error); } } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); @@ -3969,7 +3970,10 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; break; @@ -4079,7 +4083,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, + createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAggNext, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); } @@ -4696,7 +4700,10 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { break; } @@ -5017,7 +5024,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAggNext, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState); code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, @@ -5111,7 +5118,10 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), pInfo->numOfDatapack); @@ -5225,12 +5235,6 @@ _end: return code; } -static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStreamIntervalAggNext(pOperator, &pRes); - return pRes; -} - int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -5341,7 +5345,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); @@ -5606,7 +5610,10 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* return code; } - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 016ec7cdb3..5838e833f1 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2096,15 +2096,6 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) } } -static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doSysTableScanNext(pOperator, &pRes); - if (code) { - terrno = code; - } - return pRes; -} - static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; @@ -2295,7 +2286,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScanNext, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; @@ -2911,7 +2902,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScanNext, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; return code; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7f23255257..1b8e6709d1 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1085,12 +1085,6 @@ _finished: return code; } -static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTimesliceNext(pOperator, &pRes); - return pRes; -} - static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) { SNode* pNode; FOREACH(pNode, pFuncs) { @@ -1189,7 +1183,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimesliceNext, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c9bb1e69e0..024e0393f0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1161,12 +1161,6 @@ _end: return code; } -static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doStateWindowAggNext(pOperator, &pRes); - return pRes; -} - static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1211,12 +1205,6 @@ _end: return code; } -static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doBuildIntervalResultNext(pOperator, &pRes); - return pRes; -} - static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1431,7 +1419,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -1631,12 +1619,6 @@ _end: return code; } -static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doSessionWindowAggNext(pOperator, &pRes); - return pRes; -} - // todo make this as an non-blocking operator int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { @@ -1712,7 +1694,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -1816,7 +1798,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -2061,12 +2043,6 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock return code; } -static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = mergeAlignedIntervalAggNext(pOperator, &pRes); - return pRes; -} - int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -2139,7 +2115,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -2411,12 +2387,6 @@ _end: return code; } -static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doMergeIntervalAggNext(pOperator, &pRes); - return pRes; -} - int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); @@ -2483,7 +2453,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 7d3cf0580c..6dd42c3a93 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -2967,7 +2967,8 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { jtCtx.startTsUs = taosGetTimestampUs(); while (true) { - SSDataBlock* pBlock = jtCtx.pJoinOp->fpSet.getNextFn(jtCtx.pJoinOp); + SSDataBlock* pBlock = NULL; + int32_t code = jtCtx.pJoinOp->fpSet.getNextFn(jtCtx.pJoinOp, &pBlock); if (NULL == pBlock) { checkJoinDone(caseName); break;