diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index b4dd6d61e4..22a43012c5 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -41,7 +41,7 @@ typedef struct SBlockOrderInfo { #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) #define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_)))) -#define QRY_OPTR_CHECK(_o) \ +#define QRY_PARAM_CHECK(_o) \ do { \ if ((_o) == NULL) { \ return TSDB_CODE_INVALID_PARA; \ diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0066d12129..ea3e88919b 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -849,7 +849,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) { int32_t code = 0; - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { return TSDB_CODE_INVALID_PARA; @@ -1263,18 +1263,19 @@ static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBloc } static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) { - *ppCols = NULL; - int32_t code = 0; int32_t rows = pDataBlock->info.capacity; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + int32_t i = 0; + + *ppCols = NULL; SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData)); if (pCols == NULL) { return terrno; } - for (int32_t i = 0; i < numOfCols; ++i) { + for (i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); if (pColInfoData == NULL) { continue; @@ -1309,6 +1310,10 @@ static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoD return code; _error: + for(int32_t j = 0; j < i; ++j) { + colDataDestroy(&pCols[j]); + } + taosMemoryFree(pCols); return code; } @@ -1753,7 +1758,7 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { } int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) { - QRY_OPTR_CHECK(pBlock); + QRY_PARAM_CHECK(pBlock); int32_t code = 0; SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -1846,7 +1851,7 @@ _err: } int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); if (pDataBlock == NULL) { return TSDB_CODE_INVALID_PARA; @@ -1946,7 +1951,7 @@ _end: } int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); if (pDataBlock == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -2029,7 +2034,7 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB } int32_t createDataBlock(SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { return terrno; @@ -2080,7 +2085,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) { int32_t code = 0; - QRY_OPTR_CHECK(pColInfoData); + QRY_PARAM_CHECK(pColInfoData); if (index >= taosArrayGetSize(pBlock->pDataBlock)) { return TSDB_CODE_INVALID_PARA; @@ -2871,7 +2876,7 @@ bool alreadyAddGroupId(char* ctbName, int64_t groupId) { } int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) { - QRY_OPTR_CHECK(pName); + QRY_PARAM_CHECK(pName); char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index fc2b873054..ed67bfca3e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -408,9 +408,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl for (int32_t k = startIndex; k < endIndex; ++k) { code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); int32_t i = 0; int32_t rows = block.numOfRecords; @@ -536,12 +534,15 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } _end: - (void)tStatisBlockDestroy(&block); + (void) tStatisBlockDestroy(&block); + if (code != 0) { + tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code)); + } else { + double el = (taosGetTimestampUs() - st) / 1000.0; + pBlockLoadInfo->cost.statisElapsedTime += el; - double el = (taosGetTimestampUs() - st) / 1000.0; - pBlockLoadInfo->cost.statisElapsedTime += el; - - tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el); + tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el); + } return code; } @@ -953,6 +954,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF pMTree->pIter = NULL; pMTree->backward = pConf->backward; pMTree->idStr = pConf->idstr; + int32_t lino = 0; if (!pMTree->backward) { // asc tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); @@ -1027,9 +1029,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF // let's record the time window for current table of uid in the stt files if (pSttDataInfo != NULL && numOfRows > 0) { void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range); - if (px == NULL) { - return terrno; - } + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pSttDataInfo->numOfRows += numOfRows; } } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0c61366513..cc4c3d318a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -403,7 +403,7 @@ static void initReaderStatus(SReaderStatus* pStatus) { } static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SSDataBlock* pBlock = NULL; int32_t code = createDataBlock(&pBlock); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 138782495f..80766855cf 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -78,7 +78,7 @@ static int32_t getSchemaBytes(const SSchema* pSchema) { } static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { - QRY_OPTR_CHECK(pOutput); + QRY_PARAM_CHECK(pOutput); SSDataBlock* pBlock = NULL; int32_t code = createDataBlock(&pBlock); @@ -236,7 +236,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** static int32_t execResetQueryCache() { return catalogClearCache(); } static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) { - QRY_OPTR_CHECK(pOutput); + QRY_PARAM_CHECK(pOutput); SSDataBlock* pBlock = NULL; int32_t code = createDataBlock(&pBlock); @@ -475,7 +475,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT } static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { - QRY_OPTR_CHECK(pOutput); + QRY_PARAM_CHECK(pOutput); SSDataBlock* pBlock = NULL; int32_t code = createDataBlock(&pBlock); @@ -499,7 +499,7 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { } static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) { - QRY_OPTR_CHECK(pOutput); + QRY_PARAM_CHECK(pOutput); SSDataBlock* pBlock = NULL; int32_t code = createDataBlock(&pBlock); @@ -929,7 +929,7 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { } static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) { - QRY_OPTR_CHECK(pOutput); + QRY_PARAM_CHECK(pOutput); SSDataBlock* pBlock = NULL; int32_t code = createDataBlock(&pBlock); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index d9af279813..4605d19464 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -70,7 +70,7 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t lino = 0; int32_t code = 0; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index fdf20b4613..b575613efe 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -97,7 +97,7 @@ _end: int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 713116d605..2233d58ef8 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -295,7 +295,7 @@ _end: int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 8058fa9afe..44e8a3cb8a 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -889,7 +889,7 @@ int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - QRY_OPTR_CHECK(pRes); + QRY_PARAM_CHECK(pRes); if (pOperator->status == OP_EXEC_DONE) { return code; } @@ -958,7 +958,7 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; __optr_fn_t nextFp = NULL; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index a4be2ddf7d..0f3a08c14b 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -61,7 +61,7 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index d43f37ca9e..457aa8ff2b 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -401,7 +401,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index db3017468d..05ed5a9d1e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1924,7 +1924,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { } int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) { - QRY_OPTR_CHECK(pExprInfo); + QRY_PARAM_CHECK(pExprInfo); int32_t code = 0; int32_t numOfFuncs = LIST_LENGTH(pNodeList); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index a1bf64a10d..00138d6871 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1280,7 +1280,7 @@ void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); int32_t code = 0; if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 21b67c8ffb..76b41ca99a 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -423,7 +423,7 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 6a9482e31c..1796aa7b64 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1444,7 +1444,7 @@ static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam* int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo)); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 47b35fe9f3..e4ace6b83a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -353,38 +353,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } } -static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SGroupbyOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->binfo.pRes; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); - QUERY_CHECK_CODE(code, lino, _end); - - if (!hasRemainResults(&pInfo->groupResInfo)) { - setOperatorCompleted(pOperator); - break; - } - - if (pRes->info.rows > 0) { - break; - } - } - - pOperator->resultInfo.totalRows += pRes->info.rows; - -_end: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } - return (pRes->info.rows == 0) ? NULL : pRes; -} - bool hasRemainResultByHash(SOperatorInfo* pOperator) { SGroupbyOperatorInfo* pInfo = pOperator->info; SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable; @@ -463,25 +431,23 @@ _end: } static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGroupbyOperatorInfo* pInfo = pOperator->info; + SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo; + int32_t order = pInfo->binfo.inputTsOrder; + int64_t st = taosGetTimestampUs(); + + QRY_PARAM_CHECK(ppRes); if (pOperator->status == OP_EXEC_DONE) { - (*ppRes) = NULL; return TSDB_CODE_SUCCESS; } - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - SGroupbyOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { (*ppRes) = buildGroupResultDataBlockByHash(pOperator); return code; } - SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo; - - int32_t order = pInfo->binfo.inputTsOrder; - int64_t st = taosGetTimestampUs(); - SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); @@ -511,10 +477,12 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } + if (pGroupResInfo->pBuf) { taosMemoryFree(pGroupResInfo->pBuf); pGroupResInfo->pBuf = NULL; } + pGroupResInfo->index = 0; pGroupResInfo->iter = 0; pGroupResInfo->dataPos = NULL; @@ -525,15 +493,16 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + } else { + (*ppRes) = buildGroupResultDataBlockByHash(pOperator); } - (*ppRes) = buildGroupResultDataBlockByHash(pOperator); + return code; } int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1127,7 +1096,7 @@ static void destroyPartitionOperatorInfo(void* param) { int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1668,7 +1637,7 @@ void freePartItem(void* ptr) { int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 4e630dbd18..7f13608653 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -996,7 +996,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p SSDataBlock* pRes = pJoin->finBlk; int64_t st = 0; - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); if (pOperator->cost.openCost == 0) { st = taosGetTimestampUs(); } @@ -1182,7 +1182,7 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 30cc596a44..90c2248c12 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1867,7 +1867,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t oldNum = numOfDownstream; bool newDownstreams = false; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 93bddb0787..7357329ca6 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -336,7 +336,7 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) { } int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; @@ -419,7 +419,7 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock } int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; @@ -499,7 +499,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { } int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); if (pOperator->status == OP_EXEC_DONE) { return 0; @@ -556,7 +556,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode; int32_t lino = 0; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 983b70c575..6fe270161f 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -180,7 +180,7 @@ ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, con // QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); if (pOperator == NULL) { qError("invalid operator, failed to find tableScanOperator %s", id); @@ -282,7 +282,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SSto int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t type = nodeType(pPhyNode); @@ -878,7 +878,7 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i } int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) { - QRY_OPTR_CHECK(pRes); + QRY_PARAM_CHECK(pRes); int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 633eda6bf9..790e97b27c 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -93,7 +93,7 @@ void streamOperatorReloadState(SOperatorInfo* pOperator) { int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); @@ -262,7 +262,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS } int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; @@ -441,7 +441,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; int32_t numOfRows = 4096; @@ -572,7 +572,7 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) { } int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SIndefOperatorInfo* pIndefInfo = pOperator->info; SOptrBasicInfo* pInfo = &pIndefInfo->binfo; @@ -717,7 +717,7 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) { - QRY_OPTR_CHECK(pResList); + QRY_PARAM_CHECK(pResList); SArray* pList = taosArrayInit(4, sizeof(int32_t)); if (pList == NULL) { return terrno; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index beb3da84b2..42e7e4ac3b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -67,7 +67,7 @@ typedef struct STableCountScanOperatorInfo { SArray* stbUidList; // when group by db_name and/or stable_name } STableCountScanOperatorInfo; -static bool processBlockWithProbability(const SSampleExecInfo* pInfo); +static bool processBlockWithProbability(const SSampleExecInfo* pInfo); static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { @@ -937,13 +937,13 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; + bool hasNext = false; + int64_t st = taosGetTimestampUs(); - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - bool hasNext = false; + QRY_PARAM_CHECK(ppRes); pBlock->info.dataLoad = false; - int64_t st = taosGetTimestampUs(); - while (true) { code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext); if (code != TSDB_CODE_SUCCESS) { @@ -957,7 +957,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes if (isTaskKilled(pTaskInfo)) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader); - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + return pTaskInfo->code; } if (pOperator->status == OP_EXEC_DONE) { @@ -993,6 +993,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime; pBlock->info.scanFlag = pTableScanInfo->base.scanFlag; + (*ppRes) = pBlock; return code; } @@ -1001,9 +1002,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - (*ppRes) = NULL; return code; } @@ -1014,7 +1013,7 @@ static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - QRY_OPTR_CHECK(pBlock); + QRY_PARAM_CHECK(pBlock); // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) { @@ -1194,7 +1193,7 @@ static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResul SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfTables = 0; - QRY_OPTR_CHECK(pResult); + QRY_PARAM_CHECK(pResult); code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables); QUERY_CHECK_CODE(code, lino, _end); @@ -1242,7 +1241,7 @@ _end: return code; } -static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { +static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STableScanInfo* pInfo = pOperator->info; @@ -1250,12 +1249,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t num = 0; STableKeyInfo* pList = NULL; - SSDataBlock* result = NULL; + SSDataBlock* pResult = NULL; + + QRY_PARAM_CHECK(pResBlock); if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); - return NULL; + return code; } taosRLockLatch(&pTaskInfo->lock); @@ -1273,28 +1274,32 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (pInfo->filesetDelimited) { pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); } + if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; } } - result = NULL; - code = doGroupedTableScan(pOperator, &result); + pResult = NULL; + code = doGroupedTableScan(pOperator, &pResult); QUERY_CHECK_CODE(code, lino, _end); - if (result != NULL) { + if (pResult != NULL) { if (pOperator->dynamicTask) { - result->info.id.groupId = result->info.id.uid; + pResult->info.id.groupId = pResult->info.id.uid; } - return result; + + *pResBlock = pResult; + return code; } while (true) { - code = startNextGroupScan(pOperator, &result); + code = startNextGroupScan(pOperator, &pResult); QUERY_CHECK_CODE(code, lino, _end); - if (result || pOperator->status == OP_EXEC_DONE) { - return result; + if (pResult || pOperator->status == OP_EXEC_DONE) { + *pResBlock = pResult; + return code; } } @@ -1302,9 +1307,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return result; + + return code; } static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -1313,7 +1318,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - QRY_OPTR_CHECK(ppRes); + QRY_PARAM_CHECK(ppRes); if (pOperator->pOperatorGetParam) { pOperator->dynamicTask = true; @@ -1394,8 +1399,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pInfo->scanTimes = 0; } } else { // scan table group by group sequentially - (*ppRes) = groupSeqTableScan(pOperator); - return code; + code = groupSeqTableScan(pOperator, ppRes); } _end: @@ -1447,7 +1451,7 @@ static void destroyTableScanOperatorInfo(void* param) { int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1547,7 +1551,7 @@ _error: } int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); @@ -3704,7 +3708,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pInfo->dataReader && hasNext) { if (isTaskKilled(pTaskInfo)) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader); - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + return code; } SSDataBlock* pBlock = NULL; @@ -3806,6 +3810,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } } + (*ppRes) = NULL; return code; } @@ -3814,8 +3819,8 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } + (*ppRes) = NULL; return code; } @@ -3837,7 +3842,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo // create meta reader // create tq reader - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -4029,7 +4034,7 @@ _end: int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -4262,7 +4267,7 @@ _error: return code; } -static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, +static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, SStorageAPI* pAPI) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -4276,7 +4281,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, GET_TASKID(pTaskInfo)); tDecoderClear(&(*mr).coder); pAPI->metaReaderFn.clearReader(mr); - T_LONG_JMP(pTaskInfo->env, terrno); + goto _end; } code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid); @@ -4285,7 +4290,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), GET_TASKID(pTaskInfo)); pAPI->metaReaderFn.clearReader(mr); - T_LONG_JMP(pTaskInfo->env, terrno); + goto _end; } char str[512]; @@ -4314,12 +4319,13 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, } else { data = (char*)p; } + code = colDataSetVal(pDst, (count), data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); QUERY_CHECK_CODE(code, lino, _end); - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && - data != NULL) { + if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && + (data != NULL)) { taosMemoryFree(data); } } @@ -4329,8 +4335,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } + + return code; } static void tagScanFreeUidTag(void* p) { @@ -4558,10 +4565,6 @@ _end: } static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - if (pOperator->status == OP_EXEC_DONE) { - (*ppRes) = NULL; - return TSDB_CODE_SUCCESS; - } int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4569,6 +4572,12 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p STagScanInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; + + QRY_PARAM_CHECK(ppRes); + + if (pOperator->status == OP_EXEC_DONE) { + return TSDB_CODE_SUCCESS; + } blockDataCleanup(pRes); if (pInfo->pCtbCursor == NULL) { @@ -4635,28 +4644,23 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p pInfo->pCtbCursor = NULL; setOperatorCompleted(pOperator); } - pRes->info.rows = count; + pRes->info.rows = count; bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo); if (bLimitReached) { setOperatorCompleted(pOperator); } + pOperator->resultInfo.totalRows += pRes->info.rows; + (*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes; + _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - pOperator->resultInfo.totalRows += pRes->info.rows; - (*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes; - return code; -} -static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTagScanFromCtbIdxNext(pOperator, &pRes); - return pRes; + return code; } static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -4687,18 +4691,18 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock* return code; } - char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); + code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); ++count; if (++pInfo->curPos >= size) { setOperatorCompleted(pOperator); } } + pRes->info.rows = count; pAPI->metaReaderFn.clearReader(&mr); @@ -4706,6 +4710,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock* if (bLimitReached) { setOperatorCompleted(pOperator); } + // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); if (pOperator->status == OP_EXEC_DONE) { setTaskStatus(pTaskInfo, TASK_COMPLETED); @@ -4745,7 +4750,7 @@ static void destroyTagScanOperatorInfo(void* param) { int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -4913,7 +4918,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock, (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); if (code != 0) { - T_LONG_JMP(pTaskInfo->env, code); + return code; } } @@ -4921,18 +4926,20 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu while (true) { bool hasNext = false; - int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext); + code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader); pInfo->base.dataReader = NULL; - T_LONG_JMP(pTaskInfo->env, code); + return code; } + if (!hasNext || isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader); pInfo->base.dataReader = NULL; - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + return code; } + *pSubTableHasBlock = false; break; } @@ -4947,8 +4954,9 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status); if (code != 0) { pInfo->base.dataReader = NULL; - T_LONG_JMP(pTaskInfo->env, code); + return code; } + if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { *pSubTableHasBlock = false; break; @@ -5215,49 +5223,56 @@ _end: return code; } -static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) { +static int32_t getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t capacity, SSDataBlock** pResBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; + bool finished = false; + + QRY_PARAM_CHECK(pResBlock); + + blockDataCleanup(pBlock); - blockDataCleanup(pResBlock); - bool finished = false; while (true) { - while (1) { + while (true) { if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) { finished = true; break; } - code = appendChosenRowToDataBlock(pSubTblsInfo, pResBlock); + code = appendChosenRowToDataBlock(pSubTblsInfo, pBlock); QUERY_CHECK_CODE(code, lino, _end); + code = adjustSubTableForNextRow(pOperator, pSubTblsInfo); QUERY_CHECK_CODE(code, lino, _end); - if (pResBlock->info.rows >= capacity) { + if (pBlock->info.rows >= capacity) { break; } } if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code); + return pTaskInfo->code; } - bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); - if (finished || limitReached || pResBlock->info.rows > 0) { + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo); + if (finished || limitReached || pBlock->info.rows > 0) { break; } } + if (pBlock->info.rows > 0) { + *pResBlock = pBlock; + } + _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return (pResBlock->info.rows > 0) ? pResBlock : NULL; + return code; } static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) { @@ -5307,23 +5322,22 @@ static void stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { } int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - if (pOperator->status == OP_EXEC_DONE) { - (*ppRes) = NULL; - return TSDB_CODE_SUCCESS; - } + QRY_PARAM_CHECK(ppRes); + + int32_t lino = 0; + int32_t tableListSize = 0; + int64_t st = taosGetTimestampUs(); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableMergeScanInfo* pInfo = pOperator->info; - int32_t lino = 0; - int32_t code = pOperator->fpSet._openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + if (pOperator->status == OP_EXEC_DONE) { + return TSDB_CODE_SUCCESS; } - int64_t st = taosGetTimestampUs(); + int32_t code = pOperator->fpSet._openFn(pOperator); + QUERY_CHECK_CODE(code, lino, _end); - int32_t tableListSize = 0; code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize); QUERY_CHECK_CODE(code, lino, _end); @@ -5335,6 +5349,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* (*ppRes) = NULL; return code; } + pInfo->tableStartIndex = 0; STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); @@ -5346,15 +5361,19 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* SSDataBlock* pBlock = NULL; while (pInfo->tableStartIndex < tableListSize) { if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + break; } - pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity); + code = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); + pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } + if (pBlock != NULL) { pBlock->info.id.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -5385,16 +5404,11 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + } else { + (*ppRes) = pBlock; } - (*ppRes) = pBlock; - return code; -} -static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes); - return pRes; + return code; } static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeOpInfo) { @@ -5426,7 +5440,7 @@ _end: } } -static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { +static int32_t doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -5439,7 +5453,8 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); + pTaskInfo->code = code; + return code; } if (!hasNext || isTaskKilled(pTaskInfo)) { @@ -5448,7 +5463,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); } *pFinished = true; - return; + return code; } uint32_t status = 0; @@ -5456,21 +5471,22 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe if (code != TSDB_CODE_SUCCESS) { qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); + pTaskInfo->code = code; + return code; } if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { *pFinished = true; - return; + return code; } // current block is filter out according to filter condition, continue load the next block if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { *pSkipped = true; - return; + return code; } - return; + return code; } static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) { @@ -5481,7 +5497,7 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSDataBlock* pBlock = NULL; int64_t st = taosGetTimestampUs(); - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; while (true) { if (pInfo->rtnNextDurationBlocks) { @@ -5505,7 +5521,12 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) { } else { bool bFinished = false; bool bSkipped = false; - doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); + + code = doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); + if (code != 0) { + return code; + } + pBlock = pInfo->pReaderBlock; qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); @@ -5518,7 +5539,6 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) { if (!bSkipped) { code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]); if (code) { - terrno = code; *ppBlock = NULL; return code; } @@ -5657,7 +5677,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { param->pOperator = pOperator; SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - QUERY_CHECK_NULL(ps, code, lino, _end, terrno); + if (ps == NULL) { + taosMemoryFree(param); + QUERY_CHECK_NULL(ps, code, lino, _end, terrno); + } + ps->param = param; ps->onlyRef = false; code = tsortAddSource(pInfo->pSortHandle, ps); @@ -5852,7 +5876,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SSDataBlock* pBlock = NULL; while (pInfo->tableStartIndex < tableListSize) { if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + goto _end; } pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, @@ -5896,9 +5920,10 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + } else { + (*ppRes) = pBlock; } - (*ppRes) = pBlock; + return code; } @@ -5971,7 +5996,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -6106,11 +6131,11 @@ static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI); static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI); -static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, +static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName); -static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, +static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); -static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, +static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes); static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum, @@ -6213,7 +6238,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -6396,23 +6421,29 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe STableCountScanOperatorInfo* pInfo = pOperator->info; STableCountScanSupp* pSupp = &pInfo->supp; SSDataBlock* pRes = pInfo->pRes; + blockDataCleanup(pRes); + QRY_PARAM_CHECK(ppRes); if (pOperator->status == OP_EXEC_DONE) { - (*ppRes) = NULL; return code; } + if (pInfo->readHandle.mnd != NULL) { (*ppRes) = buildSysDbTableCount(pOperator, pInfo); return code; } - (*ppRes) = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes); + code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes); + if ((pRes->info.rows > 0) && (code == 0)) { + *ppRes = pRes; + } + return code; } -static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, - STableCountScanSupp* pSupp, SSDataBlock* pRes) { +static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, + STableCountScanSupp* pSupp, SSDataBlock* pRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; const char* db = NULL; @@ -6424,27 +6455,29 @@ static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCount // get dbname pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); SName sn = {0}; + code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); QUERY_CHECK_CODE(code, lino, _end); + code = tNameGetDbName(&sn, dbName); QUERY_CHECK_CODE(code, lino, _end); if (pSupp->groupByDbName || pSupp->groupByStbName) { - buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName); + code = buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName); } else { - buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName); + code = buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName); } _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return pRes->info.rows > 0 ? pRes : NULL; + + return code; } -static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, +static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -6458,6 +6491,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca code = pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList); QUERY_CHECK_CODE(code, lino, _end); } + if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) { tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx); code = buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI); @@ -6487,11 +6521,11 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } + return code; } -static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, +static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -6527,9 +6561,10 @@ _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); } + setOperatorCompleted(pOperator); + return code; } static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index bb494cacfa..9b659ac761 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -55,7 +55,7 @@ static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc); // todo add limit/offset impl int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; @@ -252,7 +252,7 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); blockDataCleanup(pDataBlock); int32_t lino = 0; int32_t code = 0; @@ -355,25 +355,26 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. pInfo->pSortHandle = NULL; - int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, - pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle); + int32_t code = + tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows, + pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle); if (code) { return code; } tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - if (ps == NULL) { + SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); + if (pSource == NULL) { return terrno; } - ps->param = pOperator->pDownstream[0]; - ps->onlyRef = true; + pSource->param = pOperator->pDownstream[0]; + pSource->onlyRef = true; - code = tsortAddSource(pInfo->pSortHandle, ps); + code = tsortAddSource(pInfo->pSortHandle, pSource); if (code) { - taosMemoryFree(ps); + taosMemoryFree(pSource); return code; } @@ -390,7 +391,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { } int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); if (pOperator->status == OP_EXEC_DONE) { return 0; } @@ -400,7 +401,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + return code; } // multi-group case not handle here @@ -408,7 +409,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { while (1) { if (tsortIsClosed(pInfo->pSortHandle)) { code = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, @@ -516,7 +517,7 @@ typedef struct SGroupSortOperatorInfo { int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); blockDataCleanup(pDataBlock); int32_t code = blockDataEnsureCapacity(pDataBlock, capacity); @@ -598,7 +599,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; SSDataBlock* block = NULL; - QRY_OPTR_CHECK(ppBlock); + QRY_PARAM_CHECK(ppBlock); if (grpSortOpInfo->prefetchedSortInput) { block = grpSortOpInfo->prefetchedSortInput; @@ -648,23 +649,22 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); if (ps == NULL || param == NULL) { - T_LONG_JMP(pTaskInfo->env, terrno); + taosMemoryFree(ps); + taosMemoryFree(param); + return terrno; } param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; + ps->param = param; ps->onlyRef = false; code = tsortAddSource(pInfo->pCurrSortHandle, ps); - if (code) { - T_LONG_JMP(pTaskInfo->env, code); + if (code != 0) { + return code; } code = tsortOpen(pInfo->pCurrSortHandle); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - return code; } @@ -686,7 +686,7 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) { } int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { - QRY_OPTR_CHECK(pResBlock); + QRY_PARAM_CHECK(pResBlock); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupSortOperatorInfo* pInfo = pOperator->info; @@ -696,7 +696,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + return code; } if (!pInfo->hasGroupId) { @@ -720,15 +720,14 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { while (pInfo->pCurrSortHandle != NULL) { if (tsortIsClosed(pInfo->pCurrSortHandle)) { code = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // beginSortGroup would fetch all child blocks of pInfo->currGroupId; if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) { - code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - pOperator->pTaskInfo->code = code; + pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pOperator->pTaskInfo->env, code); + return code; } code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, @@ -777,7 +776,7 @@ void destroyGroupSortOperatorInfo(void* param) { int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index b086b41611..524ef6ee2f 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -807,7 +807,7 @@ _end: int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode; int32_t numOfCols = 0; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 70f88fc37f..735b470325 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -858,7 +858,7 @@ _end: int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode; int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index d87e78ede3..58e57eb88b 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1349,7 +1349,7 @@ _end: int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index acadf32937..6196647eff 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1886,7 +1886,7 @@ _end: int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -3754,7 +3754,7 @@ _end: int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t numOfCols = 0; @@ -4081,7 +4081,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -4931,7 +4931,7 @@ _end: int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; @@ -5248,7 +5248,7 @@ _end: int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 42fc5b7f7d..bf1153f412 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2233,7 +2233,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -2864,7 +2864,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 32a769ade3..f9211e5730 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1104,7 +1104,7 @@ static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn } int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = 0; int32_t lino = 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1999694057..59883d2b80 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1320,7 +1320,7 @@ _end: int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1616,7 +1616,7 @@ _end: // todo make this as an non-blocking operator int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1727,7 +1727,7 @@ void destroySWindowOperatorInfo(void* param) { int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -2027,7 +2027,7 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -2366,7 +2366,7 @@ _end: int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { - QRY_OPTR_CHECK(pOptrInfo); + QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 2792e6a197..796ebbeb84 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -79,9 +79,8 @@ struct SSortHandle { bool forceUsePQSort; BoundedQueue* pBoundedQueue; uint32_t tmpRowIdx; - - int64_t mergeLimit; - int64_t currMergeLimitTs; + int64_t mergeLimit; + int64_t currMergeLimitTs; int32_t sourceId; SSDataBlock* pDataBlock; @@ -288,7 +287,7 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t code = 0; int32_t lino = 0; - QRY_OPTR_CHECK(pHandle); + QRY_PARAM_CHECK(pHandle); SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno); @@ -367,7 +366,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } -void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { +void tsortClearOrderedSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { SSortSource** pSource = taosArrayGet(pOrderedSource, i); if (NULL == *pSource) { @@ -413,10 +412,11 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { destroyDiskbasedBuf(pSortHandle->pBuf); taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); + if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue); int64_t fetchUs = 0, fetchNum = 0; - tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); + tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); taosArrayDestroy(pSortHandle->pOrderedSource); @@ -465,7 +465,13 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } - return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); + int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); + if (code != 0) { + qError("sort failed at: %s:%d", __func__, __LINE__); + taosArrayDestroy(pPageIdList); + } + + return code; } static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { @@ -935,9 +941,99 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { } } } + return 0; } +static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted, + int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) { + int32_t code = 0; + int32_t lino = 0; + SArray* pPageIdList = NULL; + + for (int32_t i = 0; i < sortGroup; ++i) { + qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources); + pHandle->sourceId += 1; + + int32_t end = (i + 1) * numOfInputSources - 1; + if (end > numOfSorted - 1) { + end = numOfSorted - 1; + } + + pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1; + + code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle); + QUERY_CHECK_CODE(code, lino, _err); + + code = + tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + QUERY_CHECK_CODE(code, lino, _err); + + int32_t nMergedRows = 0; + pPageIdList = taosArrayInit(4, sizeof(int32_t)); + QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno); + + while (1) { + if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) { + code = TSDB_CODE_TSC_QUERY_CANCELLED; + goto _err; + } + + SSDataBlock* pDataBlock = NULL; + code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock); + if (pDataBlock == NULL || code != 0) { + break; + } + + int32_t pageId = -1; + void* pPage = getNewBufPage(pHandle->pBuf, &pageId); + QUERY_CHECK_NULL(pPage, code, lino, _err, terrno); + + void* px = taosArrayPush(pPageIdList, &pageId); + QUERY_CHECK_NULL(px, code, lino, _err, terrno); + + int32_t size = + blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t); + if (size > getBufPageSize(pHandle->pBuf)) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _err; + } + + code = blockDataToBuf(pPage, pDataBlock); + QUERY_CHECK_CODE(code, lino, _err); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + nMergedRows += pDataBlock->info.rows; + + blockDataCleanup(pDataBlock); + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + break; + } + } + + code = sortComparCleanup(&pHandle->cmpParam); + QUERY_CHECK_CODE(code, lino, _err); + + tMergeTreeDestroy(&pHandle->pMergeTree); + pHandle->numOfCompletedSources = 0; + + SSDataBlock* pBlock = NULL; + code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock); + QUERY_CHECK_CODE(code, lino, _err); + + code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); + QUERY_CHECK_CODE(code, lino, _err); + } + + return code; + +_err: + taosArrayDestroy(pPageIdList); + qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code)); + return code; +} + static int32_t doInternalMergeSort(SSortHandle* pHandle) { size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); if (numOfSources == 0) { @@ -959,8 +1055,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { pHandle->numOfPages); } - int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, - blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + int32_t size = (int32_t) blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)); + int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, size); if (numOfRows < 0) { return terrno; } @@ -985,117 +1081,22 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources; // Only *numOfInputSources* can be loaded into buffer to perform the external sort. - for (int32_t i = 0; i < sortGroup; ++i) { - qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); - pHandle->sourceId += 1; - - int32_t end = (i + 1) * numOfInputSources - 1; - if (end > numOfSorted - 1) { - end = numOfSorted - 1; - } - - pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1; - - code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pResList); - return code; - } - - code = - tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pResList); - return code; - } - - int32_t nMergedRows = 0; - SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); - if (pPageIdList == NULL) { - taosArrayDestroy(pResList); - return terrno; - } - - while (1) { - if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) { - code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; - return code; - } - - SSDataBlock* pDataBlock = NULL; - code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock); - if (pDataBlock == NULL || code != 0) { - break; - } - - int32_t pageId = -1; - void* pPage = getNewBufPage(pHandle->pBuf, &pageId); - if (pPage == NULL) { - taosArrayDestroy(pResList); - taosArrayDestroy(pPageIdList); - return terrno; - } - - void* px = taosArrayPush(pPageIdList, &pageId); - if (px == NULL) { - taosArrayDestroy(pResList); - taosArrayDestroy(pPageIdList); - return terrno; - } - - int32_t size = - blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t); - if (size > getBufPageSize(pHandle->pBuf)) { - qError("sort failed at: %s:%d", __func__, __LINE__); - return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - } - - code= blockDataToBuf(pPage, pDataBlock); - if (code) { - return code; - } - - setBufPageDirty(pPage, true); - releaseBufPage(pHandle->pBuf, pPage); - nMergedRows += pDataBlock->info.rows; - - blockDataCleanup(pDataBlock); - if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { - break; - } - } - - code = sortComparCleanup(&pHandle->cmpParam); - if (code) { - return code; - } - - tMergeTreeDestroy(&pHandle->pMergeTree); - pHandle->numOfCompletedSources = 0; - - SSDataBlock* pBlock = NULL; - - code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock); - if (code) { - taosArrayDestroy(pResList); - return code; - } - - code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pResList); - return code; - } + code = doSortForEachGroup(pHandle, t, numOfSorted, numOfInputSources, pResList, sortGroup, numOfRows); + if (code != 0) { + tsortClearOrderedSource(pResList, NULL, NULL); + taosArrayDestroy(pResList); + return code; } - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL); void* px = taosArrayAddAll(pHandle->pOrderedSource, pResList); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + tsortClearOrderedSource(pResList, NULL, NULL); + taosArrayDestroy(pResList); + return terrno; } taosArrayDestroy(pResList); - numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); int64_t el = taosGetTimestampUs() - st; @@ -2346,7 +2347,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } } - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL); if (!tsortIsClosed(pHandle)) { void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); QUERY_CHECK_NULL(px, code, lino, _err, terrno); @@ -2378,37 +2379,44 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { return code; } -static void freeSSortSource(SSortSource* source) { - if (NULL == source) { +static void freeSortSource(SSortSource* pSource) { + if (NULL == pSource) { return; } - if (source->param && !source->onlyRef) { - taosMemoryFree(source->param); + if (!pSource->onlyRef && pSource->param) { + taosMemoryFree(pSource->param); } - if (!source->onlyRef && source->src.pBlock) { - blockDataDestroy(source->src.pBlock); - source->src.pBlock = NULL; + + if (!pSource->onlyRef && pSource->src.pBlock) { + blockDataDestroy(pSource->src.pBlock); + pSource->src.pBlock = NULL; } - taosMemoryFree(source); + + taosMemoryFree(pSource); } static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { int32_t code = 0; size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - if (pSource == NULL) { + SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0); + if (p == NULL) { return terrno; } - SSortSource* source = *pSource; - *pSource = NULL; + SSortSource* pSource = *p; - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + taosArrayRemove(pHandle->pOrderedSource, 0); + tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL); while (1) { SSDataBlock* pBlock = NULL; - TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock)); + code = pHandle->fetchfp(pSource->param, &pBlock); + if (code != 0) { + freeSortSource(pSource); + return code; + } + if (pBlock == NULL) { break; } @@ -2422,7 +2430,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { sortBufSize = pHandle->numOfPages * pHandle->pageSize; code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock); if (code) { - freeSSortSource(source); + freeSortSource(pSource); return code; } } @@ -2433,47 +2441,45 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { code = blockDataMerge(pHandle->pDataBlock, pBlock); if (code != TSDB_CODE_SUCCESS) { - freeSSortSource(source); + freeSortSource(pSource); return code; } size_t size = blockDataGetSize(pHandle->pDataBlock); if (size > sortBufSize) { // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); if (code != 0) { - freeSSortSource(source); + freeSortSource(pSource); return code; } - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + pHandle->sortElapsed += (taosGetTimestampUs() - st); + if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); code = doAddToBuf(pHandle->pDataBlock, pHandle); if (code != TSDB_CODE_SUCCESS) { - freeSSortSource(source); + freeSortSource(pSource); return code; } } } - freeSSortSource(source); + freeSortSource(pSource); if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { size_t size = blockDataGetSize(pHandle->pDataBlock); // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - + int64_t st = taosGetTimestampUs(); code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); if (code != 0) { return code; } if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + pHandle->sortElapsed += (taosGetTimestampUs() - st); // All sorted data can fit in memory, external memory sort is not needed. Return to directly if (size <= sortBufSize && pHandle->pBuf == NULL) { @@ -2488,6 +2494,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { code = doAddToBuf(pHandle->pDataBlock, pHandle); } } + return code; } @@ -2500,7 +2507,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { code = createBlocksMergeSortInitialSources(pHandle); } - qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); + qDebug("%s %zu sources created", pHandle->idStr, taosArrayGetSize(pHandle->pOrderedSource)); return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 81061cd06d..36d0086ac6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1224,7 +1224,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta) { } int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { - QRY_OPTR_CHECK(pList); + QRY_PARAM_CHECK(pList); int32_t code = 0; SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);