fix: forecast and anomaly can input up to 10000 rows

This commit is contained in:
Shengliang Guan 2024-10-15 16:25:14 +08:00
parent 55c7770d84
commit 868ccd86ab
5 changed files with 28 additions and 7 deletions

View File

@ -28,6 +28,8 @@ extern "C" {
#define ANAL_FORECAST_DEFAULT_ROWS 10
#define ANAL_FORECAST_DEFAULT_CONF 95
#define ANAL_FORECAST_DEFAULT_WNCHECK 1
#define ANAL_FORECAST_MAX_ROWS 10000
#define ANAL_ANOMALY_WINDOW_MAX_ROWS 10000
typedef struct {
EAnalAlgoType type;

View File

@ -494,6 +494,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443)
#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444)
#define TSDB_CODE_ANAL_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445)
#define TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -32,7 +32,7 @@ typedef struct {
SArray* blocks; // SSDataBlock*
SArray* windows; // STimeWindow
uint64_t groupId;
int64_t numOfRows;
int64_t cachedRows;
int32_t curWinIndex;
STimeWindow curWin;
SResultRow* pResultRow;
@ -191,7 +191,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks++;
qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows);
pSupp->cachedRows += pBlock->info.rows;
qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks,
pBlock->info.rows, pSupp->cachedRows);
code = anomalyCacheBlock(pInfo, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -199,7 +201,9 @@ static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
anomalyAggregateBlocks(pOperator);
pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks = 1;
qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows);
pSupp->cachedRows = pBlock->info.rows;
qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId,
pBlock->info.rows, pSupp->cachedRows);
code = anomalyCacheBlock(pInfo, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -253,6 +257,10 @@ static void anomalyDestroyOperatorInfo(void* param) {
}
static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) {
if (pInfo->anomalySup.cachedRows > ANAL_ANOMALY_WINDOW_MAX_ROWS) {
return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS;
}
SSDataBlock* pDst = NULL;
int32_t code = createOneDataBlock(pSrc, true, &pDst);
@ -491,7 +499,7 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _OVER);
int32_t numOfWins = taosArrayGetSize(pSupp->windows);
qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->numOfRows);
qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->cachedRows);
for (int32_t w = 0; w < numOfWins; ++w) {
STimeWindow* pWindow = taosArrayGet(pSupp->windows, w);
if (w == 0) {
@ -611,7 +619,7 @@ _OVER:
taosArrayClear(pSupp->blocks);
taosArrayClear(pSupp->windows);
pSupp->numOfRows = 0;
pSupp->cachedRows = 0;
pSupp->curWin.ekey = 0;
pSupp->curWin.skey = 0;
pSupp->curWinIndex = 0;

View File

@ -37,6 +37,7 @@ typedef struct {
int64_t numOfRows;
uint64_t groupId;
int64_t optRows;
int64_t cachedRows;
int32_t numOfBlocks;
int16_t resTsSlot;
int16_t resValSlot;
@ -72,6 +73,10 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int
}
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) {
return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalBuf* pBuf = &pSupp->analBuf;
@ -383,7 +388,9 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks++;
qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows);
pSupp->cachedRows += pBlock->info.rows;
qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks,
pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -392,7 +399,9 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
QUERY_CHECK_CODE(code, lino, _end);
pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks = 1;
qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows);
pSupp->cachedRows = pBlock->info.rows;
qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, pBlock->info.rows,
pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
}

View File

@ -361,6 +361,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm no
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_RETURN_ERROR, "Analysis failed since anode return error")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")