diff --git a/include/common/tanal.h b/include/common/tanal.h index 431d9ac89e..69d110d161 100644 --- a/include/common/tanal.h +++ b/include/common/tanal.h @@ -28,6 +28,8 @@ extern "C" { #define ANAL_FORECAST_DEFAULT_ROWS 10 #define ANAL_FORECAST_DEFAULT_CONF 95 #define ANAL_FORECAST_DEFAULT_WNCHECK 1 +#define ANAL_FORECAST_MAX_ROWS 10000 +#define ANAL_ANOMALY_WINDOW_MAX_ROWS 10000 typedef struct { EAnalAlgoType type; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b1cf573479..4d32c0aa24 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -494,6 +494,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443) #define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444) #define TSDB_CODE_ANAL_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445) +#define TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 64ad872543..7f3430b837 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -32,7 +32,7 @@ typedef struct { SArray* blocks; // SSDataBlock* SArray* windows; // STimeWindow uint64_t groupId; - int64_t numOfRows; + int64_t cachedRows; int32_t curWinIndex; STimeWindow curWin; SResultRow* pResultRow; @@ -191,7 +191,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) { pSupp->groupId = pBlock->info.id.groupId; numOfBlocks++; - qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows); + pSupp->cachedRows += pBlock->info.rows; + qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks, + pBlock->info.rows, pSupp->cachedRows); code = anomalyCacheBlock(pInfo, pBlock); QUERY_CHECK_CODE(code, lino, _end); } else { @@ -199,7 +201,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe anomalyAggregateBlocks(pOperator); pSupp->groupId = pBlock->info.id.groupId; numOfBlocks = 1; - qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows); + pSupp->cachedRows = pBlock->info.rows; + qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, + pBlock->info.rows, pSupp->cachedRows); code = anomalyCacheBlock(pInfo, pBlock); QUERY_CHECK_CODE(code, lino, _end); } @@ -253,6 +257,10 @@ static void anomalyDestroyOperatorInfo(void* param) { } static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) { + if (pInfo->anomalySup.cachedRows > ANAL_ANOMALY_WINDOW_MAX_ROWS) { + return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS; + } + SSDataBlock* pDst = NULL; int32_t code = createOneDataBlock(pSrc, true, &pDst); @@ -491,7 +499,7 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _OVER); int32_t numOfWins = taosArrayGetSize(pSupp->windows); - qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->numOfRows); + qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->cachedRows); for (int32_t w = 0; w < numOfWins; ++w) { STimeWindow* pWindow = taosArrayGet(pSupp->windows, w); if (w == 0) { @@ -611,7 +619,7 @@ _OVER: taosArrayClear(pSupp->blocks); taosArrayClear(pSupp->windows); - pSupp->numOfRows = 0; + pSupp->cachedRows = 0; pSupp->curWin.ekey = 0; pSupp->curWin.skey = 0; pSupp->curWinIndex = 0; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 00956c56a3..0afa933ee8 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -37,6 +37,7 @@ typedef struct { int64_t numOfRows; uint64_t groupId; int64_t optRows; + int64_t cachedRows; int32_t numOfBlocks; int16_t resTsSlot; int16_t resValSlot; @@ -72,6 +73,10 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int } static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { + if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { + return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS; + } + int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SAnalBuf* pBuf = &pSupp->analBuf; @@ -383,7 +388,9 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) { pSupp->groupId = pBlock->info.id.groupId; numOfBlocks++; - qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows); + pSupp->cachedRows += pBlock->info.rows; + qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks, + pBlock->info.rows, pSupp->cachedRows); code = forecastCacheBlock(pSupp, pBlock); QUERY_CHECK_CODE(code, lino, _end); } else { @@ -392,7 +399,9 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { QUERY_CHECK_CODE(code, lino, _end); pSupp->groupId = pBlock->info.id.groupId; numOfBlocks = 1; - qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows); + pSupp->cachedRows = pBlock->info.rows; + qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, pBlock->info.rows, + pSupp->cachedRows); code = forecastCacheBlock(pSupp, pBlock); QUERY_CHECK_CODE(code, lino, _end); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 933ef908fa..f6ebebea1e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -361,6 +361,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm no TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_RETURN_ERROR, "Analysis failed since anode return error") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")