diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c6f0b4d517..3b24ef9490 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -233,7 +233,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); * @brief find how many rows already in order start from first row */ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo); -void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk); +int32_t blockDataCheck(const SSDataBlock* pDataBlock); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); diff --git a/include/common/tglobal.h b/include/common/tglobal.h index a13693d38c..5a52abe9ee 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -153,7 +153,12 @@ extern bool tsEnableCrashReport; extern char *tsTelemUri; extern char *tsClientCrashReportUri; extern char *tsSvrCrashReportUri; -extern bool tsEnableSafetyCheck; +extern int8_t tsSafetyCheckLevel; +enum { + TSDB_SAFETY_CHECK_LEVELL_NEVER = 0, + TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1, + TSDB_SAFETY_CHECK_LEVELL_BYROW = 2, +}; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 23efe48209..57df4a6006 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -18,6 +18,7 @@ #include "tcompare.h" #include "tlog.h" #include "tname.h" +#include "tglobal.h" #define MALLOC_ALIGN_BYTES 32 @@ -3042,7 +3043,11 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha // return length of encoded data, return -1 if failed int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) { - blockDataCheck(pBlock, false); + int32_t code = blockDataCheck(pBlock); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } int32_t dataLen = 0; @@ -3297,9 +3302,13 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos *pEndPos = pStart; - blockDataCheck(pBlock, false); + code = blockDataCheck(pBlock); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; + } - return code; + return TSDB_CODE_SUCCESS; } int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { @@ -3509,20 +3518,19 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) { return nextRowIdx; } -void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) { - return; - - if (NULL == pDataBlock || pDataBlock->info.rows == 0) { - return; +#define BLOCK_DATA_CHECK_TRESSA(o) \ + if (!(o)) { \ + uError("blockDataCheck failed! line:%d", __LINE__); \ + return TSDB_CODE_INTERNAL_ERROR; \ + } +int32_t blockDataCheck(const SSDataBlock* pDataBlock) { + if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; } -#define BLOCK_DATA_CHECK_TRESSA(o) ; -//#define BLOCK_DATA_CHECK_TRESSA(o) A S S E R T(o) - BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0); - - if (!pDataBlock->info.dataLoad && !forceChk) { - return; + if (!pDataBlock->info.dataLoad) { + return TSDB_CODE_SUCCESS; } bool isVarType = false; @@ -3544,6 +3552,7 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) { nextPos = 0; for (int64_t r = 0; r < checkRows; ++r) { + if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break; if (!colDataIsNull_s(pCol, r)) { BLOCK_DATA_CHECK_TRESSA(pCol->pData); BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen); @@ -3578,7 +3587,7 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) { } } - return; + return TSDB_CODE_SUCCESS; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 87cf1d7587..6b222b15ab 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -137,9 +137,9 @@ bool tsEnableCrashReport = false; #else bool tsEnableCrashReport = true; #endif -char *tsClientCrashReportUri = "/ccrashreport"; -char *tsSvrCrashReportUri = "/dcrashreport"; -bool tsEnableSafetyCheck = false; +char *tsClientCrashReportUri = "/ccrashreport"; +char *tsSvrCrashReportUri = "/dcrashreport"; +int8_t tsSafetyCheckLevel = TSDB_SAFETY_CHECK_LEVELL_NEVER; // schemaless bool tsSmlDot2Underline = true; @@ -608,7 +608,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { TAOS_CHECK_RETURN( cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableSafetyCheck", tsEnableSafetyCheck, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "safetyCheckLevel", tsSafetyCheckLevel, 0, 5, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS); @@ -1302,8 +1302,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "tsmaDataDeleteMark"); tsmaDataDeleteMark = pItem->i32; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableSafetyCheck"); - tsEnableSafetyCheck = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "safetyCheckLevel"); + tsSafetyCheckLevel = pItem->i32; TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -2045,7 +2045,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"supportVnodes", &tsNumOfSupportVnodes}, {"experimental", &tsExperimental}, {"maxTsmaNum", &tsMaxTsmaNum}, - {"enableSafetyCheck", &tsEnableSafetyCheck}}; + {"safetyCheckLevel", &tsSafetyCheckLevel}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); @@ -2302,7 +2302,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags}, {"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay}, {"tsmaDataDeleteMark", &tsmaDataDeleteMark}, - {"enableSafetyCheck", &tsEnableSafetyCheck}}; + {"safetyCheckLevel", &tsSafetyCheckLevel}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 732b2566a9..236d6a4d3e 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -55,7 +55,7 @@ typedef struct SDataDispatchHandle { } SDataDispatchHandle; static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) { - if(!tsEnableSafetyCheck) { + if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) { return TSDB_CODE_SUCCESS; } if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index eb49057d89..62f199387e 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -528,7 +528,12 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo)); code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes); if (*ppRes && (code == 0)) { - blockDataCheck(*ppRes, false); + code = blockDataCheck(*ppRes); + if (code) { + qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code)); + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } pPost->isStarted = true; pStbJoin->execInfo.postBlkNum++; pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 27dd687f40..632daf0aa0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -699,12 +699,12 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) { pTaskInfo->paramSet = true; code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes); - blockDataCheck(pRes, false); } else { code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); - blockDataCheck(pRes, false); } + QUERY_CHECK_CODE(code, lino, _end); + code = blockDataCheck(pRes); QUERY_CHECK_CODE(code, lino, _end); if (pRes == NULL) { @@ -749,7 +749,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes); - blockDataCheck(pRes, false); + QUERY_CHECK_CODE(code, lino, _end); + code = blockDataCheck(pRes); QUERY_CHECK_CODE(code, lino, _end); } @@ -848,7 +849,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo)); } - blockDataCheck(*pRes, false); + code = blockDataCheck(*pRes); + if (code) { + pTaskInfo->code = code; + qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo)); + } uint64_t el = (taosGetTimestampUs() - st); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 64a07c4653..b39cf4014d 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -616,11 +616,12 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p } } } - code = TSDB_CODE_SUCCESS; - + code = blockDataCheck(pBlock); + QUERY_CHECK_CODE(code, lino, _err); _err: - blockDataCheck(pBlock, true); - + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } colDataDestroy(p); taosMemoryFree(p); return code; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 10b372319b..d47ab366b6 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -765,7 +765,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p } } - blockDataCheck(pBlock, false); + code = blockDataCheck(pBlock); *ppRes = pBlock; return code; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 7fd6b91e52..4a288a277b 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -65,11 +65,14 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock); int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); - blockDataCheck(*ppBlock, false); + int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); if (code) { qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); } + blockDataCheck(*ppBlock); + if (code) { + qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code)); + } return code; } @@ -526,7 +529,8 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { if ((*pResBlock) != NULL) { pOperator->resultInfo.totalRows += (*pResBlock)->info.rows; - blockDataCheck(*pResBlock, false); + code = blockDataCheck(*pResBlock); + QUERY_CHECK_CODE(code, lino, _end); } else { setOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 7914f9f320..f88b544b0f 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -870,15 +870,25 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { SSDataBlock* p = NULL; - int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p); - blockDataCheck(p, false); - return (code == 0)? p:NULL; + int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p); + if (code == TSDB_CODE_SUCCESS) { + code = blockDataCheck(p); + if (code != TSDB_CODE_SUCCESS) { + qError("blockDataCheck failed, code:%s", tstrerror(code)); + } + } + return (code == 0) ? p : NULL; } SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) { SSDataBlock* p = NULL; - int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p); - blockDataCheck(p, false); + int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p); + if (code == TSDB_CODE_SUCCESS) { + code = blockDataCheck(p); + if (code != TSDB_CODE_SUCCESS) { + qError("blockDataCheck failed, code:%s", tstrerror(code)); + } + } return (code == 0)? p:NULL; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 1c241dffec..a6ca20c5ee 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -334,10 +334,14 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); - blockDataCheck(*ppBlock, false); + int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); if (code) { qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); + } else { + code = blockDataCheck(*ppBlock); + if (code) { + qError("failed to check block data, %s code:%s", __func__, tstrerror(code)); + } } return code; } @@ -630,7 +634,8 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { QUERY_CHECK_CODE(code, lino, _end); if (block != NULL) { - blockDataCheck(block, false); + code = blockDataCheck(block); + QUERY_CHECK_CODE(code, lino, _end); if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; *ppBlock = block; diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 7a836c181b..3832530218 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -48,7 +48,7 @@ class TDSimClient: "telemetryReporting": "0", "tqDebugflag": "135", "stDebugflag":"135", - "enableSafetyCheck":"1" + "safetyCheckLevel":"2" } def getLogDir(self): @@ -151,7 +151,7 @@ class TDDnode: "enableQueryHb": "1", "supportVnodes": "1024", "telemetryReporting": "0", - "enableSafetyCheck":"1" + "safetyCheckLevel":"2" } def init(self, path, remoteIP = ""):