fix: add block data check
This commit is contained in:
parent
4bc8db22fa
commit
dc983b9a39
|
@ -233,7 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
|||
* @brief find how many rows already in order start from first row
|
||||
*/
|
||||
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||
void blockDataCheck(const SSDataBlock* pDataBlock);
|
||||
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk);
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
|
|
|
@ -731,6 +731,10 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t
|
|||
}
|
||||
|
||||
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
|
||||
if (numOfRows == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (numOfRows >= pBlock->info.rows) {
|
||||
blockDataCleanup(pBlock);
|
||||
return;
|
||||
|
@ -2936,7 +2940,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
|||
|
||||
// return length of encoded data, return -1 if failed
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||
blockDataCheck(pBlock);
|
||||
blockDataCheck(pBlock, false);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
|
||||
|
@ -3180,7 +3184,7 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
|
|||
|
||||
*pEndPos = pStart;
|
||||
|
||||
blockDataCheck(pBlock);
|
||||
blockDataCheck(pBlock, false);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -3392,14 +3396,14 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
return nextRowIdx;
|
||||
}
|
||||
|
||||
void blockDataCheck(const SSDataBlock* pDataBlock) {
|
||||
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
|
||||
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(pDataBlock->info.rows > 0);
|
||||
|
||||
if (!pDataBlock->info.dataLoad) {
|
||||
if (!pDataBlock->info.dataLoad && !forceChk) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -3457,3 +3461,4 @@ void blockDataCheck(const SSDataBlock* pDataBlock) {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -528,6 +528,7 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
|
|||
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
|
||||
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
|
||||
if (*ppRes && (code == 0)) {
|
||||
blockDataCheck(*ppRes, false);
|
||||
pPost->isStarted = true;
|
||||
pStbJoin->execInfo.postBlkNum++;
|
||||
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
|
||||
|
|
|
@ -692,8 +692,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
|||
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
|
||||
pTaskInfo->paramSet = true;
|
||||
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
|
||||
blockDataCheck(pRes, false);
|
||||
} else {
|
||||
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
||||
blockDataCheck(pRes, false);
|
||||
}
|
||||
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -740,6 +742,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
|||
}
|
||||
|
||||
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
||||
blockDataCheck(pRes, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
|
@ -839,6 +842,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
blockDataCheck(*pRes, false);
|
||||
|
||||
uint64_t el = (taosGetTimestampUs() - st);
|
||||
|
||||
pTaskInfo->cost.elapsedTime += el;
|
||||
|
|
|
@ -617,6 +617,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
|
|||
code = TSDB_CODE_SUCCESS;
|
||||
|
||||
_err:
|
||||
blockDataCheck(pBlock, true);
|
||||
|
||||
colDataDestroy(p);
|
||||
taosMemoryFree(p);
|
||||
return code;
|
||||
|
|
|
@ -733,6 +733,8 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
|
|||
}
|
||||
}
|
||||
|
||||
blockDataCheck(pBlock, false);
|
||||
|
||||
*ppRes = pBlock;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -474,8 +474,11 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
|
|||
pCtx->midRemains = false;
|
||||
} else {
|
||||
int32_t copyRows = pMore->info.capacity - pMore->info.rows;
|
||||
if (copyRows > 0) {
|
||||
MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
|
||||
blockDataShrinkNRows(pLess, copyRows);
|
||||
}
|
||||
|
||||
pCtx->midRemains = true;
|
||||
}
|
||||
|
||||
|
@ -1742,6 +1745,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc
|
|||
if (pBlock && pBlock->info.rows > 0) {
|
||||
*pResBlock = pBlock;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -66,6 +66,7 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
|
|||
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
blockDataCheck(*ppBlock, false);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -524,6 +525,7 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
if ((*pResBlock) != NULL) {
|
||||
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
|
||||
blockDataCheck(*pResBlock, false);
|
||||
} else {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
|
|
@ -868,14 +868,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
|
|||
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
|
||||
blockDataCheck(p);
|
||||
blockDataCheck(p, false);
|
||||
return (code == 0)? p:NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
|
||||
blockDataCheck(p);
|
||||
blockDataCheck(p, false);
|
||||
return (code == 0)? p:NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -328,7 +328,9 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
|||
|
||||
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
return pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
blockDataCheck(*ppBlock, false);
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo refactor: merged with fetch fp
|
||||
|
@ -611,6 +613,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (block != NULL) {
|
||||
blockDataCheck(block, false);
|
||||
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
|
||||
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
|
||||
*ppBlock = block;
|
||||
|
|
Loading…
Reference in New Issue