From 75727e47b572c660a195e91a964dcbf72662c283 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 9 Sep 2024 17:58:40 +0800 Subject: [PATCH] remove assert --- source/libs/executor/src/executor.c | 3 --- .../executor/src/streamtimesliceoperator.c | 27 +++++++++++-------- source/libs/stream/src/tstreamFileState.c | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cc2ddf4225..0191f68a85 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1156,9 +1156,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); pSup->calTriggerSaved = pSup->calTrigger; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 87a0afa917..ba7e4cfcd4 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -86,7 +86,7 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) { int32_t num = (size - sizeof(TSKEY)) / sizeof(SWinKey); qDebug("===stream=== time slice operator reload state. get result count:%d", num); SWinKey* pKeyBuf = (SWinKey*)pBuf; - ASSERT(size == num * sizeof(SWinKey) + sizeof(TSKEY)); + QUERY_CHECK_CONDITION(size == num * sizeof(SWinKey) + sizeof(TSKEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); @@ -665,7 +665,7 @@ static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStrea (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); QUERY_CHECK_CODE(code, lino, _end); if (tmpRes == TSDB_CODE_SUCCESS) { - ASSERT(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts)); + QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setPointBuff(pPrevPoint, pFillSup); if (HAS_ROW_DATA(pPrevPoint->pRightRow)) { pFillSup->prev.key = pPrevPoint->pRightRow->key; @@ -693,7 +693,7 @@ static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStrea (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); QUERY_CHECK_CODE(code, lino, _end); if (tmpRes == TSDB_CODE_SUCCESS) { - ASSERT(!IS_INVALID_WIN_KEY(pNextPoint->key.ts)); + QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pNextPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setPointBuff(pNextPoint, pFillSup); if (HAS_ROW_DATA(pNextPoint->pLeftRow)) { pFillSup->next.key = pNextPoint->pLeftRow->key; @@ -750,7 +750,7 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); QUERY_CHECK_CODE(code, lino, _end); if (tmpRes == TSDB_CODE_SUCCESS) { - ASSERT(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts)); + QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setPointBuff(pPrevPoint, pFillSup); if (HAS_ROW_DATA(pPrevPoint->pRightRow)) { pFillSup->prev.key = pPrevPoint->pRightRow->key; @@ -768,7 +768,7 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); QUERY_CHECK_CODE(code, lino, _end); if (tmpRes == TSDB_CODE_SUCCESS) { - ASSERT(!IS_INVALID_WIN_KEY(pNextPoint->key.ts)); + QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pNextPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setPointBuff(pNextPoint, pFillSup); if (HAS_ROW_DATA(pNextPoint->pLeftRow)) { pFillSup->next.key = pNextPoint->pLeftRow->key; @@ -977,7 +977,7 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF pFillInfo->pResRow = &pFillSup->prev; } break; default: - ASSERT(0); + qError("%s failed at line %d since invalid fill type", __func__, __LINE__); break; } @@ -988,6 +988,8 @@ _end: } static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) { pFillInfo->needFill = false; pFillInfo->pos = FILL_POS_START; @@ -1039,7 +1041,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillSup->prev.key = ts; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; } else { - ASSERT(hasPrevWindow(pFillSup)); + QUERY_CHECK_CONDITION(hasPrevWindow(pFillSup), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; } @@ -1087,7 +1089,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; } else { - ASSERT(hasNextWindow(pFillSup)); + QUERY_CHECK_CONDITION(hasNextWindow(pFillSup), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); @@ -1099,7 +1101,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo } } break; default: - ASSERT(0); + qError("%s failed at line %d since invalid fill type", __func__, __LINE__); break; } @@ -1107,6 +1109,9 @@ _end: if (ts != pFillSup->cur.key) { pFillInfo->pos = FILL_POS_INVALID; } + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) { @@ -1145,7 +1150,7 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t } } break; default: - ASSERT(0); + qError("%s failed at line %d since invalid fill type", __func__, __LINE__); } return false; } @@ -1276,7 +1281,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) TSDB_ORDER_ASC); startPos += numOfWin; int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); - ASSERT(leftRowId >= 0); + QUERY_CHECK_CONDITION((leftRowId >= 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); if (left) { transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 1fe7bd46eb..aaa0f0242d 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1016,7 +1016,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { taosMemoryFreeClear(pVal); break; } - ASSERT(vlen == pFileState->rowSize); + memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true;