enh: add block check

This commit is contained in:
dapan1121 2024-09-09 18:20:00 +08:00
parent 8308a42c0c
commit b57cd27827
3 changed files with 73 additions and 0 deletions

View File

@ -233,6 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
* @brief find how many rows already in order start from first row
*/
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
void blockDataCheck(const SSDataBlock* pDataBlock);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);

View File

@ -2936,6 +2936,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
blockDataCheck(pBlock);
int32_t dataLen = 0;
// todo extract method
@ -3177,6 +3179,9 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
}
*pEndPos = pStart;
blockDataCheck(pBlock);
return code;
}
@ -3386,3 +3391,68 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return nextRowIdx;
}
void blockDataCheck(const SSDataBlock* pDataBlock) {
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
return;
}
ASSERT(pDataBlock->info.rows > 0);
if (!pDataBlock->info.dataLoad) {
return;
}
bool isVarType = false;
int32_t colLen = 0;
int32_t nextPos = 0;
int64_t checkRows = 0;
int64_t typeValue = 0;
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < colNum; ++i) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
checkRows = pDataBlock->info.rows;
if (isVarType) {
ASSERT(pCol->varmeta.length);
} else {
ASSERT(pCol->nullbitmap);
}
for (int64_t r = 0; r < checkRows; ++r) {
if (!colDataIsNull_s(pCol, r)) {
ASSERT(pCol->pData);
ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
if (isVarType) {
ASSERT(pCol->varmeta.allocLen > 0);
ASSERT(pCol->varmeta.offset[r] < pCol->varmeta.length);
if (pCol->reassigned) {
ASSERT(pCol->varmeta.offset[r] >= 0);
} else {
ASSERT(pCol->varmeta.offset[r] == nextPos);
}
colLen = varDataTLen(pCol->pData + pCol->varmeta.offset[r]);
ASSERT(colLen >= VARSTR_HEADER_SIZE);
ASSERT(colLen <= pCol->info.bytes);
if (pCol->reassigned) {
ASSERT((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
} else {
nextPos += colLen;
ASSERT(nextPos <= pCol->varmeta.length);
}
typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
} else {
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
}
}
}
}
return;
}

View File

@ -868,12 +868,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
blockDataCheck(p);
return (code == 0)? p:NULL;
}
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
blockDataCheck(p);
return (code == 0)? p:NULL;
}