Merge pull request #27793 from taosdata/fix/TD-31948

fix: block data shrink issue
This commit is contained in:
Pan Wei 2024-09-11 16:32:58 +08:00 committed by GitHub
commit 798718e903
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 116 additions and 11 deletions

View File

@ -233,6 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
* @brief find how many rows already in order start from first row * @brief find how many rows already in order start from first row
*/ */
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);

View File

@ -74,26 +74,26 @@ int32_t tGetMachineId(char **result);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
#define GRANTS_SCHEMA \ #define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \ static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "version", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "timeseries", .bytes = 43 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "dnodes", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "cpu_cores", .bytes = 13 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "cpu_cores", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
} }
#else #else
#define GRANTS_SCHEMA \ #define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \ static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "version", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "timeseries", .bytes = 43 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "dnodes", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "cpu_cores", .bytes = 13 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \ {.name = "cpu_cores", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
} }
#endif #endif
// #define GRANT_CFG_ADD // #define GRANT_CFG_ADD

View File

@ -731,6 +731,10 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t
} }
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
if (numOfRows == 0) {
return;
}
if (numOfRows >= pBlock->info.rows) { if (numOfRows >= pBlock->info.rows) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
return; return;
@ -2936,6 +2940,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
// return length of encoded data, return -1 if failed // return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
blockDataCheck(pBlock, false);
int32_t dataLen = 0; int32_t dataLen = 0;
// todo extract method // todo extract method
@ -3177,6 +3183,9 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
} }
*pEndPos = pStart; *pEndPos = pStart;
blockDataCheck(pBlock, false);
return code; return code;
} }
@ -3386,3 +3395,77 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return nextRowIdx; return nextRowIdx;
} }
/**
 * @brief Consistency walk over every column and row of a data block (currently disabled).
 *
 * As written, the function returns on its first statement, so nothing below that
 * statement executes. In addition, BLOCK_DATA_CHECK_TRESSA is defined to expand to an
 * empty statement, so even without the leading return no condition would be evaluated;
 * only the plain loads/assignments between the macro uses would run.
 *
 * @param pDataBlock block to inspect; NULL or a block with info.rows == 0 is skipped
 * @param forceChk   when false, a block whose info.dataLoad is false is skipped
 */
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
// Unconditional early exit: everything after this line is unreachable.
// NOTE(review): presumably a deliberate switch to turn the check off -- confirm before removing.
return;
// Nothing to inspect for a missing or empty block.
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
return;
}
// The check macro expands to an empty statement, so its argument is discarded by the
// preprocessor and never evaluated. The macro is defined inside the function body and is
// not #undef'd within this block, so it remains defined for the rest of the translation unit.
#define BLOCK_DATA_CHECK_TRESSA(o) ;
//#define BLOCK_DATA_CHECK_TRESSA(o) A S S E R T(o)
BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
// Skip blocks whose data is flagged as not loaded unless the caller forces the check.
if (!pDataBlock->info.dataLoad && !forceChk) {
return;
}
bool isVarType = false;
int32_t colLen = 0;
// Running offset used for variable-length columns that are not marked `reassigned`.
int32_t nextPos = 0;
int64_t checkRows = 0;
// Receives the value read from each non-null cell; it is never read afterwards.
// NOTE(review): presumably the read exists only to touch the cell's memory -- confirm.
int64_t typeValue = 0;
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < colNum; ++i) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
checkRows = pDataBlock->info.rows;
// Variable-length columns are expected to have an offset array, others a null bitmap.
if (isVarType) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
} else {
BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
}
// Restart the running offset for each column.
nextPos = 0;
for (int64_t r = 0; r < checkRows; ++r) {
// Only rows that colDataIsNull_s reports as non-null are inspected.
if (!colDataIsNull_s(pCol, r)) {
BLOCK_DATA_CHECK_TRESSA(pCol->pData);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
if (isVarType) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] < pCol->varmeta.length);
// For `reassigned` columns only a non-negative offset is required. Otherwise row 0
// seeds nextPos with its own offset and each later non-null row is compared against
// the running nextPos.
if (pCol->reassigned) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
} else if (0 == r) {
nextPos = pCol->varmeta.offset[r];
} else {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
}
// Length of this value as reported by varDataTLen, bounded below by VARSTR_HEADER_SIZE
// and above by the column's declared byte width.
colLen = varDataTLen(pCol->pData + pCol->varmeta.offset[r]);
BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
// The value must end within varmeta.length; non-reassigned columns also advance the
// running offset by this value's length.
if (pCol->reassigned) {
BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
} else {
nextPos += colLen;
BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
}
// Read the last byte of the value (offset + colLen - 1).
typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
} else {
// Non variable-length column: read the cell through GET_TYPED_DATA as an int64_t.
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
}
}
}
}
return;
}

View File

@ -528,6 +528,7 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo)); qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes); code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
if (*ppRes && (code == 0)) { if (*ppRes && (code == 0)) {
blockDataCheck(*ppRes, false);
pPost->isStarted = true; pPost->isStarted = true;
pStbJoin->execInfo.postBlkNum++; pStbJoin->execInfo.postBlkNum++;
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows; pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;

View File

@ -692,8 +692,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) { if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
pTaskInfo->paramSet = true; pTaskInfo->paramSet = true;
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes); code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
blockDataCheck(pRes, false);
} else { } else {
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
} }
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -740,6 +742,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
} }
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
@ -839,6 +842,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
} }
blockDataCheck(*pRes, false);
uint64_t el = (taosGetTimestampUs() - st); uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el; pTaskInfo->cost.elapsedTime += el;

View File

@ -617,6 +617,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
_err: _err:
blockDataCheck(pBlock, true);
colDataDestroy(p); colDataDestroy(p);
taosMemoryFree(p); taosMemoryFree(p);
return code; return code;

View File

@ -733,6 +733,8 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
} }
} }
blockDataCheck(pBlock, false);
*ppRes = pBlock; *ppRes = pBlock;
return code; return code;
} }

View File

@ -474,8 +474,11 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
pCtx->midRemains = false; pCtx->midRemains = false;
} else { } else {
int32_t copyRows = pMore->info.capacity - pMore->info.rows; int32_t copyRows = pMore->info.capacity - pMore->info.rows;
if (copyRows > 0) {
MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
blockDataShrinkNRows(pLess, copyRows); blockDataShrinkNRows(pLess, copyRows);
}
pCtx->midRemains = true; pCtx->midRemains = true;
} }
@ -1742,6 +1745,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc
if (pBlock && pBlock->info.rows > 0) { if (pBlock && pBlock->info.rows > 0) {
*pResBlock = pBlock; *pResBlock = pBlock;
} }
return code; return code;
} }

View File

@ -66,6 +66,7 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param; SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
return code; return code;
} }
@ -524,6 +525,7 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if ((*pResBlock) != NULL) { if ((*pResBlock) != NULL) {
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows; pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
blockDataCheck(*pResBlock, false);
} else { } else {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }

View File

@ -868,12 +868,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL; SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p); int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
blockDataCheck(p, false);
return (code == 0)? p:NULL; return (code == 0)? p:NULL;
} }
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) { SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL; SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p); int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
blockDataCheck(p, false);
return (code == 0)? p:NULL; return (code == 0)? p:NULL;
} }

View File

@ -328,7 +328,9 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param; SOperatorInfo* pOperator = (SOperatorInfo*)param;
return pOperator->fpSet.getNextFn(pOperator, ppBlock); int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
return code;
} }
// todo refactor: merged with fetch fp // todo refactor: merged with fetch fp
@ -611,6 +613,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (block != NULL) { if (block != NULL) {
blockDataCheck(block, false);
if (block->info.id.groupId == grpSortOpInfo->currGroupId) { if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
*ppBlock = block; *ppBlock = block;