fix(query): fix memory leak and do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-06 15:44:25 +08:00
parent 8a1d527da9
commit 0236db51f6
31 changed files with 243 additions and 238 deletions

View File

@ -41,7 +41,7 @@ typedef struct SBlockOrderInfo {
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))
#define QRY_OPTR_CHECK(_o) \
#define QRY_PARAM_CHECK(_o) \
do { \
if ((_o) == NULL) { \
return TSDB_CODE_INVALID_PARA; \

View File

@ -849,7 +849,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
int32_t code = 0;
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
return TSDB_CODE_INVALID_PARA;
@ -1753,7 +1753,7 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
}
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
QRY_OPTR_CHECK(pBlock);
QRY_PARAM_CHECK(pBlock);
int32_t code = 0;
SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
@ -1846,7 +1846,7 @@ _err:
}
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
@ -1946,7 +1946,7 @@ _end:
}
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -2029,7 +2029,7 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB
}
int32_t createDataBlock(SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
return terrno;
@ -2080,7 +2080,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
int32_t code = 0;
QRY_OPTR_CHECK(pColInfoData);
QRY_PARAM_CHECK(pColInfoData);
if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
return TSDB_CODE_INVALID_PARA;
@ -2854,7 +2854,7 @@ bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
}
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
QRY_OPTR_CHECK(pName);
QRY_PARAM_CHECK(pName);
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) {

View File

@ -403,7 +403,7 @@ static void initReaderStatus(SReaderStatus* pStatus) {
}
static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);

View File

@ -78,7 +78,7 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
}
static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -236,7 +236,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
static int32_t execResetQueryCache() { return catalogClearCache(); }
static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -475,7 +475,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
}
static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -499,7 +499,7 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
}
static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -929,7 +929,7 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
}
static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);

View File

@ -70,7 +70,7 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t lino = 0;
int32_t code = 0;

View File

@ -97,7 +97,7 @@ _end:
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -295,7 +295,7 @@ _end:
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -889,7 +889,7 @@ int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
QRY_OPTR_CHECK(pRes);
QRY_PARAM_CHECK(pRes);
if (pOperator->status == OP_EXEC_DONE) {
return code;
}
@ -958,7 +958,7 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
__optr_fn_t nextFp = NULL;

View File

@ -61,7 +61,7 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -401,7 +401,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1924,7 +1924,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
}
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
QRY_OPTR_CHECK(pExprInfo);
QRY_PARAM_CHECK(pExprInfo);
int32_t code = 0;
int32_t numOfFuncs = LIST_LENGTH(pNodeList);

View File

@ -1280,7 +1280,7 @@ void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
int32_t code = 0;
if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {

View File

@ -423,7 +423,7 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1444,7 +1444,7 @@ static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam*
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));

View File

@ -353,38 +353,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (!hasRemainResults(&pInfo->groupResInfo)) {
setOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
return (pRes->info.rows == 0) ? NULL : pRes;
}
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable;
@ -463,25 +431,23 @@ _end:
}
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
int32_t order = pInfo->binfo.inputTsOrder;
int64_t st = taosGetTimestampUs();
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
return TSDB_CODE_SUCCESS;
}
if (pOperator->status == OP_RES_TO_RETURN) {
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
return code;
}
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
int32_t order = pInfo->binfo.inputTsOrder;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
@ -511,10 +477,12 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
}
if (pGroupResInfo->pBuf) {
taosMemoryFree(pGroupResInfo->pBuf);
pGroupResInfo->pBuf = NULL;
}
pGroupResInfo->index = 0;
pGroupResInfo->iter = 0;
pGroupResInfo->dataPos = NULL;
@ -525,15 +493,16 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
} else {
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
}
return code;
}
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1127,7 +1096,7 @@ static void destroyPartitionOperatorInfo(void* param) {
int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1668,7 +1637,7 @@ void freePartItem(void* ptr) {
int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -996,7 +996,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
SSDataBlock* pRes = pJoin->finBlk;
int64_t st = 0;
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
@ -1182,7 +1182,7 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));

View File

@ -1867,7 +1867,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t oldNum = numOfDownstream;
bool newDownstreams = false;

View File

@ -336,7 +336,7 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
}
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
@ -419,7 +419,7 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock
}
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
@ -499,7 +499,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
}
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return 0;
@ -556,7 +556,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
int32_t lino = 0;

View File

@ -180,7 +180,7 @@ ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, con
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
if (pOperator == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id);
@ -282,7 +282,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SSto
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t type = nodeType(pPhyNode);
@ -878,7 +878,7 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i
}
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
QRY_OPTR_CHECK(pRes);
QRY_PARAM_CHECK(pRes);
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -93,7 +93,7 @@ void streamOperatorReloadState(SOperatorInfo* pOperator) {
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
@ -262,7 +262,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
}
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
@ -441,7 +441,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
int32_t numOfRows = 4096;
@ -572,7 +572,7 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
}
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SIndefOperatorInfo* pIndefInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
@ -717,7 +717,7 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
}
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
QRY_OPTR_CHECK(pResList);
QRY_PARAM_CHECK(pResList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
return terrno;

View File

@ -937,13 +937,13 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
pBlock->info.dataLoad = false;
int64_t st = taosGetTimestampUs();
QRY_PARAM_CHECK(ppRes);
pBlock->info.dataLoad = false;
while (true) {
code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
if (code != TSDB_CODE_SUCCESS) {
@ -957,7 +957,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
return pTaskInfo->code;
}
if (pOperator->status == OP_EXEC_DONE) {
@ -993,6 +993,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
pBlock->info.scanFlag = pTableScanInfo->base.scanFlag;
(*ppRes) = pBlock;
return code;
}
@ -1001,9 +1002,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
}
@ -1014,7 +1013,7 @@ static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
QRY_OPTR_CHECK(pBlock);
QRY_PARAM_CHECK(pBlock);
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
@ -1194,7 +1193,7 @@ static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResul
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTables = 0;
QRY_OPTR_CHECK(pResult);
QRY_PARAM_CHECK(pResult);
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
@ -1242,7 +1241,7 @@ _end:
return code;
}
static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableScanInfo* pInfo = pOperator->info;
@ -1250,12 +1249,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t num = 0;
STableKeyInfo* pList = NULL;
SSDataBlock* result = NULL;
SSDataBlock* pResult = NULL;
QRY_PARAM_CHECK(pResBlock);
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
return code;
}
taosRLockLatch(&pTaskInfo->lock);
@ -1268,33 +1269,39 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables);
QUERY_CHECK_CODE(code, lino, _end);
// QUERY_CHECK_CODE(code, lino, _end);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
if (pInfo->filesetDelimited) {
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
}
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
}
}
result = NULL;
code = doGroupedTableScan(pOperator, &result);
pResult = NULL;
code = doGroupedTableScan(pOperator, &pResult);
QUERY_CHECK_CODE(code, lino, _end);
if (result != NULL) {
if (pResult != NULL) {
if (pOperator->dynamicTask) {
result->info.id.groupId = result->info.id.uid;
pResult->info.id.groupId = pResult->info.id.uid;
}
return result;
*pResBlock = pResult;
return code;
}
while (true) {
code = startNextGroupScan(pOperator, &result);
code = startNextGroupScan(pOperator, &pResult);
QUERY_CHECK_CODE(code, lino, _end);
if (result || pOperator->status == OP_EXEC_DONE) {
return result;
if (pResult || pOperator->status == OP_EXEC_DONE) {
*pResBlock = pResult;
return code;
}
}
@ -1302,9 +1309,9 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return result;
return code;
}
static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -1313,7 +1320,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
QRY_OPTR_CHECK(ppRes);
QRY_PARAM_CHECK(ppRes);
if (pOperator->pOperatorGetParam) {
pOperator->dynamicTask = true;
@ -1394,8 +1401,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pInfo->scanTimes = 0;
}
} else { // scan table group by group sequentially
(*ppRes) = groupSeqTableScan(pOperator);
return code;
code = groupSeqTableScan(pOperator, ppRes);
}
_end:
@ -1447,7 +1453,7 @@ static void destroyTableScanOperatorInfo(void* param) {
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1547,7 +1553,7 @@ _error:
}
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
@ -3704,7 +3710,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pInfo->dataReader && hasNext) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
return code;
}
SSDataBlock* pBlock = NULL;
@ -3806,6 +3812,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
}
(*ppRes) = NULL;
return code;
}
@ -3814,8 +3821,8 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
}
@ -3837,7 +3844,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
// create meta reader
// create tq reader
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4029,7 +4036,7 @@ _end:
int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4262,7 +4269,7 @@ _error:
return code;
}
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
SStorageAPI* pAPI) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4276,7 +4283,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
GET_TASKID(pTaskInfo));
tDecoderClear(&(*mr).coder);
pAPI->metaReaderFn.clearReader(mr);
T_LONG_JMP(pTaskInfo->env, terrno);
goto _end;
}
code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid);
@ -4285,7 +4292,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(mr);
T_LONG_JMP(pTaskInfo->env, terrno);
goto _end;
}
char str[512];
@ -4314,12 +4321,13 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
} else {
data = (char*)p;
}
code = colDataSetVal(pDst, (count), data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
QUERY_CHECK_CODE(code, lino, _end);
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) {
if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
(data != NULL)) {
taosMemoryFree(data);
}
}
@ -4329,8 +4337,9 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static void tagScanFreeUidTag(void* p) {
@ -4558,10 +4567,6 @@ _end:
}
static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4569,6 +4574,12 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
STagScanInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
return TSDB_CODE_SUCCESS;
}
blockDataCleanup(pRes);
if (pInfo->pCtbCursor == NULL) {
@ -4635,28 +4646,23 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
pInfo->pCtbCursor = NULL;
setOperatorCompleted(pOperator);
}
pRes->info.rows = count;
pRes->info.rows = count;
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
return code;
}
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTagScanFromCtbIdxNext(pOperator, &pRes);
return pRes;
return code;
}
static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -4687,18 +4693,18 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
return code;
}
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
++count;
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
}
pRes->info.rows = count;
pAPI->metaReaderFn.clearReader(&mr);
@ -4706,6 +4712,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
@ -4745,7 +4752,7 @@ static void destroyTagScanOperatorInfo(void* param) {
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4913,7 +4920,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
}
@ -4921,18 +4928,20 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
while (true) {
bool hasNext = false;
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
return code;
}
*pSubTableHasBlock = false;
break;
}
@ -4947,8 +4956,9 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
if (code != 0) {
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
*pSubTableHasBlock = false;
break;
@ -5215,49 +5225,56 @@ _end:
return code;
}
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
static int32_t getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t capacity, SSDataBlock** pResBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
blockDataCleanup(pResBlock);
bool finished = false;
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pBlock);
while (true) {
while (true) {
while (1) {
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
finished = true;
break;
}
code = appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
code = appendChosenRowToDataBlock(pSubTblsInfo, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = adjustSubTableForNextRow(pOperator, pSubTblsInfo);
QUERY_CHECK_CODE(code, lino, _end);
if (pResBlock->info.rows >= capacity) {
if (pBlock->info.rows >= capacity) {
break;
}
}
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
return pTaskInfo->code;
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
if (finished || limitReached || pResBlock->info.rows > 0) {
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
if (finished || limitReached || pBlock->info.rows > 0) {
break;
}
}
if (pBlock->info.rows > 0) {
*pResBlock = pBlock;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
return code;
}
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
@ -5307,23 +5324,22 @@ static void stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
}
int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
QRY_PARAM_CHECK(ppRes);
int32_t lino = 0;
int32_t tableListSize = 0;
int64_t st = taosGetTimestampUs();
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
int32_t lino = 0;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
if (pOperator->status == OP_EXEC_DONE) {
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampUs();
int32_t code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
int32_t tableListSize = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
QUERY_CHECK_CODE(code, lino, _end);
@ -5335,6 +5351,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
(*ppRes) = NULL;
return code;
}
pInfo->tableStartIndex = 0;
STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
@ -5346,15 +5363,19 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
break;
}
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
code = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity, &pBlock);
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) {
pBlock->info.id.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
@ -5385,16 +5406,11 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
} else {
(*ppRes) = pBlock;
return code;
}
static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes);
return pRes;
return code;
}
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeOpInfo) {
@ -5426,7 +5442,7 @@ _end:
}
}
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
static int32_t doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -5439,7 +5455,8 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
pTaskInfo->code = code;
return code;
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
@ -5448,7 +5465,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
}
*pFinished = true;
return;
return code;
}
uint32_t status = 0;
@ -5456,21 +5473,22 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
pTaskInfo->code = code;
return code;
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
*pFinished = true;
return;
return code;
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
*pSkipped = true;
return;
return code;
}
return;
return code;
}
static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
@ -5505,7 +5523,12 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
} else {
bool bFinished = false;
bool bSkipped = false;
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
code = doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
if (code != 0) {
return code;
}
pBlock = pInfo->pReaderBlock;
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
@ -5518,7 +5541,6 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
if (!bSkipped) {
code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
if (code) {
terrno = code;
*ppBlock = NULL;
return code;
}
@ -5852,7 +5874,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
goto _end;
}
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
@ -5896,9 +5918,10 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
} else {
(*ppRes) = pBlock;
}
return code;
}
@ -5971,7 +5994,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6106,11 +6129,11 @@ static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo*
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
@ -6213,7 +6236,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList*
int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6396,22 +6419,28 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
STableCountScanOperatorInfo* pInfo = pOperator->info;
STableCountScanSupp* pSupp = &pInfo->supp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
}
if (pInfo->readHandle.mnd != NULL) {
(*ppRes) = buildSysDbTableCount(pOperator, pInfo);
return code;
}
(*ppRes) = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
if ((pRes->info.rows > 0) && (code == 0)) {
*ppRes = pRes;
}
return code;
}
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6424,27 +6453,29 @@ static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCount
// get dbname
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
SName sn = {0};
code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
QUERY_CHECK_CODE(code, lino, _end);
code = tNameGetDbName(&sn, dbName);
QUERY_CHECK_CODE(code, lino, _end);
if (pSupp->groupByDbName || pSupp->groupByStbName) {
buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
code = buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
} else {
buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
code = buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return pRes->info.rows > 0 ? pRes : NULL;
}
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
return code;
}
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6458,6 +6489,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
code = pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
code = buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI);
@ -6487,11 +6519,11 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6527,9 +6559,10 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
setOperatorCompleted(pOperator);
return code;
}
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,

View File

@ -55,7 +55,7 @@ static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
// todo add limit/offset impl
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
@ -252,7 +252,7 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t lino = 0;
int32_t code = 0;
@ -392,7 +392,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
}
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return 0;
}
@ -518,7 +518,7 @@ typedef struct SGroupSortOperatorInfo {
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
@ -600,7 +600,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
SSDataBlock* block = NULL;
QRY_OPTR_CHECK(ppBlock);
QRY_PARAM_CHECK(ppBlock);
if (grpSortOpInfo->prefetchedSortInput) {
block = grpSortOpInfo->prefetchedSortInput;
@ -687,7 +687,7 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) {
}
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupSortOperatorInfo* pInfo = pOperator->info;
@ -777,7 +777,7 @@ void destroyGroupSortOperatorInfo(void* param) {
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -807,7 +807,7 @@ _end:
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;

View File

@ -858,7 +858,7 @@ _end:
int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;

View File

@ -1349,7 +1349,7 @@ _end:
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -1886,7 +1886,7 @@ _end:
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -3754,7 +3754,7 @@ _end:
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
@ -4081,7 +4081,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4931,7 +4931,7 @@ _end:
int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
@ -5248,7 +5248,7 @@ _end:
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -2233,7 +2233,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2864,7 +2864,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1104,7 +1104,7 @@ static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn
}
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1320,7 +1320,7 @@ _end:
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1616,7 +1616,7 @@ _end:
// todo make this as an non-blocking operator
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1727,7 +1727,7 @@ void destroySWindowOperatorInfo(void* param) {
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2027,7 +2027,7 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2366,7 +2366,7 @@ _end:
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -287,7 +287,7 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
int32_t code = 0;
int32_t lino = 0;
QRY_OPTR_CHECK(pHandle);
QRY_PARAM_CHECK(pHandle);
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
@ -393,7 +393,6 @@ void tsortClearOrderedSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *
(*pSource)->src.pBlock = NULL;
}
qInfo("---free:%p", *pSource);
taosMemoryFreeClear(*pSource);
}
@ -2393,7 +2392,6 @@ static void freeSortSource(SSortSource* pSource) {
pSource->src.pBlock = NULL;
}
qInfo("---free-single:%p", pSource);
taosMemoryFree(pSource);
}
@ -2412,7 +2410,12 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
while (1) {
SSDataBlock* pBlock = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pBlock));
code = pHandle->fetchfp(pSource->param, &pBlock);
if (code != 0) {
freeSortSource(pSource);
return code;
}
if (pBlock == NULL) {
break;
}

View File

@ -1224,7 +1224,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
}
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
QRY_OPTR_CHECK(pList);
QRY_PARAM_CHECK(pList);
int32_t code = 0;
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);