diff --git a/include/util/tutil.h b/include/util/tutil.h index 1ee3bb0e83..6c7517f630 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -139,6 +139,13 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define QUERY_CHECK_CODE TSDB_CHECK_CODE +#define QUERY_CHECK_CONDITION(condition, CODE, LINO, LABEL, ERRNO) \ + if (!condition) { \ + (CODE) = (ERRNO); \ + (LINO) = __LINE__; \ + goto LABEL; \ + } + #define TSDB_CHECK_NULL(ptr, CODE, LINO, LABEL, ERRNO) \ if ((ptr) == NULL) { \ (CODE) = (ERRNO); \ diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b1c9207ab7..833371cfc5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -158,7 +158,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamScanInfo* pInfo = pOperator->info; qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks); - ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); + QUERY_CHECK_CONDITION((pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); if (type == STREAM_INPUT__MERGED_SUBMIT) { for (int32_t i = 0; i < numOfBlocks; i++) { @@ -189,7 +190,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->blockType = STREAM_INPUT__CHECKPOINT; } else { - ASSERT(0); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); } return TSDB_CODE_SUCCESS; @@ -543,7 +545,9 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table int32_t* tversion, int32_t idx, bool* tbGet) { *tbGet = false; - ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); + if (tinfo == NULL || dbName == NULL || tableName == NULL) { + return TSDB_CODE_INVALID_PARA; + } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) { @@ -722,7 +726,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo blockIndex += 1; current += p->info.rows; - ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT); + QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); void* tmp = taosArrayPush(pResList, &p); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -987,15 +992,17 @@ void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { *scanner = pOperator->info; break; } else { - ASSERT(pOperator->numOfDownstream == 1); pOperator = pOperator->pDownstream[0]; } } } int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; @@ -1007,12 +1014,19 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); - return 0; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; @@ -1024,14 +1038,26 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan "-%" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); - return 0; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; - return 0; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { @@ -1046,9 +1072,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1063,9 +1086,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1078,9 +1098,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1093,9 +1110,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamEventAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1108,9 +1122,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamCountAggOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; @@ -1126,7 +1137,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { if (pOperator->numOfDownstream > 1) { qError("unexpected stream, multiple downstream"); - ASSERT(0); return -1; } return 0; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index cbb124b9e0..031ffbb50e 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -163,7 +163,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR return NULL; } - ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); + if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + pTaskInfo->code = terrno; + return NULL; + } } } else { // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the @@ -176,7 +180,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR return NULL; } - ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); + if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) { + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + pTaskInfo->code = terrno; + return NULL; + } } } @@ -324,6 +332,7 @@ _end: static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -363,7 +372,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int if (hasPk && (j == pkParamIdx)) { pInput->pPrimaryKey = pInput->pData[j]; } - ASSERT(pInput->pData[j] != NULL); + QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { // todo avoid case: top(k, 12), 12 is the value parameter. // sum(11), 11 is also the value parameter. @@ -374,14 +383,16 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pInput->blankFill = pBlock->info.blankFill; code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } } } } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 5ece57cad1..fe9d0d3cf0 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -55,7 +55,6 @@ typedef struct SFillOperatorInfo { SExprSupp noFillExprSupp; } SFillOperatorInfo; -static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order); static void destroyFillOperatorInfo(void* param); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo); @@ -168,56 +167,6 @@ _end: return code; } -// todo refactor: decide the start key according to the query time range. -static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) { - if (order == TSDB_ORDER_ASC) { - int64_t skey = pBlock->info.window.skey; - if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the - ASSERT(taosFillNotStarted(pInfo->pFillInfo)); - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); - } else if (pInfo->pFillInfo->start < skey) { - int64_t t = skey; - SInterval* pInterval = &pInfo->pFillInfo->interval; - - while (1) { - int64_t prev = taosTimeAdd(t, -pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (prev <= pInfo->pFillInfo->start) { - t = prev; - break; - } - t = prev; - } - - // todo time window chosen problem: t or prev value? - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); - } - } else { - int64_t ekey = pBlock->info.window.ekey; - if (ekey > pInfo->pFillInfo->start) { - ASSERT(taosFillNotStarted(pInfo->pFillInfo)); - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, ekey); - } else if (ekey < pInfo->pFillInfo->start) { - int64_t t = ekey; - SInterval* pInterval = &pInfo->pFillInfo->interval; - int64_t prev = t; - while (1) { - int64_t next = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (next >= pInfo->pFillInfo->start) { - prev = t; - t = next; - break; - } - prev = t; - t = next; - } - - // todo time window chosen problem: t or next value? - if (t > pInfo->pFillInfo->start) t = prev; - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); - } - } -} - static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -602,7 +551,6 @@ static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) { } pInfo->win.ekey = ekey; } else { - assert(order == TSDB_ORDER_DESC); skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); next = skey; while (next < pInfo->win.skey) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3655e16f2..04aaa3c210 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1241,14 +1241,11 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { taosRUnLockLatch(&pTaskInfo->lock); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(pInfo->base.dataReader == NULL); + QUERY_CHECK_CONDITION((pInfo->base.dataReader == NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } + QUERY_CHECK_CODE(code, lino, _end); if (pInfo->filesetDelimited) { pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); @@ -1590,7 +1587,6 @@ static bool isCountWindow(SStreamScanInfo* pInfo) { static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); uint64_t* groupCol = (uint64_t*)pColInfo->pData; - ASSERT(rowIndex < pBlock->info.rows); pInfo->groupId = groupCol[rowIndex]; } @@ -1659,9 +1655,8 @@ _end: bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) { // coverity scan - ASSERTS(pVal != NULL, "pVal should not be NULL"); - if (!pVal) { - qError("failed to compare primary key, since primary key is null"); + if (!pVal || !pCol) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA)); return false; } void* pData = colDataGetData(pCol, rowId); @@ -1738,7 +1733,6 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ goto _end; } - ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1770,27 +1764,22 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ win.skey = TMIN(win.skey, startData[*pRowIndex]); continue; } - - ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || - !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0))); break; } STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info; // coverity scan - ASSERTS(pInfo->pUpdateInfo != NULL, "Failed to set data version, since pInfo->pUpdateInfo is NULL"); - if (pInfo->pUpdateInfo) { - qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, - pInfo->pUpdateInfo->maxDataVersion); - resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); + if (pInfo->pUpdateInfo == NULL){ + qError("Failed to set data version, since pInfo->pUpdateInfo is NULL"); + return; } + qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, + pInfo->pUpdateInfo->maxDataVersion); + resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); pInfo->pTableScanOp->status = OP_OPENED; if (pRes) { (*pRes) = true; } - -_end: - qTrace("%s success", __func__); } static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval, @@ -2198,7 +2187,6 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS } uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; @@ -2296,7 +2284,6 @@ static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock* SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; @@ -2357,7 +2344,6 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); } - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; @@ -2477,7 +2463,6 @@ static int32_t checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBl QUERY_CHECK_CODE(code, lino, _end); } SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; SColumnInfoData* pPkColDataInfo = NULL; if (hasPrimaryKeyCol(pInfo)) { @@ -2554,7 +2539,7 @@ static int32_t doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STi if (pWindow->skey != INT64_MIN) { qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); - ASSERT(pCol->pData != NULL); + QUERY_CHECK_NULL(pCol->pData, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t* ts = (int64_t*)colDataGetData(pCol, i); p[i] = (*ts >= pWindow->skey); @@ -2603,7 +2588,8 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); qDebug("doBlockDataWindowFilter primary key, ts:%" PRId64 " %" PRId64, offset->ts, offset->primaryKey.val); - ASSERT(pColPk->info.type == offset->primaryKey.type); + QUERY_CHECK_CONDITION((pColPk->info.type == offset->primaryKey.type), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); __compar_fn_t func = getComparFunc(pColPk->info.type, 0); for (int32_t i = 0; i < pBlock->info.rows; ++i) { @@ -3945,7 +3931,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) { pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1); code = pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS); + + QUERY_CHECK_CONDITION((pInfo->pUpdateInfo->minTS > pUpInfo->minTS), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); SHashObj* curMap = pInfo->pUpdateInfo->pMap; void* pIte = taosHashIterate(curMap, NULL); @@ -4105,12 +4093,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } if (pHandle->initTqReader) { - ASSERT(pHandle->tqReader == NULL); pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode); - ASSERT(pInfo->tqReader); + QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, terrno); } else { - ASSERT(pHandle->tqReader); pInfo->tqReader = pHandle->tqReader; + QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } pInfo->pUpdateInfo = NULL; @@ -5887,7 +5874,10 @@ void destroyTableMergeScanOperatorInfo(void* param) { } int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - ASSERT(pOptr != NULL); + if (pOptr == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA)); + return TSDB_CODE_INVALID_PARA; + } // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); if (!execInfo) { @@ -6200,7 +6190,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pSupp->dbNameSlotId != -1) { - ASSERT(strlen(dbName)); + QUERY_CHECK_CONDITION((strlen(dbName) > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId); QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 36f9ac0954..cb15c3c836 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -735,7 +735,12 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { } // beginSortGroup would fetch all child blocks of pInfo->currGroupId; - ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); + if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + pOperator->pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo, &pBlock); if (pBlock != NULL && (code == 0)) { diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 44a383772d..b4f8d6837a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -88,7 +88,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group winCode = TSDB_CODE_FAILED; } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) { - ASSERT(pBuffInfo->pCur); + QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur); winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin, (void**)&pCurWin->winInfo.pStatePos, &size); @@ -345,7 +345,6 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl if (slidingRows + winRows > pAggSup->windowSliding) { buffInfo.winBuffOp = CREATE_NEW_WINDOW; winRows = pAggSup->windowSliding - slidingRows; - ASSERT(i >= 0); } } else { buffInfo.winBuffOp = MOVE_NEXT_WINDOW; @@ -690,7 +689,10 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ff1ff579fc..38a813bf33 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -378,7 +378,6 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winRows >= 1); if (rebuild) { uint64_t uid = 0; code = appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, @@ -660,7 +659,10 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -779,7 +781,6 @@ void streamEventReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); qDebug("===stream=== event window operator reload state. get result count:%d", num); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 75b15dbea4..fac1cf48c7 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -331,7 +331,7 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS pFillInfo->pLinearInfo->winIndex = 0; } break; default: - ASSERT(0); + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)); break; } } @@ -419,7 +419,6 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; } else { - ASSERT(hasNextWindow(pFillSup)); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; } @@ -447,7 +446,6 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; } else { - ASSERT(hasNextWindow(pFillSup)); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; pFillInfo->pLinearInfo->nextEnd = INT64_MIN; @@ -458,10 +456,9 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS } } break; default: - ASSERT(0); + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)); break; } - ASSERT(pFillInfo->pos != FILL_POS_INVALID); } static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) { @@ -628,7 +625,9 @@ static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SRes SWinKey key = {.groupId = groupId, .ts = pRow->key}; int32_t code = pAPI->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len); qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) { @@ -795,7 +794,6 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint if (winCode != TSDB_CODE_SUCCESS) { colDataSetNULL(pTableCol, pBlock->info.rows); } else { - ASSERT(tbname); char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); @@ -1125,7 +1123,7 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } break; default: - ASSERTS(false, "invalid SSDataBlock type"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5c12db1ab9..071c82c970 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -213,7 +213,6 @@ static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) { } bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) { - ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0"); return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark; } @@ -415,7 +414,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin code = appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); QUERY_CHECK_CODE(code, lino, _end); } else { - ASSERT(tbname); + QUERY_CHECK_CONDITION((tbname), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); code = appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); @@ -916,7 +915,6 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); break; } pGroupResInfo->index += 1; @@ -1366,7 +1364,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t winCode = TSDB_CODE_SUCCESS; code = pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize, &winCode); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winCode == TSDB_CODE_SUCCESS); + QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); code = tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); QUERY_CHECK_CODE(code, lino, _end); @@ -1694,7 +1692,10 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc } else if (IS_FINAL_INTERVAL_OP(pOperator) && pBlock->info.type == STREAM_MID_RETRIEVE) { continue; } else { - ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -1883,7 +1884,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN .deleteMarkSaved = 0, .calTriggerSaved = 0, }; - ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -2090,7 +2090,6 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pSup->pCtx[i].saveHandle.pBuf = NULL; } - ASSERT(numOfCols > 0); return TSDB_CODE_SUCCESS; } @@ -2391,7 +2390,6 @@ _end: static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { - ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey); *pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff; // set time window for current result (*pResult)->win = pWinInfo->sessionWin.win; @@ -3022,7 +3020,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); break; } @@ -3262,7 +3259,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe code = pAggSup->stateStore.streamStateSessionAddIfNotExist( pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winCode == TSDB_CODE_SUCCESS); + QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); code = @@ -3276,7 +3273,6 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe // 3.pChildren int32_t size = 0; buf = taosDecodeFixedI32(buf, &size); - ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); code = doStreamSessionDecodeOpState(buf, 0, pChOp, false, &buf); @@ -3447,7 +3443,10 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -3621,7 +3620,6 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); @@ -3667,7 +3665,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); @@ -3990,7 +3987,10 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* doStreamSessionSaveCheckpoint(pOperator); continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -4205,7 +4205,8 @@ int32_t getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey; code = getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo)); + QUERY_CHECK_CONDITION((IS_VALID_SESSION_WIN(pCurWin->winInfo)), code, lino, _end, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); pCurWin->pStateKey = (SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); @@ -4575,7 +4576,6 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera // 3.pChildren int32_t size = 0; buf = taosDecodeFixedI32(buf, &size); - ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); code = doStreamStateDecodeOpState(buf, 0, pChOp, false, &buf); @@ -4717,7 +4717,10 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -4829,7 +4832,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); qDebug("===stream=== reload state. get result count:%d", num); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); @@ -5135,7 +5137,10 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p continue; } else { - ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) { @@ -5254,8 +5259,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* .minTs = INT64_MAX, .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)}; - ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); - pOperator->pTaskInfo = pTaskInfo; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -5635,7 +5638,6 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* } else { pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; } - ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0); (*ppRes) = pInfo->pDelRes; return code; } @@ -5684,7 +5686,10 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* pInfo->clearState = true; break; } else { - ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + if (pBlock->info.type != STREAM_INVALID) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } } if (pInfo->scalarSupp.pExprInfo != NULL) { diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 258f886805..f48393273f 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -114,7 +114,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); char* p = colDataGetData(pColInfoData, rowIndex); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - ASSERT(varDataTLen(p) <= pColInfoData->info.bytes); memcpy(pLinearInfo->start.val, p, varDataTLen(p)); } else { memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes); @@ -127,7 +126,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo char* p = colDataGetData(pColInfoData, rowIndex); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - ASSERT(varDataTLen(p) <= pColInfoData->info.bytes); memcpy(pLinearInfo->end.val, p, varDataTLen(p)); } else { memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes); @@ -143,7 +141,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo char* p = colDataGetData(pColInfoData, rowIndex); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - ASSERT(varDataTLen(p) <= pColInfoData->info.bytes); memcpy(pLinearInfo->end.val, p, varDataTLen(p)); } else { memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6a74c6a093..91583d03ed 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -136,7 +136,6 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn // } } - ASSERT(forwardRows >= 0); return forwardRows; } @@ -211,8 +210,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) { - ASSERT(startPos >= 0 && startPos < pDataBlockInfo->rows); - int32_t num = -1; int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); @@ -259,8 +256,6 @@ void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY pr SFunctParam* pParam = &pCtx[k].param[0]; SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId); - ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey); - double v1 = 0, v2 = 0, v = 0; if (prevRowIndex == -1) { SGroupKeys* p = taosArrayGet(pPrevValues, index); @@ -356,9 +351,11 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i return true; } -static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, +static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols, - TSKEY blockEkey, STimeWindow* win) { + TSKEY blockEkey, STimeWindow* win, bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t order = pInfo->binfo.inputTsOrder; TSKEY actualEndKey = tsCols[endRowIndex]; @@ -367,21 +364,27 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx // not ended in current data block, do not invoke interpolation if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) { setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP); - return false; + (*pRes) = false; + return code; } // there is actual end point of current time window, no interpolation needs if (key == actualEndKey) { setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP); - return true; + (*pRes) = true; + return code; } - ASSERT(nextRowIndex >= 0); + if (nextRowIndex < 0) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } TSKEY nextKey = tsCols[nextRowIndex]; doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP, pSup); - return true; + (*pRes) = true; + return code; } bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) { @@ -437,13 +440,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl * This time window does not cover any data, try next time window, * this case may happen when the time window is too small */ - if (primaryKeys == NULL) { - if (ascQuery) { - ASSERT(pDataBlockInfo->window.skey <= pNext->ekey); - } else { - ASSERT(pDataBlockInfo->window.ekey >= pNext->skey); - } - } else { + if (primaryKeys != NULL) { if (ascQuery && primaryKeys[startPos] > pNext->ekey) { TSKEY next = primaryKeys[startPos]; if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { @@ -469,7 +466,6 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl } static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) { - ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); if (type == RESULT_ROW_START_INTERP) { return pResult->startInterp == true; } else { @@ -485,15 +481,21 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } } -static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult, - STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) { +static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult, + STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (!pInfo->timeWindowInterpo) { - return; + return code; + } + + if (pBlock == NULL) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return code; } - ASSERT(pBlock != NULL); if (pBlock->pDataBlock == NULL) { - return; + return code; } SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); @@ -528,14 +530,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB } TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; - bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, - endKey, win); + bool interp = false; + code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, + endKey, win, &interp); + QUERY_CHECK_CODE(code, lino, _end); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) { @@ -558,7 +568,6 @@ static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, S char* val = colDataGetData(pColInfo, i); if (IS_VAR_DATA_TYPE(pkey->type)) { memcpy(pkey->pData, val, varDataTLen(val)); - ASSERT(varDataTLen(val) <= pkey->bytes); } else { memcpy(pkey->pData, val, pkey->bytes); } @@ -594,11 +603,17 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num T_LONG_JMP(pTaskInfo->env, terrno); } - ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); + if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } if (pr->closed) { - ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && - isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)); + if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && + isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow); taosMemoryFree(pNode); continue; @@ -611,7 +626,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); if (!pTsKey) { @@ -869,7 +887,10 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo } tsCols = (int64_t*)pColDataInfo->pData; - ASSERT(tsCols[0] != 0); + if(tsCols[0] == 0) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } // no data in primary ts if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) { @@ -1959,7 +1980,10 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } else { if (pMiaInfo->groupId != pBlock->info.id.groupId) { // if there are unclosed time window, close it firstly. - ASSERT(pMiaInfo->curTs != INT64_MIN); + if (pMiaInfo->curTs == INT64_MIN) { + pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + T_LONG_JMP(pTaskInfo->env, terrno); + } finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow)); @@ -2216,7 +2240,9 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder); - ASSERT(forwardRows > 0); + if(forwardRows <= 0) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } // prev time window not interpolation yet. if (iaInfo->timeWindowInterpo) { diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 50a587e353..5eb55467b1 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -282,7 +282,6 @@ _end: int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t* pWinCode) { SWinKey* pTmpkey = pKey; - ASSERT(keyLen == sizeof(SWinKey)); SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode); } @@ -455,7 +454,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream SSessionKey pTmpKey = *pWinKey; int32_t winCode = TSDB_CODE_SUCCESS; code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode); - ASSERT(winCode == TSDB_CODE_FAILED); + QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); QUERY_CHECK_CODE(code, lino, _end); goto _end; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 6a2c85323a..bc2253d4bd 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -564,7 +564,7 @@ _end: int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - ASSERT(pInfo); + QUERY_CHECK_NULL(pInfo, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; @@ -572,6 +572,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { int32_t size = 0; if (tDecodeI32(&decoder, &size) < 0) return -1; pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); + QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); + TSKEY ts = INT64_MIN; for (int32_t i = 0; i < size; i++) { if (tDecodeI64(&decoder, &ts) < 0) return -1; @@ -623,7 +625,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); QUERY_CHECK_CODE(code, lino, _error); } - ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); + QUERY_CHECK_CONDITION((mapSize == taosHashGetSize(pInfo->pMap)), code, lino, _error, + TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1; @@ -634,7 +637,6 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { if (pInfo->pkColLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); - ; } else { pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 3cdbad2dd5..cbaaec0964 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -125,7 +125,6 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { (*pVal) = taosMemoryCalloc(1, *pLen); void* buff = *pVal; int32_t tmp = taosEncodeFixedI64(&buff, *pKey); - ASSERT(tmp == sizeof(TSKEY)); } SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -204,7 +203,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ int32_t len = 0; int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); if (tmpRes == TSDB_CODE_SUCCESS) { - ASSERT(len == sizeof(TSKEY)); + QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); streamFileStateDecode(&pFileState->flushMark, valBuf, len); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); } @@ -361,7 +360,7 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (pPos->beUsed == used) { if (used && !pPos->pRowBuff) { - ASSERT(pPos->needFree == true); + QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); continue; } code = tdListAppend(pFlushList, &pPos); @@ -496,13 +495,13 @@ _end: code = tdListAppend(pFileState->usedBuffs, &pPos); QUERY_CHECK_CODE(code, lino, _error); + QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } - ASSERT(pPos->pRowBuff != NULL); return pPos; } @@ -636,7 +635,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** QUERY_CHECK_CODE(code, lino, _end); pPos->pRowBuff = getFreeBuff(pFileState); } - ASSERT(pPos->pRowBuff); + QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } code = recoverSessionRowBuff(pFileState, pPos); @@ -877,7 +876,10 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { taosMemoryFreeClear(pVal); break; } - ASSERT(vlen == pFileState->rowSize); + if (vlen != pFileState->rowSize) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true;