diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index d0af84ecfb..6ebdb38fa6 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -28,8 +28,8 @@ extern "C" { #define ANAL_FORECAST_DEFAULT_ROWS 10 #define ANAL_FORECAST_DEFAULT_CONF 95 #define ANAL_FORECAST_DEFAULT_WNCHECK 1 -#define ANAL_FORECAST_MAX_ROWS 10000 -#define ANAL_ANOMALY_WINDOW_MAX_ROWS 10000 +#define ANAL_FORECAST_MAX_ROWS 40000 +#define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 typedef struct { EAnalAlgoType type; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index a56b0dd214..2985e5e000 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -72,17 +72,20 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int return TSDB_CODE_SUCCESS; } -static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { - if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { - return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; - } - - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; +static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SAnalyticBuf* pBuf = &pSupp->analBuf; - qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows); + if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { + code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; + qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows, + tstrerror(code), ANAL_FORECAST_MAX_ROWS); + return code; + } + pSupp->numOfBlocks++; + qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows); for (int32_t j = 0; j < pBlock->info.rows; ++j) { SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot); @@ -98,10 +101,16 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { pSupp->numOfRows++; code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts); - if (TSDB_CODE_SUCCESS != code) return code; + if (TSDB_CODE_SUCCESS != code) { + qError("%s failed to write ts in buf, code:%s", id, tstrerror(code)); + return code; + } code = taosAnalBufWriteColData(pBuf, 1, valType, val); - if (TSDB_CODE_SUCCESS != code) return code; + if (TSDB_CODE_SUCCESS != code) { + qError("%s failed to write val in buf, code:%s", id, tstrerror(code)); + return code; + } } return 0; @@ -394,7 +403,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pSupp->cachedRows += pBlock->info.rows; qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks, pBlock->info.rows, pSupp->cachedRows); - code = forecastCacheBlock(pSupp, pBlock); + code = forecastCacheBlock(pSupp, pBlock, pId); QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks); @@ -405,7 +414,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pSupp->cachedRows = pBlock->info.rows; qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, pBlock->info.rows, pSupp->cachedRows); - code = forecastCacheBlock(pSupp, pBlock); + code = forecastCacheBlock(pSupp, pBlock, pId); QUERY_CHECK_CODE(code, lino, _end); }