Merge pull request #29231 from taosdata/fix/liaohj

refactor:do some internal refactor about tdanalytics.
This commit is contained in:
Shengliang Guan 2024-12-20 13:57:25 +08:00 committed by GitHub
commit 5ff57a205d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 15 deletions

View File

@ -78,7 +78,7 @@ def check_docs(){
file_only_tdgpt_change_except = sh ( file_only_tdgpt_change_except = sh (
script: ''' script: '''
cd ${WKC} cd ${WKC}
git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -v "forecastoperator.c\\|anomalywindowoperator.c" |grep -v "tsim/analytics" |grep -v "tdgpt_cases.task" || : git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -v "forecastoperator.c\\|anomalywindowoperator.c\\|tanalytics.h\\|tanalytics.c" |grep -v "tsim/analytics" |grep -v "tdgpt_cases.task" || :
''', ''',
returnStdout: true returnStdout: true
).trim() ).trim()

View File

@ -28,8 +28,8 @@ extern "C" {
#define ANAL_FORECAST_DEFAULT_ROWS 10 #define ANAL_FORECAST_DEFAULT_ROWS 10
#define ANAL_FORECAST_DEFAULT_CONF 95 #define ANAL_FORECAST_DEFAULT_CONF 95
#define ANAL_FORECAST_DEFAULT_WNCHECK 1 #define ANAL_FORECAST_DEFAULT_WNCHECK 1
#define ANAL_FORECAST_MAX_ROWS 10000 #define ANAL_FORECAST_MAX_ROWS 40000
#define ANAL_ANOMALY_WINDOW_MAX_ROWS 10000 #define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000
typedef struct { typedef struct {
EAnalAlgoType type; EAnalAlgoType type;

View File

@ -72,17 +72,20 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { int32_t code = TSDB_CODE_SUCCESS;
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; int32_t lino = 0;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalyticBuf* pBuf = &pSupp->analBuf; SAnalyticBuf* pBuf = &pSupp->analBuf;
qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows); if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) {
code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
tstrerror(code), ANAL_FORECAST_MAX_ROWS);
return code;
}
pSupp->numOfBlocks++; pSupp->numOfBlocks++;
qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot); SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot);
@ -98,10 +101,16 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
pSupp->numOfRows++; pSupp->numOfRows++;
code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts); code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts);
if (TSDB_CODE_SUCCESS != code) return code; if (TSDB_CODE_SUCCESS != code) {
qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
return code;
}
code = taosAnalBufWriteColData(pBuf, 1, valType, val); code = taosAnalBufWriteColData(pBuf, 1, valType, val);
if (TSDB_CODE_SUCCESS != code) return code; if (TSDB_CODE_SUCCESS != code) {
qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
return code;
}
} }
return 0; return 0;
@ -394,7 +403,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pSupp->cachedRows += pBlock->info.rows; pSupp->cachedRows += pBlock->info.rows;
qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks, qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks,
pBlock->info.rows, pSupp->cachedRows); pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock); code = forecastCacheBlock(pSupp, pBlock, pId);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else { } else {
qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks); qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks);
@ -405,7 +414,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pSupp->cachedRows = pBlock->info.rows; pSupp->cachedRows = pBlock->info.rows;
qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
pBlock->info.rows, pSupp->cachedRows); pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock); code = forecastCacheBlock(pSupp, pBlock, pId);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }