remove assert

This commit is contained in:
54liuyao 2024-09-09 17:58:40 +08:00
parent 7eef4f9712
commit 75727e47b5
3 changed files with 17 additions and 15 deletions

View File

@ -1156,9 +1156,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup; STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger; pSup->calTriggerSaved = pSup->calTrigger;

View File

@ -86,7 +86,7 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
int32_t num = (size - sizeof(TSKEY)) / sizeof(SWinKey); int32_t num = (size - sizeof(TSKEY)) / sizeof(SWinKey);
qDebug("===stream=== time slice operator reload state. get result count:%d", num); qDebug("===stream=== time slice operator reload state. get result count:%d", num);
SWinKey* pKeyBuf = (SWinKey*)pBuf; SWinKey* pKeyBuf = (SWinKey*)pBuf;
ASSERT(size == num * sizeof(SWinKey) + sizeof(TSKEY)); QUERY_CHECK_CONDITION(size == num * sizeof(SWinKey) + sizeof(TSKEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
@ -665,7 +665,7 @@ static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStrea
(void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (tmpRes == TSDB_CODE_SUCCESS) { if (tmpRes == TSDB_CODE_SUCCESS) {
ASSERT(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts)); QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setPointBuff(pPrevPoint, pFillSup); setPointBuff(pPrevPoint, pFillSup);
if (HAS_ROW_DATA(pPrevPoint->pRightRow)) { if (HAS_ROW_DATA(pPrevPoint->pRightRow)) {
pFillSup->prev.key = pPrevPoint->pRightRow->key; pFillSup->prev.key = pPrevPoint->pRightRow->key;
@ -693,7 +693,7 @@ static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStrea
(void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (tmpRes == TSDB_CODE_SUCCESS) { if (tmpRes == TSDB_CODE_SUCCESS) {
ASSERT(!IS_INVALID_WIN_KEY(pNextPoint->key.ts)); QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pNextPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setPointBuff(pNextPoint, pFillSup); setPointBuff(pNextPoint, pFillSup);
if (HAS_ROW_DATA(pNextPoint->pLeftRow)) { if (HAS_ROW_DATA(pNextPoint->pLeftRow)) {
pFillSup->next.key = pNextPoint->pLeftRow->key; pFillSup->next.key = pNextPoint->pLeftRow->key;
@ -750,7 +750,7 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
(void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (tmpRes == TSDB_CODE_SUCCESS) { if (tmpRes == TSDB_CODE_SUCCESS) {
ASSERT(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts)); QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setPointBuff(pPrevPoint, pFillSup); setPointBuff(pPrevPoint, pFillSup);
if (HAS_ROW_DATA(pPrevPoint->pRightRow)) { if (HAS_ROW_DATA(pPrevPoint->pRightRow)) {
pFillSup->prev.key = pPrevPoint->pRightRow->key; pFillSup->prev.key = pPrevPoint->pRightRow->key;
@ -768,7 +768,7 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
(void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (tmpRes == TSDB_CODE_SUCCESS) { if (tmpRes == TSDB_CODE_SUCCESS) {
ASSERT(!IS_INVALID_WIN_KEY(pNextPoint->key.ts)); QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pNextPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setPointBuff(pNextPoint, pFillSup); setPointBuff(pNextPoint, pFillSup);
if (HAS_ROW_DATA(pNextPoint->pLeftRow)) { if (HAS_ROW_DATA(pNextPoint->pLeftRow)) {
pFillSup->next.key = pNextPoint->pLeftRow->key; pFillSup->next.key = pNextPoint->pLeftRow->key;
@ -977,7 +977,7 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
} break; } break;
default: default:
ASSERT(0); qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
break; break;
} }
@ -988,6 +988,8 @@ _end:
} }
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) { if (!hasNextWindow(pFillSup) && !hasPrevWindow(pFillSup)) {
pFillInfo->needFill = false; pFillInfo->needFill = false;
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
@ -1039,7 +1041,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillSup->prev.key = ts; pFillSup->prev.key = ts;
pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
} else { } else {
ASSERT(hasPrevWindow(pFillSup)); QUERY_CHECK_CONDITION(hasPrevWindow(pFillSup), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
} }
@ -1087,7 +1089,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} else { } else {
ASSERT(hasNextWindow(pFillSup)); QUERY_CHECK_CONDITION(hasNextWindow(pFillSup), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
@ -1099,7 +1101,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
} }
} break; } break;
default: default:
ASSERT(0); qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
break; break;
} }
@ -1107,6 +1109,9 @@ _end:
if (ts != pFillSup->cur.key) { if (ts != pFillSup->cur.key) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
} }
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
} }
static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) { static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) {
@ -1145,7 +1150,7 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t
} }
} break; } break;
default: default:
ASSERT(0); qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
} }
return false; return false;
} }
@ -1276,7 +1281,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
startPos += numOfWin; startPos += numOfWin;
int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull);
ASSERT(leftRowId >= 0); QUERY_CHECK_CONDITION((leftRowId >= 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type);
if (left) { if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);

View File

@ -1016,7 +1016,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
break; break;
} }
ASSERT(vlen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, vlen); memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true; pNewPos->beFlushed = true;