From 99bdcb3f580946808bb3923f765db62a99a63429 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 20 Aug 2024 15:02:09 +0800 Subject: [PATCH 1/6] enh(stream):remove assert of stream operator --- include/util/tutil.h | 7 ++ source/libs/executor/src/executor.c | 64 +++++++------ source/libs/executor/src/executorInt.c | 23 +++-- source/libs/executor/src/filloperator.c | 52 ---------- source/libs/executor/src/scanoperator.c | 56 +++++------ source/libs/executor/src/sortoperator.c | 7 +- .../executor/src/streamcountwindowoperator.c | 8 +- .../executor/src/streameventwindowoperator.c | 7 +- source/libs/executor/src/streamfilloperator.c | 14 ++- .../executor/src/streamtimewindowoperator.c | 53 ++++++----- source/libs/executor/src/timesliceoperator.c | 3 - source/libs/executor/src/timewindowoperator.c | 94 ++++++++++++------- source/libs/stream/src/streamSessionState.c | 3 +- source/libs/stream/src/streamUpdate.c | 8 +- source/libs/stream/src/tstreamFileState.c | 14 +-- 15 files changed, 208 insertions(+), 205 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index 1ee3bb0e83..6c7517f630 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -139,6 +139,13 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define QUERY_CHECK_CODE TSDB_CHECK_CODE +#define QUERY_CHECK_CONDITION(condition, CODE, LINO, LABEL, ERRNO) \ + if (!condition) { \ + (CODE) = (ERRNO); \ + (LINO) = __LINE__; \ + goto LABEL; \ + } + #define TSDB_CHECK_NULL(ptr, CODE, LINO, LABEL, ERRNO) \ if ((ptr) == NULL) { \ (CODE) = (ERRNO); \ diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b1c9207ab7..833371cfc5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -158,7 +158,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamScanInfo* pInfo = pOperator->info; qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks); - ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); + QUERY_CHECK_CONDITION((pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); if (type == STREAM_INPUT__MERGED_SUBMIT) { for (int32_t i = 0; i < numOfBlocks; i++) { @@ -189,7 +190,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->blockType = STREAM_INPUT__CHECKPOINT; } else { - ASSERT(0); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); } return TSDB_CODE_SUCCESS; @@ -543,7 +545,9 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table int32_t* tversion, int32_t idx, bool* tbGet) { *tbGet = false; - ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); + if (tinfo == NULL || dbName == NULL || tableName == NULL) { + return TSDB_CODE_INVALID_PARA; + } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) { @@ -722,7 +726,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo blockIndex += 1; current += p->info.rows; - ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT); + QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); void* tmp = taosArrayPush(pResList, &p); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -987,15 +992,17 @@ void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { *scanner = pOperator->info; break; } else { - ASSERT(pOperator->numOfDownstream == 1); pOperator = pOperator->pDownstream[0]; } } } int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; @@ -1007,12 +1014,19 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); - return 0; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; @@ -1024,14 +1038,26 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan "-%" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); - return 0; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; - return 0; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { @@ -1046,9 +1072,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1063,9 +1086,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1078,9 +1098,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1093,9 +1110,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamEventAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1108,9 +1122,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamCountAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1126,7 +1137,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { if (pOperator->numOfDownstream > 1) { qError("unexpected stream, multiple downstream"); - ASSERT(0); return -1; } return 0; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index cbb124b9e0..031ffbb50e 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -163,7 +163,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR return NULL; } - ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); + if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + pTaskInfo->code = terrno; + return NULL; + } } } else { // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the @@ -176,7 +180,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR return NULL; } - ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); + if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + pTaskInfo->code = terrno; + return NULL; + } } } @@ -324,6 +332,7 @@ _end: static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -363,7 +372,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int if (hasPk && (j == pkParamIdx)) { pInput->pPrimaryKey = pInput->pData[j]; } - ASSERT(pInput->pData[j] != NULL); + QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { // todo avoid case: top(k, 12), 12 is the value parameter. // sum(11), 11 is also the value parameter. @@ -374,14 +383,16 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pInput->blankFill = pBlock->info.blankFill; code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } } } } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 5ece57cad1..fe9d0d3cf0 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -55,7 +55,6 @@ typedef struct SFillOperatorInfo { SExprSupp noFillExprSupp; } SFillOperatorInfo; -static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order); static void destroyFillOperatorInfo(void* param); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo); @@ -168,56 +167,6 @@ _end: return code; } -// todo refactor: decide the start key according to the query time range. -static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) { - if (order == TSDB_ORDER_ASC) { - int64_t skey = pBlock->info.window.skey; - if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the - ASSERT(taosFillNotStarted(pInfo->pFillInfo)); - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); - } else if (pInfo->pFillInfo->start < skey) { - int64_t t = skey; - SInterval* pInterval = &pInfo->pFillInfo->interval; - - while (1) { - int64_t prev = taosTimeAdd(t, -pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (prev <= pInfo->pFillInfo->start) { - t = prev; - break; - } - t = prev; - } - - // todo time window chosen problem: t or prev value? - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); - } - } else { - int64_t ekey = pBlock->info.window.ekey; - if (ekey > pInfo->pFillInfo->start) { - ASSERT(taosFillNotStarted(pInfo->pFillInfo)); - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, ekey); - } else if (ekey < pInfo->pFillInfo->start) { - int64_t t = ekey; - SInterval* pInterval = &pInfo->pFillInfo->interval; - int64_t prev = t; - while (1) { - int64_t next = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (next >= pInfo->pFillInfo->start) { - prev = t; - t = next; - break; - } - prev = t; - t = next; - } - - // todo time window chosen problem: t or next value? - if (t > pInfo->pFillInfo->start) t = prev; - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); - } - } -} - static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -602,7 +551,6 @@ static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) { } pInfo->win.ekey = ekey; } else { - assert(order == TSDB_ORDER_DESC); skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); next = skey; while (next < pInfo->win.skey) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3655e16f2..04aaa3c210 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1241,14 +1241,11 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { taosRUnLockLatch(&pTaskInfo->lock); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(pInfo->base.dataReader == NULL); + QUERY_CHECK_CONDITION((pInfo->base.dataReader == NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } + QUERY_CHECK_CODE(code, lino, _end); if (pInfo->filesetDelimited) { pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); @@ -1590,7 +1587,6 @@ static bool isCountWindow(SStreamScanInfo* pInfo) { static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); uint64_t* groupCol = (uint64_t*)pColInfo->pData; - ASSERT(rowIndex < pBlock->info.rows); pInfo->groupId = groupCol[rowIndex]; } @@ -1659,9 +1655,8 @@ _end: bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) { // coverity scan - ASSERTS(pVal != NULL, "pVal should not be NULL"); - if (!pVal) { - qError("failed to compare primary key, since primary key is null"); + if (!pVal || !pCol) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA)); return false; } void* pData = colDataGetData(pCol, rowId); @@ -1738,7 +1733,6 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ goto _end; } - ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1770,27 +1764,22 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ win.skey = TMIN(win.skey, startData[*pRowIndex]); continue; } - - ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || - !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0))); break; } STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info; // coverity scan - ASSERTS(pInfo->pUpdateInfo != NULL, "Failed to set data version, since pInfo->pUpdateInfo is NULL"); - if (pInfo->pUpdateInfo) { - qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, - pInfo->pUpdateInfo->maxDataVersion); - resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); + if (pInfo->pUpdateInfo == NULL){ + qError("Failed to set data version, since pInfo->pUpdateInfo is NULL"); + return; } + qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, + pInfo->pUpdateInfo->maxDataVersion); + resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); pInfo->pTableScanOp->status = OP_OPENED; if (pRes) { (*pRes) = true; } - -_end: - qTrace("%s success", __func__); } static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval, @@ -2198,7 +2187,6 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS } uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; @@ -2296,7 +2284,6 @@ static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock* SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; @@ -2357,7 +2344,6 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); } - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; @@ -2477,7 +2463,6 @@ static int32_t checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBl QUERY_CHECK_CODE(code, lino, _end); } SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; SColumnInfoData* pPkColDataInfo = NULL; if (hasPrimaryKeyCol(pInfo)) { @@ -2554,7 +2539,7 @@ static int32_t doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STi if (pWindow->skey != INT64_MIN) { qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); - ASSERT(pCol->pData != NULL); + QUERY_CHECK_NULL(pCol->pData, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t* ts = (int64_t*)colDataGetData(pCol, i); p[i] = (*ts >= pWindow->skey); @@ -2603,7 +2588,8 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); qDebug("doBlockDataWindowFilter primary key, ts:%" PRId64 " %" PRId64, offset->ts, offset->primaryKey.val); - ASSERT(pColPk->info.type == offset->primaryKey.type); + QUERY_CHECK_CONDITION((pColPk->info.type == offset->primaryKey.type), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); __compar_fn_t func = getComparFunc(pColPk->info.type, 0); for (int32_t i = 0; i < pBlock->info.rows; ++i) { @@ -3945,7 +3931,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) { pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1); code = pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS); + + QUERY_CHECK_CONDITION((pInfo->pUpdateInfo->minTS > pUpInfo->minTS), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); SHashObj* curMap = pInfo->pUpdateInfo->pMap; void* pIte = taosHashIterate(curMap, NULL); @@ -4105,12 +4093,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } if (pHandle->initTqReader) { - ASSERT(pHandle->tqReader == NULL); pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode); - ASSERT(pInfo->tqReader); + QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, terrno); } else { - ASSERT(pHandle->tqReader); pInfo->tqReader = pHandle->tqReader; + QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } pInfo->pUpdateInfo = NULL; @@ -5887,7 +5874,10 @@ void destroyTableMergeScanOperatorInfo(void* param) { } int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - ASSERT(pOptr != NULL); + if (pOptr == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA)); + return TSDB_CODE_INVALID_PARA; + } // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); if (!execInfo) { @@ -6200,7 +6190,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pSupp->dbNameSlotId != -1) { - ASSERT(strlen(dbName)); + QUERY_CHECK_CONDITION((strlen(dbName) > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId); QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 36f9ac0954..cb15c3c836 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -735,7 +735,12 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { } // beginSortGroup would fetch all child blocks of pInfo->currGroupId; - ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); + if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + pOperator->pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo, &pBlock); if (pBlock != NULL && (code == 0)) { diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 44a383772d..b4f8d6837a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -88,7 +88,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group winCode = TSDB_CODE_FAILED; } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) { - ASSERT(pBuffInfo->pCur); + QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur); winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin, (void**)&pCurWin->winInfo.pStatePos, &size); @@ -345,7 +345,6 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl if (slidingRows + winRows > pAggSup->windowSliding) { buffInfo.winBuffOp = CREATE_NEW_WINDOW; winRows = pAggSup->windowSliding - slidingRows; - ASSERT(i >= 0); } } else { buffInfo.winBuffOp = MOVE_NEXT_WINDOW; @@ -690,7 +689,10 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ff1ff579fc..38a813bf33 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -378,7 +378,6 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winRows >= 1); if (rebuild) { uint64_t uid = 0; code = appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, @@ -660,7 +659,10 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -779,7 +781,6 @@ void streamEventReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); qDebug("===stream=== event window operator reload state. get result count:%d", num); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 75b15dbea4..fac1cf48c7 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -331,7 +331,7 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS pFillInfo->pLinearInfo->winIndex = 0; } break; default: - ASSERT(0); + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)); break; } } @@ -419,7 +419,6 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; } else { - ASSERT(hasNextWindow(pFillSup)); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; } @@ -447,7 +446,6 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; } else { - ASSERT(hasNextWindow(pFillSup)); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; pFillInfo->pLinearInfo->nextEnd = INT64_MIN; @@ -458,10 +456,9 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS } } break; default: - ASSERT(0); + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)); break; } - ASSERT(pFillInfo->pos != FILL_POS_INVALID); } static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) { @@ -628,7 +625,9 @@ static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SRes SWinKey key = {.groupId = groupId, .ts = pRow->key}; int32_t code = pAPI->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len); qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) { @@ -795,7 +794,6 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint if (winCode != TSDB_CODE_SUCCESS) { colDataSetNULL(pTableCol, pBlock->info.rows); } else { - ASSERT(tbname); char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); @@ -1125,7 +1123,7 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } break; default: - ASSERTS(false, "invalid SSDataBlock type"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5c12db1ab9..071c82c970 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -213,7 +213,6 @@ static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) { } bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) { - ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0"); return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark; } @@ -415,7 +414,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin code = appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); QUERY_CHECK_CODE(code, lino, _end); } else { - ASSERT(tbname); + QUERY_CHECK_CONDITION((tbname), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); code = appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); @@ -916,7 +915,6 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); break; } pGroupResInfo->index += 1; @@ -1366,7 +1364,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t winCode = TSDB_CODE_SUCCESS; code = pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize, &winCode); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winCode == TSDB_CODE_SUCCESS); + QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); code = tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); QUERY_CHECK_CODE(code, lino, _end); @@ -1694,7 +1692,10 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc } else if (IS_FINAL_INTERVAL_OP(pOperator) && pBlock->info.type == STREAM_MID_RETRIEVE) { continue; } else { - ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -1883,7 +1884,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN .deleteMarkSaved = 0, .calTriggerSaved = 0, }; - ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -2090,7 +2090,6 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pSup->pCtx[i].saveHandle.pBuf = NULL; } - ASSERT(numOfCols > 0); return TSDB_CODE_SUCCESS; } @@ -2391,7 +2390,6 @@ _end: static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { - ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey); *pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff; // set time window for current result (*pResult)->win = pWinInfo->sessionWin.win; @@ -3022,7 +3020,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); break; } @@ -3262,7 +3259,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe code = pAggSup->stateStore.streamStateSessionAddIfNotExist( pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winCode == TSDB_CODE_SUCCESS); + QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); code = @@ -3276,7 +3273,6 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe // 3.pChildren int32_t size = 0; buf = taosDecodeFixedI32(buf, &size); - ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); code = doStreamSessionDecodeOpState(buf, 0, pChOp, false, &buf); @@ -3447,7 +3443,10 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -3621,7 +3620,6 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); @@ -3667,7 +3665,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); @@ -3990,7 +3987,10 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* doStreamSessionSaveCheckpoint(pOperator); continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -4205,7 +4205,8 @@ int32_t getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey; code = getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo)); + QUERY_CHECK_CONDITION((IS_VALID_SESSION_WIN(pCurWin->winInfo)), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); pCurWin->pStateKey = (SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); @@ -4575,7 +4576,6 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera // 3.pChildren int32_t size = 0; buf = taosDecodeFixedI32(buf, &size); - ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); code = doStreamStateDecodeOpState(buf, 0, pChOp, false, &buf); @@ -4717,7 +4717,10 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -4829,7 +4832,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); qDebug("===stream=== reload state. get result count:%d", num); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); @@ -5135,7 +5137,10 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) { @@ -5254,8 +5259,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* .minTs = INT64_MAX, .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)}; - ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); - pOperator->pTaskInfo = pTaskInfo; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -5635,7 +5638,6 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* } else { pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; } - ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0); (*ppRes) = pInfo->pDelRes; return code; } @@ -5684,7 +5686,10 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* pInfo->clearState = true; break; } else { - ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 258f886805..f48393273f 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -114,7 +114,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); char* p = colDataGetData(pColInfoData, rowIndex); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - ASSERT(varDataTLen(p) <= pColInfoData->info.bytes); memcpy(pLinearInfo->start.val, p, varDataTLen(p)); } else { memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes); @@ -127,7 +126,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo char* p = colDataGetData(pColInfoData, rowIndex); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - ASSERT(varDataTLen(p) <= pColInfoData->info.bytes); memcpy(pLinearInfo->end.val, p, varDataTLen(p)); } else { memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes); @@ -143,7 +141,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo char* p = colDataGetData(pColInfoData, rowIndex); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - ASSERT(varDataTLen(p) <= pColInfoData->info.bytes); memcpy(pLinearInfo->end.val, p, varDataTLen(p)); } else { memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6a74c6a093..91583d03ed 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -136,7 +136,6 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn // } } - ASSERT(forwardRows >= 0); return forwardRows; } @@ -211,8 +210,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) { - ASSERT(startPos >= 0 && startPos < pDataBlockInfo->rows); - int32_t num = -1; int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); @@ -259,8 +256,6 @@ void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY pr SFunctParam* pParam = &pCtx[k].param[0]; SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId); - ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey); - double v1 = 0, v2 = 0, v = 0; if (prevRowIndex == -1) { SGroupKeys* p = taosArrayGet(pPrevValues, index); @@ -356,9 +351,11 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i return true; } -static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, +static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols, - TSKEY blockEkey, STimeWindow* win) { + TSKEY blockEkey, STimeWindow* win, bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t order = pInfo->binfo.inputTsOrder; TSKEY actualEndKey = tsCols[endRowIndex]; @@ -367,21 +364,27 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx // not ended in current data block, do not invoke interpolation if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) { setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP); - return false; + (*pRes) = false; + return code; } // there is actual end point of current time window, no interpolation needs if (key == actualEndKey) { setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP); - return true; + (*pRes) = true; + return code; } - ASSERT(nextRowIndex >= 0); + if (nextRowIndex < 0) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } TSKEY nextKey = tsCols[nextRowIndex]; doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP, pSup); - return true; + (*pRes) = true; + return code; } bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) { @@ -437,13 +440,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl * This time window does not cover any data, try next time window, * this case may happen when the time window is too small */ - if (primaryKeys == NULL) { - if (ascQuery) { - ASSERT(pDataBlockInfo->window.skey <= pNext->ekey); - } else { - ASSERT(pDataBlockInfo->window.ekey >= pNext->skey); - } - } else { + if (primaryKeys != NULL) { if (ascQuery && primaryKeys[startPos] > pNext->ekey) { TSKEY next = primaryKeys[startPos]; if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { @@ -469,7 +466,6 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl } static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) { - ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); if (type == RESULT_ROW_START_INTERP) { return pResult->startInterp == true; } else { @@ -485,15 +481,21 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } } -static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult, - STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) { +static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult, + STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (!pInfo->timeWindowInterpo) { - return; + return code; + } + + if (pBlock == NULL) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return code; } - ASSERT(pBlock != NULL); if (pBlock->pDataBlock == NULL) { - return; + return code; } SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); @@ -528,14 +530,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB } TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; - bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, - endKey, win); + bool interp = false; + code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, + endKey, win, &interp); + QUERY_CHECK_CODE(code, lino, _end); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) { @@ -558,7 +568,6 @@ static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, S char* val = colDataGetData(pColInfo, i); if (IS_VAR_DATA_TYPE(pkey->type)) { memcpy(pkey->pData, val, varDataTLen(val)); - ASSERT(varDataTLen(val) <= pkey->bytes); } else { memcpy(pkey->pData, val, pkey->bytes); } @@ -594,11 +603,17 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num T_LONG_JMP(pTaskInfo->env, terrno); } - ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); + if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } if (pr->closed) { - ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && - isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)); + if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && + isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow); taosMemoryFree(pNode); continue; @@ -611,7 +626,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); if (!pTsKey) { @@ -869,7 +887,10 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo } tsCols = (int64_t*)pColDataInfo->pData; - ASSERT(tsCols[0] != 0); + if(tsCols[0] == 0) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } // no data in primary ts if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) { @@ -1959,7 +1980,10 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } else { if (pMiaInfo->groupId != pBlock->info.id.groupId) { // if there are unclosed time window, close it firstly. - ASSERT(pMiaInfo->curTs != INT64_MIN); + if (pMiaInfo->curTs == INT64_MIN) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow)); @@ -2216,7 +2240,9 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder); - ASSERT(forwardRows > 0); + if(forwardRows <= 0) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } // prev time window not interpolation yet. if (iaInfo->timeWindowInterpo) { diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 50a587e353..5eb55467b1 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -282,7 +282,6 @@ _end: int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t* pWinCode) { SWinKey* pTmpkey = pKey; - ASSERT(keyLen == sizeof(SWinKey)); SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode); } @@ -455,7 +454,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream SSessionKey pTmpKey = *pWinKey; int32_t winCode = TSDB_CODE_SUCCESS; code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode); - ASSERT(winCode == TSDB_CODE_FAILED); + QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); QUERY_CHECK_CODE(code, lino, _end); goto _end; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 6a2c85323a..bc2253d4bd 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -564,7 +564,7 @@ _end: int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - ASSERT(pInfo); + QUERY_CHECK_NULL(pInfo, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; @@ -572,6 +572,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { int32_t size = 0; if (tDecodeI32(&decoder, &size) < 0) return -1; pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); + QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); + TSKEY ts = INT64_MIN; for (int32_t i = 0; i < size; i++) { if (tDecodeI64(&decoder, &ts) < 0) return -1; @@ -623,7 +625,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); QUERY_CHECK_CODE(code, lino, _error); } - ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); + QUERY_CHECK_CONDITION((mapSize == taosHashGetSize(pInfo->pMap)), code, lino, _error, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1; @@ -634,7 +637,6 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { if (pInfo->pkColLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); - ; } else { pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 3cdbad2dd5..cbaaec0964 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -125,7 +125,6 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { (*pVal) = taosMemoryCalloc(1, *pLen); void* buff = *pVal; int32_t tmp = taosEncodeFixedI64(&buff, *pKey); - ASSERT(tmp == sizeof(TSKEY)); } SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -204,7 +203,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ int32_t len = 0; int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); if (tmpRes == TSDB_CODE_SUCCESS) { - ASSERT(len == sizeof(TSKEY)); + QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); streamFileStateDecode(&pFileState->flushMark, valBuf, len); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); } @@ -361,7 +360,7 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (pPos->beUsed == used) { if (used && !pPos->pRowBuff) { - ASSERT(pPos->needFree == true); + QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); continue; } code = tdListAppend(pFlushList, &pPos); @@ -496,13 +495,13 @@ _end: code = tdListAppend(pFileState->usedBuffs, &pPos); QUERY_CHECK_CODE(code, lino, _error); + QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } - ASSERT(pPos->pRowBuff != NULL); return pPos; } @@ -636,7 +635,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** QUERY_CHECK_CODE(code, lino, _end); pPos->pRowBuff = getFreeBuff(pFileState); } - ASSERT(pPos->pRowBuff); + QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } code = recoverSessionRowBuff(pFileState, pPos); @@ -877,7 +876,10 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { taosMemoryFreeClear(pVal); break; } - ASSERT(vlen == pFileState->rowSize); + if (vlen != pFileState->rowSize) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; From 1f8e712a64d20c94d9dabbf5c577a32aacd8129e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 20 Aug 2024 16:47:22 +0800 Subject: [PATCH 2/6] fix compile error --- source/libs/executor/src/cachescanoperator.c | 6 +++++- source/libs/executor/src/countwindowoperator.c | 13 ++++++++----- source/libs/executor/src/scanoperator.c | 13 +++++++++---- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 2751cf2851..6b5fadbaa1 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -298,7 +298,11 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t resultRows = pBufRes->info.rows; // the results may be null, if last values are all null - ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); + if (resultRows != 0 && resultRows != taosArrayGetSize(pInfo->pUidList)) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code)); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } pInfo->indexOfBufferedRes = 0; } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index a9858eeb96..f4eb0dd87e 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -66,14 +66,17 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; } static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) { SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex); if (!pBuffInfo) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); return NULL; } - int32_t size = taosArrayGetSize(pCountSup->pWinStates); - // coverity scan - ASSERTS(size > 0, "WinStates is empty"); - if (size > 0) { - pCountSup->stateIndex = (pCountSup->stateIndex + 1) % size; + int32_t size = taosArrayGetSize(pCountSup->pWinStates); + if (size == 0) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return NULL; } + pCountSup->stateIndex = (pCountSup->stateIndex + 1) % size; return pBuffInfo; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 04aaa3c210..ca3d08be78 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1720,6 +1720,8 @@ static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex, bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pBlock->info.rows == 0) { if (pRes) { (*pRes) = false; @@ -1769,10 +1771,8 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info; // coverity scan - if (pInfo->pUpdateInfo == NULL){ - qError("Failed to set data version, since pInfo->pUpdateInfo is NULL"); - return; - } + QUERY_CHECK_NULL(pInfo->pUpdateInfo, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, pInfo->pUpdateInfo->maxDataVersion); resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); @@ -1780,6 +1780,11 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ if (pRes) { (*pRes) = true; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval, From c867f93e0e02b139336bccdf530cb8fa70f2d55f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 20 Aug 2024 19:41:07 +0800 Subject: [PATCH 3/6] fix compile error --- source/libs/executor/src/timewindowoperator.c | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 91583d03ed..95d415f85c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -490,7 +490,7 @@ static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDa } if (pBlock == NULL) { - code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + code = TSDB_CODE_INVALID_PARA; return code; } @@ -782,11 +782,14 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, ret); } // window start key interpolation - doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); + ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1); @@ -814,7 +817,10 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder); // window start(end) key interpolation - doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } // TODO: add to open window? how to close the open windows after input blocks exhausted? #if 0 if ((ascScan && ekey <= pBlock->info.window.ekey) || @@ -2253,11 +2259,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, ret); } // window start key interpolation - doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup); + ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1); @@ -2294,7 +2303,10 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* iaInfo->binfo.inputTsOrder); // window start(end) key interpolation - doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); + code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, From 1cd0e35443a438699e4013d2deee65a210b8379b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 21 Aug 2024 10:00:21 +0800 Subject: [PATCH 4/6] fix(stream):ignore some bf error --- source/util/src/tscalablebf.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index ffcfdfeaf1..407223e937 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -71,8 +71,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t le int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pSBf->status == SBF_INVALID) { - code = TSDB_CODE_OUT_OF_BUFFER; - QUERY_CHECK_CODE(code, lino, _error); + return code; } int32_t size = taosArrayGetSize(pSBf->bfArray); SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); @@ -105,8 +104,8 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3 int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pSBf->status == SBF_INVALID) { - code = TSDB_CODE_OUT_OF_BUFFER; - QUERY_CHECK_CODE(code, lino, _end); + (*winRes) = TSDB_CODE_FAILED; + return code; } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); From 044f7d66587ade3f966fcda100aedd3a55c9f9fe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 21 Aug 2024 14:41:14 +0800 Subject: [PATCH 5/6] fix: remove assert --- source/libs/catalog/inc/catalogInt.h | 40 +++++++-- source/libs/catalog/src/catalog.c | 2 - source/libs/catalog/src/ctgAsync.c | 52 ++++++------ source/libs/catalog/src/ctgRemote.c | 7 +- source/libs/catalog/src/ctgUtil.c | 2 +- source/libs/executor/src/dataDeleter.c | 10 ++- source/libs/executor/src/dataDispatcher.c | 6 +- source/libs/executor/src/dataInserter.c | 6 +- .../libs/executor/src/dynqueryctrloperator.c | 4 +- source/libs/executor/src/hashjoin.c | 6 +- source/libs/executor/src/hashjoinoperator.c | 3 - source/libs/executor/src/mergejoin.c | 46 +++++----- source/libs/executor/src/mergejoinoperator.c | 14 ++-- source/libs/qworker/inc/qwInt.h | 84 ++++++++++++------- source/libs/scheduler/inc/schInt.h | 40 +++++++-- source/os/src/osFile.c | 3 - source/os/src/osMemory.c | 8 +- source/os/src/osSocket.c | 11 +-- source/os/src/osString.c | 49 ----------- source/os/src/osSysinfo.c | 3 +- source/os/src/osSystem.c | 3 - source/os/src/osThread.c | 3 +- 22 files changed, 214 insertions(+), 188 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index f3b1852ce1..5580b0cadc 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -852,34 +852,58 @@ typedef struct SCtgCacheItemInfo { #define CTG_LOCK(type, _lock) \ do { \ if (CTG_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value before read lock"); \ + break; \ + } \ CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \ + if (atomic_load_32((_lock)) <= 0) { \ + qError("invalid lock value after read lock"); \ + break; \ + } \ } else { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value before write lock"); \ + break; \ + } \ CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + qError("invalid lock value after write lock"); \ + break; \ + } \ } \ } while (0) #define CTG_UNLOCK(type, _lock) \ do { \ if (CTG_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ + if (atomic_load_32((_lock)) <= 0) { \ + qError("invalid lock value before read unlock"); \ + break; \ + } \ CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value after read unlock"); \ + break; \ + } \ } else { \ - ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + qError("invalid lock value before write unlock"); \ + break; \ + } \ CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value after write unlock"); \ + break; \ + } \ } \ } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index d4c79a6c8d..334dce9c1a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -773,8 +773,6 @@ int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaNa } CTG_ERR_JRET(code); - - ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1); *pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0); taosArrayDestroy(tsmaRsp.pTsmas); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 4caa66445a..031d61554c 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2448,30 +2448,33 @@ int32_t ctgHandleGetTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* STableTSMAInfoRsp* pOut = pMsgCtx->out; pRes->code = 0; - if (pOut->pTsmas->size > 0) { - ASSERT(pOut->pTsmas->size == 1); - pRes->pRes = pOut; - pMsgCtx->out = NULL; - TSWAP(pTask->res, pCtx->pResList); + if (1 != pOut->pTsmas->size) { + ctgError("invalid tsma num:%d", (int32_t)pOut->pTsmas->size); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } - STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0); - if (NULL == pTsma) { - ctgError("fail to get the 0th STableTSMAInfo, totalNum:%d", (int32_t)taosArrayGetSize(pOut->pTsmas)); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - int32_t exists = false; - CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists)); - if (!exists) { - TSWAP(pMsgCtx->lastOut, pMsgCtx->out); - CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, tReq)); - } + pRes->pRes = pOut; + pMsgCtx->out = NULL; + TSWAP(pTask->res, pCtx->pResList); + + STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0); + if (NULL == pTsma) { + ctgError("fail to get the 0th STableTSMAInfo, totalNum:%d", (int32_t)taosArrayGetSize(pOut->pTsmas)); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + int32_t exists = false; + CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists)); + if (!exists) { + TSWAP(pMsgCtx->lastOut, pMsgCtx->out); + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, tReq)); + } + break; } default: - ASSERT(0); + ctgError("invalid reqType:%d while getting tsma rsp", reqType); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } _return: @@ -2635,14 +2638,14 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf break; } default: - ASSERT(0); + ctgError("invalid fetchType:%d while getting tb tsma rsp", pFetch->fetchType); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } break; } case TDMT_VND_TABLE_META: { // handle source tb meta - ASSERT(pFetch->fetchType == FETCH_TSMA_SOURCE_TB_META); STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; pFetch->fetchType = FETCH_TB_TSMA; pFetch->tsmaSourceTbName = *pTbName; @@ -2663,7 +2666,8 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf break; } default: - ASSERT(0); + ctgError("invalid reqType:%d while getting tsma rsp", reqType); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } _return: @@ -3628,7 +3632,8 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) { break; } default: - ASSERT(0); + ctgError("invalid fetchType:%d in getting tb tsma task", pFetch->fetchType); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); break; } } @@ -3644,14 +3649,12 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; // currently, only support fetching one tsma - ASSERT(pCtx->pNames->size == 1); STablesReq* pReq = taosArrayGet(pCtx->pNames, 0); if (NULL == pReq) { ctgError("fail to get the 0th STablesReq, totalNum:%d", (int32_t)taosArrayGetSize(pCtx->pNames)); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - ASSERT(pReq->pTables->size == 1); SName* pTsmaName = taosArrayGet(pReq->pTables, 0); if (NULL == pReq) { ctgError("fail to get the 0th SName, totalNum:%d", (int32_t)taosArrayGetSize(pReq->pTables)); @@ -3686,7 +3689,6 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) { } STableTSMAInfoRsp* pRsp = (STableTSMAInfoRsp*)pRes->pRes; - ASSERT(pRsp->pTsmas->size == 1); const STSMACache* pTsma = taosArrayGetP(pRsp->pTsmas, 0); if (NULL == pTsma) { diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index a312dce164..96a700d2d6 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -42,7 +42,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu msgNum = taosArrayGetSize(batchRsp.pRsps); } - if (ASSERTS(taskNum == msgNum || 0 == msgNum, "taskNum %d mis-match msgNum %d", taskNum, msgNum)) { + if (taskNum != msgNum && 0 != msgNum) { + ctgError("taskNum %d mis-match msgNum %d", taskNum, msgNum); msgNum = 0; } @@ -77,7 +78,9 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu if (msgNum > 0) { pRsp = taosArrayGet(batchRsp.pRsps, i); - if (ASSERTS(pRsp->msgIdx == *msgIdx, "rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx)) { + if (pRsp->msgIdx != *msgIdx) { + ctgError("rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx); + pRsp = &rsp; pRsp->msgIdx = *msgIdx; pRsp->reqType = -1; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index fa4df70f59..5d8107ca44 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -2631,7 +2631,7 @@ bool hasOutOfDateTSMACache(SArray* pTsmas) { for (int32_t i = 0; i < pTsmas->size; ++i) { STSMACache* pTsmaInfo = taosArrayGetP(pTsmas, i); if (NULL == pTsmaInfo) { - ASSERT(0); + continue; } if (isCtgTSMACacheOutOfDate(pTsmaInfo)) { return true; diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 9f0ea0a87f..60bfb58ef5 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -87,7 +87,10 @@ static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* p if (pRes->affectedRows) { pRes->skey = *(int64_t*)pColSKey->pData; pRes->ekey = *(int64_t*)pColEKey->pData; - ASSERT(pRes->skey <= pRes->ekey); + if (pRes->skey > pRes->ekey) { + qError("data delter skey:%" PRId64 " is bigger than ekey:%" PRId64, pRes->skey, pRes->ekey); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } } else { pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; @@ -205,7 +208,10 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; if (NULL == pDeleter->nextOutput.pData) { - ASSERT(pDeleter->queryEnd); + if (!pDeleter->queryEnd) { + qError("empty res while query not end in data deleter"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } pOutput->useconds = pDeleter->useconds; pOutput->precision = pDeleter->pSchema->precision; pOutput->bufStatus = DS_BUF_EMPTY; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3964422411..d96b78a342 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -242,7 +242,11 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (NULL == pDispatcher->nextOutput.pData) { - ASSERT(pDispatcher->queryEnd); + if (!pDispatcher->queryEnd) { + qError("empty res while query not end in data dispatcher"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + pOutput->useconds = pDispatcher->useconds; pOutput->precision = pDispatcher->pSchema->precision; pOutput->bufStatus = DS_BUF_EMPTY; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 6f226ecb21..44342b0ac9 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -234,7 +234,11 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - ASSERT(pColInfoData->info.type == pCol->type); + if (pColInfoData->info.type != pCol->type) { + qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, pCol->type); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _end; + } if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); if (NULL == taosArrayPush(pVals, &cv)) { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 02932cd278..ea2f81a4b8 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -823,12 +823,14 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) { pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache); qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache)); +/* // debug only iter = 0; uint32_t* num = NULL; while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) { - ASSERT(*num > 1); + A S S E R T(*num > 1); } +*/ } static void buildStbJoinTableList(SOperatorInfo* pOperator) { diff --git a/source/libs/executor/src/hashjoin.c b/source/libs/executor/src/hashjoin.c index d4a84afea2..f63b4093db 100755 --- a/source/libs/executor/src/hashjoin.c +++ b/source/libs/executor/src/hashjoin.c @@ -60,7 +60,7 @@ int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) { /* size_t keySize = 0; int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); - ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); int64_t rows = getSingleKeyRowsNum(pGroup->rows); pJoin->execInfo.expectRows += rows; qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); @@ -145,7 +145,7 @@ int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOpera /* size_t keySize = 0; int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); - ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); int64_t rows = getSingleKeyRowsNum(pGroup->rows); pJoin->execInfo.expectRows += rows; qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); @@ -248,7 +248,7 @@ int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperator /* size_t keySize = 0; int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); - ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); int64_t rows = getSingleKeyRowsNum(pGroup->rows); pJoin->execInfo.expectRows += rows; qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 807d6b9785..15819cd94a 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -38,8 +38,6 @@ bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows) { } int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx) { - ASSERT(0 < pJoin->midBlk->info.rows); - TSWAP(pJoin->midBlk, pJoin->finBlk); pCtx->midRemains = false; @@ -1153,7 +1151,6 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN if (NULL == pJoin->finBlk) { QRY_ERR_RET(terrno); } - ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode)); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index d3abaaab6d..302dd31788 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -321,7 +321,7 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - ASSERT(buildGrp->endIdx >= buildGrp->readIdx); + //A S S E R T(buildGrp->endIdx >= buildGrp->readIdx); MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; @@ -1383,9 +1383,9 @@ static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p continue; } - ASSERT(1 == pCtx->midBlk->info.rows); + //A S S E R T(1 == pCtx->midBlk->info.rows); MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); - ASSERT(false == pCtx->midRemains); + //A S S E R T(false == pCtx->midRemains); break; } while (true); @@ -1449,10 +1449,10 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) { } build->pHashCurGrp = *(SArray**)pGrp; - ASSERT(1 == taosArrayGetSize(build->pHashCurGrp)); + //A S S E R T(1 == taosArrayGetSize(build->pHashCurGrp)); build->grpRowIdx = 0; MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, NULL)); - ASSERT(build->grpRowIdx < 0); + //A S S E R T(build->grpRowIdx < 0); } pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; @@ -1497,7 +1497,7 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - ASSERT(buildGrp->endIdx >= buildGrp->readIdx); + //A S S E R T(buildGrp->endIdx >= buildGrp->readIdx); MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; @@ -1513,9 +1513,9 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { continue; } } else { - ASSERT(1 == pCtx->midBlk->info.rows); + //A S S E R T(1 == pCtx->midBlk->info.rows); MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); - ASSERT(false == pCtx->midRemains); + //A S S E R T(false == pCtx->midRemains); if (build->grpIdx == buildGrpNum) { continue; @@ -1555,8 +1555,8 @@ static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) { int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeEndIdx = probeGrp->endIdx; - ASSERT(1 == taosArrayGetSize(build->eqGrps)); - ASSERT(buildGrp->beginIdx == buildGrp->endIdx); + //A S S E R T(1 == taosArrayGetSize(build->eqGrps)); + //A S S E R T(buildGrp->beginIdx == buildGrp->endIdx); if (probeRows <= rowsLeft) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); @@ -1826,7 +1826,7 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - ASSERT(buildGrp->endIdx >= buildGrp->readIdx); + //A S S E R T(buildGrp->endIdx >= buildGrp->readIdx); MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; @@ -2381,7 +2381,7 @@ int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) { if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) { MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx)); pCtx->pJoin->build->blkRowIdx = 0; - ASSERT(pCtx->pJoin->build->blk == pGrp->blk); + //A S S E R T(pCtx->pJoin->build->blk == pGrp->blk); MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); } @@ -2419,16 +2419,16 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } - ASSERT(pGrp->blk == pCache->outBlk); + //A S S E R T(pGrp->blk == pCache->outBlk); //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; } - //ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); + //A S S E R T((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); } - ASSERT(taosArrayGetSize(pCache->grps) == 1); - ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum); + //A S S E R T(taosArrayGetSize(pCache->grps) == 1); + //A S S E R T(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum); } do { @@ -2473,7 +2473,7 @@ int32_t mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } - ASSERT(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx)); + //A S S E R T(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx)); pGrp->endIdx = pGrp->blk->info.rows - 1; int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1); @@ -2550,8 +2550,8 @@ int32_t mAsofForwardSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { MJOIN_RESTORE_TB_BLK(cache, pTable); } while (!MJOIN_BUILD_TB_ROWS_DONE(pTable)); - ASSERT(pCtx->cache.rowNum == 0); - ASSERT(taosArrayGetSize(pCtx->cache.grps) == 0); + //A S S E R T(pCtx->cache.rowNum == 0); + //A S S E R T(taosArrayGetSize(pCtx->cache.grps) == 0); if (pTable->dsFetchDone) { return TSDB_CODE_SUCCESS; @@ -2648,7 +2648,7 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo if ((probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) && pCtx->cache.rowNum < pCtx->jLimit) { pJoin->build->newBlk = false; MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); - ASSERT(taosArrayGetSize(pCtx->cache.grps) <= 1); + //A S S E R T(taosArrayGetSize(pCtx->cache.grps) <= 1); buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } @@ -3375,7 +3375,7 @@ int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) { continue; } - ASSERT(pGrp->endIdx > startIdx); + //A S S E R T(pGrp->endIdx > startIdx); pGrp->endIdx--; break; @@ -3419,7 +3419,7 @@ int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) { continue; } - ASSERT(pGrp->endIdx > startIdx); + //A S S E R T(pGrp->endIdx > startIdx); pGrp->endIdx--; break; @@ -3676,7 +3676,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ MJ_ERR_RET(terrno); } - ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); + //A S S E R T(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 52b0da7c92..fced682312 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -490,8 +490,6 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) { - ASSERT(0 < pCtx->midBlk->info.rows); - TSWAP(pCtx->midBlk, pCtx->finBlk); pCtx->midRemains = false; @@ -567,7 +565,6 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app int32_t currRows = append ? pRes->info.rows : 0; int32_t firstRows = GRP_REMAIN_ROWS(pFirst); int32_t secondRows = GRP_REMAIN_ROWS(pSecond); - ASSERT(secondRows > 0); for (int32_t c = 0; c < probe->finNum; ++c) { SMJoinColMap* pFirstCol = probe->finCols + c; @@ -581,9 +578,15 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows); } else { - ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows)); + if (pRes->info.capacity < (pRes->info.rows + firstRows * secondRows)) { + qError("capacity:%d not enough, rows:%" PRId64 ", firstRows:%d, secondRows:%d", pRes->info.capacity, pRes->info.rows, firstRows, secondRows); + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes); - ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes); + if ((startOffset + 1 * pOutCol->info.bytes) > pRes->info.capacity * pOutCol->info.bytes) { + qError("col buff not enough, startOffset:%d, bytes:%d, capacity:%d", startOffset, pOutCol->info.bytes, pRes->info.capacity); + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true)); } } @@ -1097,7 +1100,6 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa } SMJoinTableCtx* pProbe = pJoin->probe; - ASSERT(pProbe->lastInGid); while (true) { if (pTable->remainInBlk) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 93bbc44a25..68355b628e 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -338,38 +338,62 @@ typedef struct SQWorkerMgmt { #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 -#define QW_LOCK(type, _lock) \ - do { \ - if (QW_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \ - QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosRLockLatch(_lock); \ - QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \ - } else { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \ - QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosWLockLatch(_lock); \ - QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ - } \ +#define QW_LOCK(type, _lock) \ + do { \ + if (QW_READ == (type)) { \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value before read lock"); \ + break; \ + } \ + QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRLockLatch(_lock); \ + QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) <= 0) { \ + qError("invalid lock value after read lock"); \ + break; \ + } \ + } else { \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value before write lock"); \ + break; \ + } \ + QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWLockLatch(_lock); \ + QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + qError("invalid lock value after write lock"); \ + break; \ + } \ + } \ } while (0) -#define QW_UNLOCK(type, _lock) \ - do { \ - if (QW_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ - QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosRUnLockLatch(_lock); \ - QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ - } else { \ - ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ - QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosWUnLockLatch(_lock); \ - QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ - } \ +#define QW_UNLOCK(type, _lock) \ + do { \ + if (QW_READ == (type)) { \ + if (atomic_load_32((_lock)) <= 0) { \ + qError("invalid lock value before read unlock"); \ + break; \ + } \ + QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRUnLockLatch(_lock); \ + QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value after read unlock"); \ + break; \ + } \ + } else { \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + qError("invalid lock value before write unlock"); \ + break; \ + } \ + QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWUnLockLatch(_lock); \ + QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value after write unlock"); \ + break; \ + } \ + } \ } while (0) extern SQWorkerMgmt gQwMgmt; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 5ea79c6ae9..6a64ff67e4 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -501,34 +501,58 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOCK(type, _lock) \ do { \ if (SCH_READ == (type)) { \ - ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value before read lock"); \ + break; \ + } \ SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \ + if (atomic_load_32((_lock)) <= 0) { \ + qError("invalid lock value after read lock"); \ + break; \ + } \ } else { \ - ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value before write lock"); \ + break; \ + } \ SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + qError("invalid lock value after write lock"); \ + break; \ + } \ } \ } while (0) #define SCH_UNLOCK(type, _lock) \ do { \ if (SCH_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ + if (atomic_load_32((_lock)) <= 0) { \ + qError("invalid lock value before read unlock"); \ + break; \ + } \ SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value after read unlock"); \ + break; \ + } \ } else { \ - ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + qError("invalid lock value before write unlock"); \ + break; \ + } \ SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ + if (atomic_load_32((_lock)) < 0) { \ + qError("invalid lock value after write unlock"); \ + break; \ + } \ } \ } while (0) diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index a5df4f63f3..16ebe05520 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -454,7 +454,6 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t #if FILE_WITH_LOCK taosThreadRwlockWrlock(&(pFile->rwlock)); #endif - ASSERT(pFile->hFile != NULL); // Please check if you have closed the file. if (pFile->hFile == NULL) { #if FILE_WITH_LOCK taosThreadRwlockUnlock(&(pFile->rwlock)); @@ -867,7 +866,6 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { #endif int32_t code = 0; - ASSERT(pFile->fd >= 0); // Please check if you have closed the file. #ifdef WINDOWS int64_t ret = _lseeki64(pFile->fd, offset, whence); @@ -1479,7 +1477,6 @@ int32_t taosEOFFile(TdFilePtr pFile) { terrno = TSDB_CODE_INVALID_PARA; return -1; } - ASSERT(pFile->fp != NULL); if (pFile->fp == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 7a5a547354..91eb7763bc 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -326,10 +326,8 @@ void *taosMemoryRealloc(void *ptr, int64_t size) { if (ptr == NULL) return taosMemoryMalloc(size); TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); - ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { - +return NULL; - + + return NULL; } TdMemoryInfo tdMemoryInfo; @@ -366,7 +364,6 @@ char *taosStrdup(const char *ptr) { if (ptr == NULL) return NULL; TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); - ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { return NULL; } @@ -413,7 +410,6 @@ int64_t taosMemorySize(void *ptr) { #ifdef USE_TD_MEMORY TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); - ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { return NULL; } @@ -441,7 +437,7 @@ void taosMemoryTrim(int32_t size) { void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) { #ifdef USE_TD_MEMORY - ASSERT(0); + return NULL; #else #if defined(LINUX) #ifdef BUILD_WITH_RAND_ERR diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 081ed46c9a..2d160b277b 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -312,8 +312,7 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void return -1; } #ifdef WINDOWS - ASSERT(0); - return 0; + return -1; #else return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen); #endif @@ -681,8 +680,7 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { int taosGetLocalIp(const char *eth, char *ip) { #if defined(WINDOWS) // DO NOTHAING - ASSERT(0); - return 0; + return -1; #else int fd; struct ifreq ifr; @@ -708,8 +706,7 @@ int taosGetLocalIp(const char *eth, char *ip) { int taosValidIp(uint32_t ip) { #if defined(WINDOWS) // DO NOTHAING - ASSERT(0); - return 0; + return -1; #else int ret = -1; int fd; @@ -1111,7 +1108,7 @@ int32_t taosIgnSIGPIPE() { int32_t taosSetMaskSIGPIPE() { #ifdef WINDOWS - // ASSERT(0); + return -1; #else sigset_t signal_mask; (void)sigemptyset(&signal_mask); diff --git a/source/os/src/osString.c b/source/os/src/osString.c index b0a3615ee5..1c99355e34 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -425,10 +425,6 @@ int64_t taosStr2Int64(const char *str, char **pEnd, int32_t radix) { int64_t tmp = strtoll(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); #endif return tmp; } @@ -437,10 +433,6 @@ uint64_t taosStr2UInt64(const char *str, char **pEnd, int32_t radix) { uint64_t tmp = strtoull(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); #endif return tmp; } @@ -449,10 +441,6 @@ int32_t taosStr2Int32(const char *str, char **pEnd, int32_t radix) { int32_t tmp = strtol(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); #endif return tmp; } @@ -461,10 +449,6 @@ uint32_t taosStr2UInt32(const char *str, char **pEnd, int32_t radix) { uint32_t tmp = strtol(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); #endif return tmp; } @@ -473,12 +457,6 @@ int16_t taosStr2Int16(const char *str, char **pEnd, int32_t radix) { int32_t tmp = strtol(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); - ASSERT(tmp >= SHRT_MIN); - ASSERT(tmp <= SHRT_MAX); #endif return (int16_t)tmp; } @@ -487,23 +465,12 @@ uint16_t taosStr2UInt16(const char *str, char **pEnd, int32_t radix) { uint32_t tmp = strtoul(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); - ASSERT(tmp <= USHRT_MAX); #endif return (uint16_t)tmp; } int8_t taosStr2Int8(const char *str, char **pEnd, int32_t radix) { int32_t tmp = strtol(str, pEnd, radix); -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); - ASSERT(tmp >= SCHAR_MIN); - ASSERT(tmp <= SCHAR_MAX); -#endif return tmp; } @@ -511,33 +478,17 @@ uint8_t taosStr2UInt8(const char *str, char **pEnd, int32_t radix) { uint32_t tmp = strtoul(str, pEnd, radix); #if defined(DARWIN) || defined(_ALPINE) if (errno == EINVAL) errno = 0; -#endif -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); - ASSERT(tmp <= UCHAR_MAX); #endif return tmp; } double taosStr2Double(const char *str, char **pEnd) { double tmp = strtod(str, pEnd); -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); - ASSERT(tmp != HUGE_VAL); -#endif return tmp; } float taosStr2Float(const char *str, char **pEnd) { float tmp = strtof(str, pEnd); -#ifdef TD_CHECK_STR_TO_INT_ERROR - ASSERT(errno != ERANGE); - ASSERT(errno != EINVAL); - ASSERT(tmp != HUGE_VALF); - ASSERT(tmp != NAN); -#endif return tmp; } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 92e5967416..9a4f0f8a98 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -287,7 +287,7 @@ void taosGetSystemInfo() { int32_t taosGetEmail(char *email, int32_t maxLen) { #ifdef WINDOWS - // ASSERT(0); + return 0; #elif defined(_TD_DARWIN_64) #ifdef CUS_PROMPT const char *filepath = "/usr/local/"CUS_PROMPT"/email"; @@ -1040,7 +1040,6 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { char *taosGetCmdlineByPID(int pid) { #ifdef WINDOWS - ASSERT(0); return ""; #elif defined(_TD_DARWIN_64) static char cmdline[1024]; diff --git a/source/os/src/osSystem.c b/source/os/src/osSystem.c index 843ee20d5b..a8a9ff681b 100644 --- a/source/os/src/osSystem.c +++ b/source/os/src/osSystem.c @@ -91,7 +91,6 @@ typedef struct FILE TdCmd; #ifdef BUILD_NO_CALL void* taosLoadDll(const char* filename) { #if defined(WINDOWS) - ASSERT(0); return NULL; #elif defined(_TD_DARWIN_64) return NULL; @@ -110,7 +109,6 @@ void* taosLoadDll(const char* filename) { void* taosLoadSym(void* handle, char* name) { #if defined(WINDOWS) - ASSERT(0); return NULL; #elif defined(_TD_DARWIN_64) return NULL; @@ -131,7 +129,6 @@ void* taosLoadSym(void* handle, char* name) { void taosCloseDll(void* handle) { #if defined(WINDOWS) - ASSERT(0); return; #elif defined(_TD_DARWIN_64) return; diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 3e37d12759..5a24e7775f 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -784,8 +784,7 @@ int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) { int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) { #ifdef TD_USE_SPINLOCK_AS_MUTEX - ASSERT(pshared == 0); - if (pshared != 0) return -1; + if (pshared != 0) return TSDB_CODE_INVALID_PARA; return pthread_mutex_init((pthread_mutex_t *)lock, NULL); #else int32_t code = pthread_spin_init((pthread_spinlock_t *)lock, pshared); From 0f82052392828dd18b3491bbb2711efb4b3b9757 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 21 Aug 2024 15:28:25 +0800 Subject: [PATCH 6/6] enh: clear tdb asserts --- include/util/taoserror.h | 3 +- source/libs/tdb/src/db/tdbBtree.c | 48 ++++++++++++++----------------- source/libs/tdb/src/db/tdbPage.c | 10 +++++-- source/libs/tdb/src/db/tdbPager.c | 14 --------- source/libs/tdb/src/db/tdbTxn.c | 1 - source/libs/tdb/src/inc/tdbInt.h | 7 +++-- source/libs/tdb/src/inc/tdbUtil.h | 6 ---- source/util/src/terror.c | 1 + 8 files changed, 37 insertions(+), 53 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b772edbf22..3b6ed7143a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -91,7 +91,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) #define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) #define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026) -#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) +#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) @@ -152,6 +152,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135) #define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal #define TSDB_CODE_OUT_OF_BUFFER TAOS_DEF_ERROR_CODE(0, 0x0137) +#define TSDB_CODE_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0138) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 59b7f4af2e..f878766f77 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -161,7 +161,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg if (pgno == 0) { tdbError("tdb/btree-open: pgno cannot be zero."); tdbOsFree(pBt); - ASSERT(0); + return TSDB_CODE_INTERNAL_ERROR; } pBt->root = pgno; /* @@ -418,10 +418,6 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2 int mlen; int cret; - if (ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL)) { - // -1 is less than - } - mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2; cret = memcmp(pKey1, pKey2, mlen); if (cret == 0) { @@ -571,14 +567,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx nOlds = 3; } for (int i = 0; i < nOlds; i++) { - if (ASSERT(sIdx + i <= nCells)) { + if (!(sIdx + i <= nCells)) { return TSDB_CODE_FAILED; } SPgno pgno; if (sIdx + i == nCells) { - if (ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pParent))) { - return TSDB_CODE_FAILED; + if (TDB_BTREE_PAGE_IS_LEAF(pParent)) { + return TSDB_CODE_INTERNAL_ERROR; } pgno = ((SIntHdr *)(pParent->pData))->pgno; } else { @@ -685,8 +681,6 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx // page is full, use a new page nNews++; - ASSERT(infoNews[nNews].size + cellBytes <= TDB_PAGE_USABLE_SIZE(pPage)); - if (childNotLeaf) { // for non-child page, this cell is used as the right-most child, // the divider cell to parent as well @@ -732,7 +726,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx szRCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL); } - if (ASSERT(infoNews[iNew - 1].cnt > 0)) { + if (!(infoNews[iNew - 1].cnt > 0)) { return TSDB_CODE_FAILED; } @@ -822,10 +816,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx pCell = tdbPageGetCell(pPage, oIdx); szCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL); - if (ASSERT(nNewCells <= infoNews[iNew].cnt)) { + if (!(nNewCells <= infoNews[iNew].cnt)) { return TSDB_CODE_FAILED; } - if (ASSERT(iNew < nNews)) { + if (!(iNew < nNews)) { return TSDB_CODE_FAILED; } @@ -866,10 +860,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx } } } else { - if (ASSERT(childNotLeaf)) { + if (!(childNotLeaf)) { return TSDB_CODE_FAILED; } - if (ASSERT(iNew < nNews - 1)) { + if (!(iNew < nNews - 1)) { return TSDB_CODE_FAILED; } @@ -877,7 +871,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ((SIntHdr *)pNews[iNew]->pData)->pgno = ((SPgno *)pCell)[0]; // insert to parent as divider cell - if (ASSERT(iNew < nNews - 1)) { + if (!(iNew < nNews - 1)) { return TSDB_CODE_FAILED; } ((SPgno *)pCell)[0] = TDB_PAGE_PGNO(pNews[iNew]); @@ -894,7 +888,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx } if (childNotLeaf) { - if (ASSERT(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt)) { + if (!(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt)) { return TSDB_CODE_FAILED; } ((SIntHdr *)(pNews[nNews - 1]->pData))->pgno = rPgno; @@ -1091,7 +1085,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const nLeft -= kLen; // pack partial val to local if any space left if (nLocal > nHeader + kLen + sizeof(SPgno)) { - if (ASSERT(pVal != NULL && vLen != 0)) { + if (!(pVal != NULL && vLen != 0)) { tdbFree(pBuf); return TSDB_CODE_FAILED; } @@ -1259,14 +1253,14 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo int nPayload; int ret; - if (ASSERT(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen)) { - return TSDB_CODE_FAILED; + if (!(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen)) { + return TSDB_CODE_INVALID_PARA; } - if (ASSERT(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen)) { - return TSDB_CODE_FAILED; + if (!(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen)) { + return TSDB_CODE_INVALID_PARA; } - if (ASSERT(pKey != NULL && kLen > 0)) { - return TSDB_CODE_FAILED; + if (!(pKey != NULL && kLen > 0)) { + return TSDB_CODE_INVALID_PARA; } nPayload = 0; @@ -1645,7 +1639,6 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * SArray *ofps = pPage->pPager->ofps; if (ofps) { if (taosArrayPush(ofps, &ofp) == NULL) { - ASSERT(0); return terrno; } } @@ -2438,7 +2431,10 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { lidx = 0; ridx = nCells - 1; - ASSERT(nCells > 0); + if (nCells <= 0) { + tdbError("tdb/btc-move-to: empty page."); + return TSDB_CODE_FAILED; + } // compare first cell pBtc->idx = lidx; diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c index 26c1c108d2..eab8f6ef19 100644 --- a/source/libs/tdb/src/db/tdbPage.c +++ b/source/libs/tdb/src/db/tdbPage.c @@ -522,7 +522,9 @@ static int tdbPageDefragment(SPage *pPage) { SCell *pCell = TDB_PAGE_CELL_AT(pPage, aCellIdx[iCell].iCell); int32_t szCell = pPage->xCellSize(pPage, pCell, 0, NULL, NULL); - ASSERT(pNextCell - szCell >= pCell); + if (pNextCell - szCell < pCell) { + return TSDB_CODE_INTERNAL_ERROR; + } pNextCell -= szCell; if (pNextCell > pCell) { @@ -535,7 +537,11 @@ static int tdbPageDefragment(SPage *pPage) { TDB_PAGE_FCELL_SET(pPage, 0); tdbOsFree(aCellIdx); - ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree); + if (pPage->pFreeEnd - pPage->pFreeStart != nFree) { + tdbError("tdb/page-defragment: nFree: %d, pFreeStart: %p, pFreeEnd: %p.", nFree, pPage->pFreeStart, + pPage->pFreeEnd); + return TSDB_CODE_INTERNAL_ERROR; + } return 0; } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index a650847e1e..b0f32136a3 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -16,19 +16,7 @@ #include "crypt.h" #include "tdbInt.h" #include "tglobal.h" -/* -#pragma pack(push, 1) -typedef struct { - u8 hdrString[16]; - u16 pageSize; - SPgno freePage; - u32 nFreePages; - u8 reserved[102]; -} SFileHdr; -#pragma pack(pop) -TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct"); -*/ struct hashset_st { size_t nbits; size_t mask; @@ -450,7 +438,6 @@ static char *tdbEncryptPage(SPager *pPager, char *pPageData, int32_t pageSize, c if (encryptAlgorithm == DND_CA_SM4) { // tdbInfo("CBC_Encrypt key:%d %s %s", encryptAlgorithm, encryptKey, __FUNCTION__); - // ASSERT(strlen(encryptKey) > 0); // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d before Encrypt", offset, pPage->pData[0]); @@ -915,7 +902,6 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage if (encryptAlgorithm == DND_CA_SM4) { // tdbInfo("CBC_Decrypt key:%d %s %s", encryptAlgorithm, encryptKey, __FUNCTION__); - // ASSERT(strlen(encryptKey) > 0); // uint8_t flags = pPage->pData[0]; // tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d before Decrypt", ((i64)pPage->pageSize) * (pgno - 1), flags); diff --git a/source/libs/tdb/src/db/tdbTxn.c b/source/libs/tdb/src/db/tdbTxn.c index 24a70f62b2..71560e3e85 100644 --- a/source/libs/tdb/src/db/tdbTxn.c +++ b/source/libs/tdb/src/db/tdbTxn.c @@ -40,7 +40,6 @@ int tdbTxnCloseImpl(TXN *pTxn) { if (pTxn->jfd) { TAOS_UNUSED(tdbOsClose(pTxn->jfd)); - ASSERT(pTxn->jfd == NULL); } tdbOsFree(pTxn); diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 605fe6a1a4..7e97be962b 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -319,7 +319,6 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) { } else if (ret == EBUSY) { return P_LOCK_BUSY; } else { - ASSERT(0); return P_LOCK_FAIL; } } @@ -354,7 +353,10 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) { int iOvfl; int lidx; - ASSERT(idx >= 0 && idx < TDB_PAGE_TOTAL_CELLS(pPage)); + if (idx < 0 || idx >= TDB_PAGE_TOTAL_CELLS(pPage)) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } iOvfl = 0; for (; iOvfl < pPage->nOverflow; iOvfl++) { @@ -367,7 +369,6 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) { } lidx = idx - iOvfl; - ASSERT(lidx >= 0 && lidx < pPage->pPageMethods->getCellNum(pPage)); pCell = pPage->pData + pPage->pPageMethods->getCellOffset(pPage, lidx); return pCell; diff --git a/source/libs/tdb/src/inc/tdbUtil.h b/source/libs/tdb/src/inc/tdbUtil.h index 4382513f73..22468e6579 100644 --- a/source/libs/tdb/src/inc/tdbUtil.h +++ b/source/libs/tdb/src/inc/tdbUtil.h @@ -22,12 +22,6 @@ extern "C" { #endif -#if __STDC_VERSION__ >= 201112LL -#define TDB_STATIC_ASSERT(op, info) static_assert(op, info) -#else -#define TDB_STATIC_ASSERT(op, info) -#endif - #define TDB_ROUND8(x) (((x) + 7) & ~7) int tdbGnrtFileID(tdb_fd_t fd, uint8_t *fileid, bool unique); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b307c4ac4b..1d2e2357d1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -111,6 +111,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connec TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_BUFFER, "Out of buffer") +TAOS_DEFINE_ERROR(TSDB_CODE_INTERNAL_ERROR, "Internal error") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")