refactor:do some internal refactor.

This commit is contained in:
Haojun Liao 2024-12-19 15:27:23 +08:00
parent e9f398b3f8
commit c8b44b82c4
2 changed files with 23 additions and 14 deletions

View File

@ -28,8 +28,8 @@ extern "C" {
#define ANAL_FORECAST_DEFAULT_ROWS 10
#define ANAL_FORECAST_DEFAULT_CONF 95
#define ANAL_FORECAST_DEFAULT_WNCHECK 1
#define ANAL_FORECAST_MAX_ROWS 10000
#define ANAL_ANOMALY_WINDOW_MAX_ROWS 10000
#define ANAL_FORECAST_MAX_ROWS 40000
#define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000
typedef struct {
EAnalAlgoType type;

View File

@ -72,17 +72,20 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int
return TSDB_CODE_SUCCESS;
}
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) {
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalyticBuf* pBuf = &pSupp->analBuf;
qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows);
if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) {
code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
tstrerror(code), ANAL_FORECAST_MAX_ROWS);
return code;
}
pSupp->numOfBlocks++;
qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot);
@ -98,10 +101,16 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
pSupp->numOfRows++;
code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts);
if (TSDB_CODE_SUCCESS != code) return code;
if (TSDB_CODE_SUCCESS != code) {
qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
return code;
}
code = taosAnalBufWriteColData(pBuf, 1, valType, val);
if (TSDB_CODE_SUCCESS != code) return code;
if (TSDB_CODE_SUCCESS != code) {
qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
return code;
}
}
return 0;
@ -394,7 +403,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pSupp->cachedRows += pBlock->info.rows;
qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks,
pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock);
code = forecastCacheBlock(pSupp, pBlock, pId);
QUERY_CHECK_CODE(code, lino, _end);
} else {
qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks);
@ -405,7 +414,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pSupp->cachedRows = pBlock->info.rows;
qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock);
code = forecastCacheBlock(pSupp, pBlock, pId);
QUERY_CHECK_CODE(code, lino, _end);
}