diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 1d280b5d6b..cbfd1efb84 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -233,7 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); * @brief find how many rows already in order start from first row */ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo); -void blockDataCheck(const SSDataBlock* pDataBlock); +void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 840434a351..6e4162268d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -731,6 +731,10 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t } void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { + if (numOfRows == 0) { + return; + } + if (numOfRows >= pBlock->info.rows) { blockDataCleanup(pBlock); return; @@ -2936,7 +2940,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha // return length of encoded data, return -1 if failed int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { - blockDataCheck(pBlock); + blockDataCheck(pBlock, false); int32_t dataLen = 0; @@ -3180,7 +3184,7 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos *pEndPos = pStart; - blockDataCheck(pBlock); + blockDataCheck(pBlock, false); return code; } @@ -3392,14 +3396,14 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) { return nextRowIdx; } -void blockDataCheck(const SSDataBlock* pDataBlock) { +void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) { if (NULL == pDataBlock || pDataBlock->info.rows == 0) { return; } ASSERT(pDataBlock->info.rows > 0); - if (!pDataBlock->info.dataLoad) { + if (!pDataBlock->info.dataLoad && !forceChk) { return; } @@ -3457,3 +3461,4 @@ void blockDataCheck(const SSDataBlock* pDataBlock) { return; } + diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 44e8a3cb8a..aa0cc215df 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -528,6 +528,7 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo)); code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes); if (*ppRes && (code == 0)) { + blockDataCheck(*ppRes, false); pPost->isStarted = true; pStbJoin->execInfo.postBlkNum++; pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9e33a3d890..cca1895a85 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -692,8 +692,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) { pTaskInfo->paramSet = true; code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes); + blockDataCheck(pRes, false); } else { code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); + blockDataCheck(pRes, false); } QUERY_CHECK_CODE(code, lino, _end); @@ -740,6 +742,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); + blockDataCheck(pRes, false); QUERY_CHECK_CODE(code, lino, _end); } @@ -839,6 +842,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } + blockDataCheck(*pRes, false); + uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 00138d6871..c8ec1e8fcc 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -617,6 +617,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p code = TSDB_CODE_SUCCESS; _err: + blockDataCheck(pBlock, true); + colDataDestroy(p); taosMemoryFree(p); return code; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 1796aa7b64..990c9322b7 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -733,6 +733,8 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p } } + blockDataCheck(pBlock, false); + *ppRes = pBlock; return code; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 90c2248c12..c29c49e280 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -474,8 +474,11 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl pCtx->midRemains = false; } else { int32_t copyRows = pMore->info.capacity - pMore->info.rows; - MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); - blockDataShrinkNRows(pLess, copyRows); + if (copyRows > 0) { + MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); + blockDataShrinkNRows(pLess, copyRows); + } + pCtx->midRemains = true; } @@ -1742,6 +1745,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc if (pBlock && pBlock->info.rows > 0) { *pResBlock = pBlock; } + return code; } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 7357329ca6..c94a330dbc 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -66,6 +66,7 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock); int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); + blockDataCheck(*ppBlock, false); return code; } @@ -524,6 +525,7 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { if ((*pResBlock) != NULL) { pOperator->resultInfo.totalRows += (*pResBlock)->info.rows; + blockDataCheck(*pResBlock, false); } else { setOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index de478a146f..cb1149d8a3 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -868,14 +868,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { SSDataBlock* p = NULL; int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p); - blockDataCheck(p); + blockDataCheck(p, false); return (code == 0)? p:NULL; } SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) { SSDataBlock* p = NULL; int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p); - blockDataCheck(p); + blockDataCheck(p, false); return (code == 0)? p:NULL; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9b659ac761..27ae5e7281 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -328,7 +328,9 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - return pOperator->fpSet.getNextFn(pOperator, ppBlock); + int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); + blockDataCheck(*ppBlock, false); + return code; } // todo refactor: merged with fetch fp @@ -611,6 +613,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { QUERY_CHECK_CODE(code, lino, _end); if (block != NULL) { + blockDataCheck(block, false); if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; *ppBlock = block;