blockDataCheck

This commit is contained in:
xsren 2024-10-23 15:37:49 +08:00
parent ea6c90e023
commit e8835c2caa
13 changed files with 93 additions and 49 deletions

View File

@ -233,7 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
* @brief find how many rows already in order start from first row
*/
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk);
int32_t blockDataCheck(const SSDataBlock* pDataBlock);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);

View File

@ -153,7 +153,12 @@ extern bool tsEnableCrashReport;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
extern bool tsEnableSafetyCheck;
extern int8_t tsSafetyCheckLevel;
enum {
TSDB_SAFETY_CHECK_LEVELL_NEVER = 0,
TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1,
TSDB_SAFETY_CHECK_LEVELL_BYROW = 2,
};
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing

View File

@ -18,6 +18,7 @@
#include "tcompare.h"
#include "tlog.h"
#include "tname.h"
#include "tglobal.h"
#define MALLOC_ALIGN_BYTES 32
@ -3042,7 +3043,11 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
blockDataCheck(pBlock, false);
int32_t code = blockDataCheck(pBlock);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
int32_t dataLen = 0;
@ -3297,9 +3302,13 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
*pEndPos = pStart;
blockDataCheck(pBlock, false);
code = blockDataCheck(pBlock);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
@ -3509,20 +3518,19 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return nextRowIdx;
}
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
return;
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
return;
#define BLOCK_DATA_CHECK_TRESSA(o) \
if (!(o)) { \
uError("blockDataCheck failed! line:%d", __LINE__); \
return TSDB_CODE_INTERNAL_ERROR; \
}
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
#define BLOCK_DATA_CHECK_TRESSA(o) ;
//#define BLOCK_DATA_CHECK_TRESSA(o) A S S E R T(o)
BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
if (!pDataBlock->info.dataLoad && !forceChk) {
return;
if (!pDataBlock->info.dataLoad) {
return TSDB_CODE_SUCCESS;
}
bool isVarType = false;
@ -3544,6 +3552,7 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
nextPos = 0;
for (int64_t r = 0; r < checkRows; ++r) {
if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
if (!colDataIsNull_s(pCol, r)) {
BLOCK_DATA_CHECK_TRESSA(pCol->pData);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
@ -3578,7 +3587,7 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
}
}
return;
return TSDB_CODE_SUCCESS;
}

View File

@ -139,7 +139,7 @@ bool tsEnableCrashReport = true;
#endif
char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";
bool tsEnableSafetyCheck = false;
int8_t tsSafetyCheckLevel = TSDB_SAFETY_CHECK_LEVELL_NEVER;
// schemaless
bool tsSmlDot2Underline = true;
@ -608,7 +608,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableSafetyCheck", tsEnableSafetyCheck, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "safetyCheckLevel", tsSafetyCheckLevel, 0, 5, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
@ -1302,8 +1302,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "tsmaDataDeleteMark");
tsmaDataDeleteMark = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableSafetyCheck");
tsEnableSafetyCheck = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "safetyCheckLevel");
tsSafetyCheckLevel = pItem->i32;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -2045,7 +2045,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"enableSafetyCheck", &tsEnableSafetyCheck}};
{"safetyCheckLevel", &tsSafetyCheckLevel}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
@ -2302,7 +2302,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay},
{"tsmaDataDeleteMark", &tsmaDataDeleteMark},
{"enableSafetyCheck", &tsEnableSafetyCheck}};
{"safetyCheckLevel", &tsSafetyCheckLevel}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -55,7 +55,7 @@ typedef struct SDataDispatchHandle {
} SDataDispatchHandle;
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) {
if(!tsEnableSafetyCheck) {
if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
return TSDB_CODE_SUCCESS;
}
if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {

View File

@ -528,7 +528,12 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
if (*ppRes && (code == 0)) {
blockDataCheck(*ppRes, false);
code = blockDataCheck(*ppRes);
if (code) {
qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
pPost->isStarted = true;
pStbJoin->execInfo.postBlkNum++;
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;

View File

@ -699,12 +699,12 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
pTaskInfo->paramSet = true;
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
blockDataCheck(pRes, false);
} else {
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
}
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataCheck(pRes);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes == NULL) {
@ -749,7 +749,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
}
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataCheck(pRes);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -848,7 +849,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
blockDataCheck(*pRes, false);
code = blockDataCheck(*pRes);
if (code) {
pTaskInfo->code = code;
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
uint64_t el = (taosGetTimestampUs() - st);

View File

@ -616,11 +616,12 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
}
}
}
code = TSDB_CODE_SUCCESS;
code = blockDataCheck(pBlock);
QUERY_CHECK_CODE(code, lino, _err);
_err:
blockDataCheck(pBlock, true);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
colDataDestroy(p);
taosMemoryFree(p);
return code;

View File

@ -765,7 +765,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
}
}
blockDataCheck(pBlock, false);
code = blockDataCheck(pBlock);
*ppRes = pBlock;
return code;

View File

@ -66,10 +66,13 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
blockDataCheck(*ppBlock);
if (code) {
qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}
@ -526,7 +529,8 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if ((*pResBlock) != NULL) {
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
blockDataCheck(*pResBlock, false);
code = blockDataCheck(*pResBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
setOperatorCompleted(pOperator);
}

View File

@ -871,14 +871,24 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
blockDataCheck(p, false);
return (code == 0)? p:NULL;
if (code == TSDB_CODE_SUCCESS) {
code = blockDataCheck(p);
if (code != TSDB_CODE_SUCCESS) {
qError("blockDataCheck failed, code:%s", tstrerror(code));
}
}
return (code == 0) ? p : NULL;
}
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
blockDataCheck(p, false);
if (code == TSDB_CODE_SUCCESS) {
code = blockDataCheck(p);
if (code != TSDB_CODE_SUCCESS) {
qError("blockDataCheck failed, code:%s", tstrerror(code));
}
}
return (code == 0)? p:NULL;
}

View File

@ -335,9 +335,13 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
} else {
code = blockDataCheck(*ppBlock);
if (code) {
qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
}
}
return code;
}
@ -630,7 +634,8 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
QUERY_CHECK_CODE(code, lino, _end);
if (block != NULL) {
blockDataCheck(block, false);
code = blockDataCheck(block);
QUERY_CHECK_CODE(code, lino, _end);
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
*ppBlock = block;

View File

@ -48,7 +48,7 @@ class TDSimClient:
"telemetryReporting": "0",
"tqDebugflag": "135",
"stDebugflag":"135",
"enableSafetyCheck":"1"
"safetyCheckLevel":"2"
}
def getLogDir(self):
@ -151,7 +151,7 @@ class TDDnode:
"enableQueryHb": "1",
"supportVnodes": "1024",
"telemetryReporting": "0",
"enableSafetyCheck":"1"
"safetyCheckLevel":"2"
}
def init(self, path, remoteIP = ""):