Merge pull request #29988 from taosdata/fix/chkptq

fix(stream):check the maximum allowed forecast rows, add one test case
This commit is contained in:
Simon Guan 2025-03-05 09:29:01 +08:00 committed by GitHub
commit 6656460320
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 228 additions and 211 deletions

View File

@ -25,11 +25,12 @@
extern "C" {
#endif
#define ANAL_FORECAST_DEFAULT_ROWS 10
#define ANAL_FORECAST_DEFAULT_CONF 95
#define ANAL_FORECAST_DEFAULT_WNCHECK 1
#define ANAL_FORECAST_MAX_ROWS 40000
#define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000
#define ANALY_FORECAST_DEFAULT_ROWS 10
#define ANALY_FORECAST_DEFAULT_CONF 95
#define ANALY_FORECAST_DEFAULT_WNCHECK 1
#define ANALY_FORECAST_MAX_HISTORY_ROWS 40000
#define ANALY_MAX_FC_ROWS 1024
#define ANALY_ANOMALY_WINDOW_MAX_ROWS 40000
typedef struct {
EAnalAlgoType type;
@ -65,30 +66,30 @@ typedef struct {
int32_t taosAnalyticsInit();
void taosAnalyticsCleanup();
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf);
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf);
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);
bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue);
int64_t taosAnalGetVersion();
void taosAnalUpdate(int64_t newVer, SHashObj *pHash);
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);
bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue);
int64_t taosAnalyGetVersion();
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash);
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols);
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal);
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal);
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal);
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName);
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf);
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue);
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf);
int32_t taosAnalBufClose(SAnalyticBuf *pBuf);
void taosAnalBufDestroy(SAnalyticBuf *pBuf);
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols);
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal);
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal);
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal);
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName);
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf);
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex);
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue);
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex);
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf);
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf);
void taosAnalyBufDestroy(SAnalyticBuf *pBuf);
const char *taosAnalysisAlgoType(EAnalAlgoType algoType);
EAnalAlgoType taosAnalAlgoInt(const char *algoName);
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType);
EAnalAlgoType taosAnalyAlgoInt(const char *algoName);
const char *taosAnalyAlgoUrlStr(EAnalAlgoType algoType);
#ifdef __cplusplus
}

View File

@ -653,9 +653,9 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
#define AUDIT_OPERATION_LEN 20
typedef enum {
ANAL_ALGO_TYPE_ANOMALY_DETECT = 0,
ANAL_ALGO_TYPE_FORECAST = 1,
ANAL_ALGO_TYPE_END,
ANALY_ALGO_TYPE_ANOMALY_DETECT = 0,
ANALY_ALGO_TYPE_FORECAST = 1,
ANALY_ALGO_TYPE_END,
} EAnalAlgoType;
typedef enum {

View File

@ -34,38 +34,38 @@ typedef struct {
} SCurlResp;
static SAlgoMgmt tsAlgos = {0};
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
const char *taosAnalysisAlgoType(EAnalAlgoType type) {
switch (type) {
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
case ANALY_ALGO_TYPE_ANOMALY_DETECT:
return "anomaly-detection";
case ANAL_ALGO_TYPE_FORECAST:
case ANALY_ALGO_TYPE_FORECAST:
return "forecast";
default:
return "unknown";
}
}
const char *taosAnalAlgoUrlStr(EAnalAlgoType type) {
const char *taosAnalyAlgoUrlStr(EAnalAlgoType type) {
switch (type) {
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
case ANALY_ALGO_TYPE_ANOMALY_DETECT:
return "anomaly-detect";
case ANAL_ALGO_TYPE_FORECAST:
case ANALY_ALGO_TYPE_FORECAST:
return "forecast";
default:
return "unknown";
}
}
EAnalAlgoType taosAnalAlgoInt(const char *name) {
for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
EAnalAlgoType taosAnalyAlgoInt(const char *name) {
for (EAnalAlgoType i = 0; i < ANALY_ALGO_TYPE_END; ++i) {
if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
return i;
}
}
return ANAL_ALGO_TYPE_END;
return ANALY_ALGO_TYPE_END;
}
int32_t taosAnalyticsInit() {
@ -90,7 +90,7 @@ int32_t taosAnalyticsInit() {
return 0;
}
static void taosAnalFreeHash(SHashObj *hash) {
static void taosAnalyFreeHash(SHashObj *hash) {
void *pIter = taosHashIterate(hash, NULL);
while (pIter != NULL) {
SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter;
@ -105,12 +105,12 @@ void taosAnalyticsCleanup() {
if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
uError("failed to destroy anal lock");
}
taosAnalFreeHash(tsAlgos.hash);
taosAnalyFreeHash(tsAlgos.hash);
tsAlgos.hash = NULL;
uInfo("analysis env is cleaned up");
}
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {
if (newVer > tsAlgos.ver) {
if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
SHashObj *hash = tsAlgos.hash;
@ -119,14 +119,14 @@ void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {
if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
uError("failed to unlock hash")
}
taosAnalFreeHash(hash);
taosAnalyFreeHash(hash);
}
} else {
taosAnalFreeHash(pHash);
taosAnalyFreeHash(pHash);
}
}
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
char *pStart = NULL;
char *pEnd = NULL;
@ -163,7 +163,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue,
return true;
}
bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) {
bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) {
char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
@ -177,7 +177,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu
}
}
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
int32_t code = 0;
char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);
@ -205,7 +205,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
return code;
}
int64_t taosAnalGetVersion() { return tsAlgos.ver; }
int64_t taosAnalyGetVersion() { return tsAlgos.ver; }
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
SCurlResp *pRsp = userdata;
@ -311,7 +311,7 @@ _OVER:
return code;
}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) {
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) {
int32_t code = -1;
char *pCont = NULL;
int64_t contentLen;
@ -324,7 +324,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf
goto _OVER;
}
} else {
code = taosAnalBufGetCont(pBuf, &pCont, &contentLen);
code = taosAnalyBufGetCont(pBuf, &pCont, &contentLen);
if (code != 0) {
terrno = code;
goto _OVER;
@ -356,7 +356,7 @@ _OVER:
return pJson;
}
static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
static int32_t taosAnalyJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
int32_t code = 0;
int64_t contLen;
char *pCont = NULL;
@ -395,7 +395,7 @@ _OVER:
return code;
}
static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
static int32_t taosAnalyJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
char buf[64] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
@ -404,7 +404,7 @@ static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optNam
return 0;
}
static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
static int32_t taosAnalyJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
char buf[128] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
@ -413,7 +413,7 @@ static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optNam
return 0;
}
static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
static int32_t taosAnalyJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
char buf[128] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
@ -422,7 +422,7 @@ static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optN
return 0;
}
static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
if (bufLen <= 0) {
bufLen = strlen(buf);
}
@ -432,7 +432,7 @@ static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int3
return 0;
}
static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); }
static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalyJsonBufWriteStr(pBuf, "{\n", 0); }
static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
@ -445,7 +445,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
pBuf->numOfCols = numOfCols;
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) {
return taosAnalJsonBufWriteStart(pBuf);
return taosAnalyJsonBufWriteStart(pBuf);
}
for (int32_t i = 0; i < numOfCols; ++i) {
@ -458,16 +458,16 @@ static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
}
}
return taosAnalJsonBufWriteStart(pBuf);
return taosAnalyJsonBufWriteStart(pBuf);
}
static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
static int32_t taosAnalyJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
char buf[128] = {0};
bool first = (colIndex == 0);
bool last = (colIndex == pBuf->numOfCols - 1);
if (first) {
if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
if (taosAnalyJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
return terrno;
}
}
@ -479,7 +479,7 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex,
}
if (last) {
if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
if (taosAnalyJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
return terrno;
}
}
@ -487,11 +487,11 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex,
return 0;
}
static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
static int32_t taosAnalyJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
return taosAnalyJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
}
static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
static int32_t taosAnalyJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
if (bufLen <= 0) {
bufLen = strlen(buf);
}
@ -511,20 +511,20 @@ static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf
return 0;
}
static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
static int32_t taosAnalyJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
return taosAnalyJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
}
static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
static int32_t taosAnalyJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
if (colIndex == pBuf->numOfCols - 1) {
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex);
return taosAnalyJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex);
} else {
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex);
return taosAnalyJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex);
}
}
static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
static int32_t taosAnalyJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
char buf[64];
int32_t bufLen = 0;
@ -575,10 +575,10 @@ static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex,
}
pBuf->pCols[colIndex].numOfRows++;
return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
return taosAnalyJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
}
static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
static int32_t taosAnalyJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
int32_t code = 0;
char *pCont = NULL;
int64_t contLen = 0;
@ -593,10 +593,10 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
code = taosCloseFile(&pCol->filePtr);
if (code != 0) return code;
code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
code = taosAnalyJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
if (code != 0) return code;
code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen);
code = taosAnalyJsonBufWriteStr(pBuf, pCont, contLen);
if (code != 0) return code;
taosMemoryFreeClear(pCont);
@ -604,18 +604,18 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
}
}
return taosAnalJsonBufWriteStr(pBuf, "],\n", 0);
return taosAnalyJsonBufWriteStr(pBuf, "],\n", 0);
}
static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) {
int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
static int32_t taosAnalyJsonBufWriteEnd(SAnalyticBuf *pBuf) {
int32_t code = taosAnalyJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
if (code != 0) return code;
return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
return taosAnalyJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
}
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
int32_t code = taosAnalJsonBufWriteEnd(pBuf);
int32_t code = taosAnalyJsonBufWriteEnd(pBuf);
if (code != 0) return code;
if (pBuf->filePtr != NULL) {
@ -640,7 +640,7 @@ int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
return 0;
}
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
if (pBuf->fileName[0] != 0) {
if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
// taosRemoveFile(pBuf->fileName);
@ -664,7 +664,7 @@ void taosAnalBufDestroy(SAnalyticBuf *pBuf) {
pBuf->numOfCols = 0;
}
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return tsosAnalJsonBufOpen(pBuf, numOfCols);
} else {
@ -672,79 +672,79 @@ int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
}
}
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal);
return taosAnalyJsonBufWriteOptStr(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal);
return taosAnalyJsonBufWriteOptInt(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal);
return taosAnalyJsonBufWriteOptFloat(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
return taosAnalyJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) {
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteDataBegin(pBuf);
return taosAnalyJsonBufWriteDataBegin(pBuf);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColBegin(pBuf, colIndex);
return taosAnalyJsonBufWriteColBegin(pBuf, colIndex);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue);
return taosAnalyJsonBufWriteColData(pBuf, colIndex, colType, colValue);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColEnd(pBuf, colIndex);
return taosAnalyJsonBufWriteColEnd(pBuf, colIndex);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) {
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteDataEnd(pBuf);
return taosAnalyJsonBufWriteDataEnd(pBuf);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) {
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufClose(pBuf);
} else {
@ -752,12 +752,12 @@ int32_t taosAnalBufClose(SAnalyticBuf *pBuf) {
}
}
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
*ppCont = NULL;
*pContLen = 0;
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
return taosAnalyJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
} else {
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
@ -767,28 +767,28 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC
int32_t taosAnalyticsInit() { return 0; }
void taosAnalyticsCleanup() {}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; }
int64_t taosAnalGetVersion() { return 0; }
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {}
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; }
int64_t taosAnalyGetVersion() { return 0; }
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {}
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; }
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; }
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
return 0;
}
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; }
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {}
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) { return 0; }
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {}
const char *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; }
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }

View File

@ -94,7 +94,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
int32_t code = 0;
int64_t oldVer = taosAnalGetVersion();
int64_t oldVer = taosAnalyGetVersion();
if (oldVer == newVer) return;
dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
@ -233,7 +233,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
req.analVer = taosAnalGetVersion();
req.analVer = taosAnalyGetVersion();
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
if (contLen < 0) {

View File

@ -119,7 +119,7 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
SRetrieveAnalAlgoRsp rsp = {0};
if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
taosAnalUpdate(rsp.ver, rsp.hash);
taosAnalyUpdate(rsp.ver, rsp.hash);
rsp.hash = NULL;
}
tFreeRetrieveAnalAlgoRsp(&rsp);

View File

@ -649,13 +649,19 @@ void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) {
break;
}
if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) {
if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) {
taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]);
if (pObj->numOfAlgos >= ANALY_ALGO_TYPE_END) {
if (pObj->algos[ANALY_ALGO_TYPE_ANOMALY_DETECT] != NULL) {
void* p = taosArrayAddAll(pAd, pObj->algos[ANALY_ALGO_TYPE_ANOMALY_DETECT]);
if (p == NULL) {
mError("failed to add retrieved anomaly-detection algorithms, code:%s", tstrerror(terrno));
}
}
if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) {
taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]);
if (pObj->algos[ANALY_ALGO_TYPE_FORECAST] != NULL) {
void* p = taosArrayAddAll(pFc, pObj->algos[ANALY_ALGO_TYPE_FORECAST]);
if (p == NULL) {
mError("failed to add retrieved forecast algorithms, code:%s", tstrerror(terrno));
}
}
}
@ -738,11 +744,11 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) {
if (details == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
int32_t numOfDetails = tjsonGetArraySize(details);
pObj->algos = taosMemoryCalloc(ANAL_ALGO_TYPE_END, sizeof(SArray *));
pObj->algos = taosMemoryCalloc(ANALY_ALGO_TYPE_END, sizeof(SArray *));
if (pObj->algos == NULL) return TSDB_CODE_OUT_OF_MEMORY;
pObj->numOfAlgos = ANAL_ALGO_TYPE_END;
for (int32_t i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
pObj->numOfAlgos = ANALY_ALGO_TYPE_END;
for (int32_t i = 0; i < ANALY_ALGO_TYPE_END; ++i) {
pObj->algos[i] = taosArrayInit(4, sizeof(SAnodeAlgo));
if (pObj->algos[i] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
}
@ -753,8 +759,8 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) {
code = tjsonGetStringValue2(detail, "type", buf, sizeof(buf));
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
EAnalAlgoType type = taosAnalAlgoInt(buf);
if (type < 0 || type >= ANAL_ALGO_TYPE_END) return TSDB_CODE_MND_ANODE_INVALID_ALGO_TYPE;
EAnalAlgoType type = taosAnalyAlgoInt(buf);
if (type < 0 || type >= ANALY_ALGO_TYPE_END) return TSDB_CODE_MND_ANODE_INVALID_ALGO_TYPE;
SJson *algos = tjsonGetObjectItem(detail, "algo");
if (algos == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
@ -783,7 +789,7 @@ static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) {
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
if (pJson == NULL) return terrno;
int32_t code = mndDecodeAlgoList(pJson, pObj);
@ -799,7 +805,7 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
if (pJson == NULL) return terrno;
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);
@ -881,7 +887,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
}
url.urlLen = 1 + tsnprintf(url.url, TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN, "%s/%s", pAnode->url,
taosAnalAlgoUrlStr(url.type));
taosAnalyAlgoUrlStr(url.type));
if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalyticsUrl)) != 0) {
taosMemoryFree(url.url);
sdbRelease(pSdb, pAnode);

View File

@ -77,13 +77,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
goto _error;
}
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
if (!taosAnalyGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt);
code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
goto _error;
}
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
if (taosAnalyGetAlgoUrl(pInfo->algoName, ANALY_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName);
code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
goto _error;
@ -265,7 +265,7 @@ static void anomalyDestroyOperatorInfo(void* param) {
}
static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) {
if (pInfo->anomalySup.cachedRows > ANAL_ANOMALY_WINDOW_MAX_ROWS) {
if (pInfo->anomalySup.cachedRows > ANALY_ANOMALY_WINDOW_MAX_ROWS) {
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
}
@ -364,35 +364,35 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
SAnomalyWindowSupp* pSupp = &pInfo->anomalySup;
SJson* pJson = NULL;
SAnalyticBuf analBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON};
SAnalyticBuf analyBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON};
char dataBuf[64] = {0};
int32_t code = 0;
int64_t ts = 0;
int32_t lino = 0;
const char* pId = GET_TASKID(pOperator->pTaskInfo);
snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts,
snprintf(analyBuf.fileName, sizeof(analyBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts,
pSupp->groupId);
code = tsosAnalBufOpen(&analBuf, 2);
code = tsosAnalyBufOpen(&analyBuf, 2);
QUERY_CHECK_CODE(code, lino, _OVER);
const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
code = taosAnalyBufWriteColMeta(&analyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteColMeta(&analBuf, 1, pInfo->anomalyCol.type, "val");
code = taosAnalyBufWriteColMeta(&analyBuf, 1, pInfo->anomalyCol.type, "val");
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteDataBegin(&analBuf);
code = taosAnalyBufWriteDataBegin(&analyBuf);
QUERY_CHECK_CODE(code, lino, _OVER);
int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks);
// timestamp
code = taosAnalBufWriteColBegin(&analBuf, 0);
code = taosAnalyBufWriteColBegin(&analyBuf, 0);
QUERY_CHECK_CODE(code, lino, _OVER);
for (int32_t i = 0; i < numOfBlocks; ++i) {
@ -401,16 +401,16 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
if (pTsCol == NULL) break;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
code = taosAnalBufWriteColData(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]);
code = taosAnalyBufWriteColData(&analyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]);
QUERY_CHECK_CODE(code, lino, _OVER);
}
}
code = taosAnalBufWriteColEnd(&analBuf, 0);
code = taosAnalyBufWriteColEnd(&analyBuf, 0);
QUERY_CHECK_CODE(code, lino, _OVER);
// data
code = taosAnalBufWriteColBegin(&analBuf, 1);
code = taosAnalyBufWriteColBegin(&analyBuf, 1);
QUERY_CHECK_CODE(code, lino, _OVER);
for (int32_t i = 0; i < numOfBlocks; ++i) {
@ -420,38 +420,38 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
if (pValCol == NULL) break;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
code = taosAnalBufWriteColData(&analBuf, 1, pValCol->info.type, colDataGetData(pValCol, j));
code = taosAnalyBufWriteColData(&analyBuf, 1, pValCol->info.type, colDataGetData(pValCol, j));
QUERY_CHECK_CODE(code, lino, _OVER);
}
}
code = taosAnalBufWriteColEnd(&analBuf, 1);
code = taosAnalyBufWriteColEnd(&analyBuf, 1);
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteDataEnd(&analBuf);
code = taosAnalyBufWriteDataEnd(&analyBuf);
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt);
code = taosAnalyBufWriteOptStr(&analyBuf, "option", pInfo->anomalyOpt);
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName);
code = taosAnalyBufWriteOptStr(&analyBuf, "algo", pInfo->algoName);
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteOptStr(&analBuf, "prec", prec);
code = taosAnalyBufWriteOptStr(&analyBuf, "prec", prec);
QUERY_CHECK_CODE(code, lino, _OVER);
int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK;
bool hasWncheck = taosAnalGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck);
int64_t wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;
bool hasWncheck = taosAnalyGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck);
if (!hasWncheck) {
qDebug("anomaly_window wncheck not found from %s, use default:%" PRId64, pInfo->anomalyOpt, wncheck);
}
code = taosAnalBufWriteOptInt(&analBuf, "wncheck", wncheck);
code = taosAnalyBufWriteOptInt(&analyBuf, "wncheck", wncheck);
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufClose(&analBuf);
code = taosAnalyBufClose(&analyBuf);
QUERY_CHECK_CODE(code, lino, _OVER);
pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf);
pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analyBuf);
if (pJson == NULL) {
code = terrno;
goto _OVER;
@ -464,7 +464,7 @@ _OVER:
qError("%s failed to analysis window since %s, lino:%d", pId, tstrerror(code), lino);
}
taosAnalBufDestroy(&analBuf);
taosAnalyBufDestroy(&analyBuf);
if (pJson != NULL) tjsonDelete(pJson);
return code;
}

View File

@ -46,7 +46,7 @@ typedef struct {
int16_t inputValSlot;
int8_t inputValType;
int8_t inputPrecision;
SAnalyticBuf analBuf;
SAnalyticBuf analyBuf;
} SForecastSupp;
typedef struct SForecastOperatorInfo {
@ -74,12 +74,12 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalyticBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analyBuf;
if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) {
if (pSupp->cachedRows > ANALY_FORECAST_MAX_HISTORY_ROWS) {
code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
tstrerror(code), ANAL_FORECAST_MAX_ROWS);
tstrerror(code), ANALY_FORECAST_MAX_HISTORY_ROWS);
return code;
}
@ -99,13 +99,13 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con
pSupp->maxTs = MAX(pSupp->maxTs, ts);
pSupp->numOfRows++;
code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts);
code = taosAnalyBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts);
if (TSDB_CODE_SUCCESS != code) {
qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
return code;
}
code = taosAnalBufWriteColData(pBuf, 1, valType, val);
code = taosAnalyBufWriteColData(pBuf, 1, valType, val);
if (TSDB_CODE_SUCCESS != code) {
qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
return code;
@ -115,81 +115,88 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con
return 0;
}
static int32_t forecastCloseBuf(SForecastSupp* pSupp) {
SAnalyticBuf* pBuf = &pSupp->analBuf;
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
SAnalyticBuf* pBuf = &pSupp->analyBuf;
int32_t code = 0;
for (int32_t i = 0; i < 2; ++i) {
code = taosAnalBufWriteColEnd(pBuf, i);
code = taosAnalyBufWriteColEnd(pBuf, i);
if (code != 0) return code;
}
code = taosAnalBufWriteDataEnd(pBuf);
code = taosAnalyBufWriteDataEnd(pBuf);
if (code != 0) return code;
code = taosAnalBufWriteOptStr(pBuf, "option", pSupp->algoOpt);
code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->algoOpt);
if (code != 0) return code;
code = taosAnalBufWriteOptStr(pBuf, "algo", pSupp->algoName);
code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
if (code != 0) return code;
const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
code = taosAnalBufWriteOptStr(pBuf, "prec", prec);
code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
if (code != 0) return code;
int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK;
bool hasWncheck = taosAnalGetOptInt(pSupp->algoOpt, "wncheck", &wncheck);
int64_t wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;
bool hasWncheck = taosAnalyGetOptInt(pSupp->algoOpt, "wncheck", &wncheck);
if (!hasWncheck) {
qDebug("forecast wncheck not found from %s, use default:%" PRId64, pSupp->algoOpt, wncheck);
qDebug("%s forecast wncheck not found from %s, use default:%" PRId64, id, pSupp->algoOpt, wncheck);
}
code = taosAnalBufWriteOptInt(pBuf, "wncheck", wncheck);
code = taosAnalyBufWriteOptInt(pBuf, "wncheck", wncheck);
if (code != 0) return code;
bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
code = taosAnalBufWriteOptInt(pBuf, "return_conf", !noConf);
code = taosAnalyBufWriteOptInt(pBuf, "return_conf", !noConf);
if (code != 0) return code;
pSupp->optRows = ANAL_FORECAST_DEFAULT_ROWS;
bool hasRows = taosAnalGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows);
pSupp->optRows = ANALY_FORECAST_DEFAULT_ROWS;
bool hasRows = taosAnalyGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows);
if (!hasRows) {
qDebug("forecast rows not found from %s, use default:%" PRId64, pSupp->algoOpt, pSupp->optRows);
qDebug("%s forecast rows not found from %s, use default:%" PRId64, id, pSupp->algoOpt, pSupp->optRows);
}
code = taosAnalBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows);
if (pSupp->optRows > ANALY_MAX_FC_ROWS) {
qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_MAX_FC_ROWS,
pSupp->optRows);
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
}
code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows);
if (code != 0) return code;
int64_t conf = ANAL_FORECAST_DEFAULT_CONF;
bool hasConf = taosAnalGetOptInt(pSupp->algoOpt, "conf", &conf);
int64_t conf = ANALY_FORECAST_DEFAULT_CONF;
bool hasConf = taosAnalyGetOptInt(pSupp->algoOpt, "conf", &conf);
if (!hasConf) {
qDebug("forecast conf not found from %s, use default:%" PRId64, pSupp->algoOpt, conf);
qDebug("%s forecast conf not found from %s, use default:%" PRId64, id, pSupp->algoOpt, conf);
}
code = taosAnalBufWriteOptInt(pBuf, "conf", conf);
code = taosAnalyBufWriteOptInt(pBuf, "conf", conf);
if (code != 0) return code;
int32_t len = strlen(pSupp->algoOpt);
int64_t every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1);
int64_t start = pSupp->maxTs + every;
bool hasStart = taosAnalGetOptInt(pSupp->algoOpt, "start", &start);
bool hasStart = taosAnalyGetOptInt(pSupp->algoOpt, "start", &start);
if (!hasStart) {
qDebug("forecast start not found from %s, use %" PRId64, pSupp->algoOpt, start);
qDebug("%s forecast start not found from %s, use %" PRId64, id, pSupp->algoOpt, start);
}
code = taosAnalBufWriteOptInt(pBuf, "start", start);
code = taosAnalyBufWriteOptInt(pBuf, "start", start);
if (code != 0) return code;
bool hasEvery = taosAnalGetOptInt(pSupp->algoOpt, "every", &every);
bool hasEvery = taosAnalyGetOptInt(pSupp->algoOpt, "every", &every);
if (!hasEvery) {
qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every);
qDebug("%s forecast every not found from %s, use %" PRId64, id, pSupp->algoOpt, every);
}
code = taosAnalBufWriteOptInt(pBuf, "every", every);
code = taosAnalyBufWriteOptInt(pBuf, "every", every);
if (code != 0) return code;
code = taosAnalBufClose(pBuf);
code = taosAnalyBufClose(pBuf);
return code;
}
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
SAnalyticBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analyBuf;
int32_t resCurRow = pBlock->info.rows;
int8_t tmpI8;
int16_t tmpI16;
@ -209,7 +216,7 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const
SColumnInfoData* pResHighCol =
(pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf);
SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf);
if (pJson == NULL) {
return terrno;
}
@ -356,9 +363,9 @@ _OVER:
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalyticBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analyBuf;
code = forecastCloseBuf(pSupp);
code = forecastCloseBuf(pSupp, pId);
QUERY_CHECK_CODE(code, lino, _end);
code = forecastEnsureBlockCapacity(pResBlock, 1);
@ -371,7 +378,7 @@ static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBl
_end:
pSupp->numOfBlocks = 0;
taosAnalBufDestroy(&pSupp->analBuf);
taosAnalyBufDestroy(&pSupp->analyBuf);
return code;
}
@ -382,7 +389,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SForecastOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pResBlock = pInfo->pRes;
SForecastSupp* pSupp = &pInfo->forecastSupp;
SAnalyticBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analyBuf;
int64_t st = taosGetTimestampUs();
int32_t numOfBlocks = pSupp->numOfBlocks;
const char* pId = GET_TASKID(pOperator->pTaskInfo);
@ -525,12 +532,12 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) {
pSupp->minTs = INT64_MAX;
pSupp->numOfRows = 0;
if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) {
if (!taosAnalyGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) {
qError("failed to get forecast algorithm name from %s", pSupp->algoOpt);
return TSDB_CODE_ANA_ALGO_NOT_FOUND;
}
if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) {
if (taosAnalyGetAlgoUrl(pSupp->algoName, ANALY_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) {
qError("failed to get forecast algorithm url from %s", pSupp->algoName);
return TSDB_CODE_ANA_ALGO_NOT_LOAD;
}
@ -539,32 +546,32 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) {
}
static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
SAnalyticBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analyBuf;
int64_t ts = 0; // taosGetTimestampMs();
pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts);
int32_t code = tsosAnalBufOpen(pBuf, 2);
int32_t code = tsosAnalyBufOpen(pBuf, 2);
if (code != 0) goto _OVER;
code = taosAnalBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
code = taosAnalyBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
if (code != 0) goto _OVER;
code = taosAnalBufWriteColMeta(pBuf, 1, pSupp->inputValType, "val");
code = taosAnalyBufWriteColMeta(pBuf, 1, pSupp->inputValType, "val");
if (code != 0) goto _OVER;
code = taosAnalBufWriteDataBegin(pBuf);
code = taosAnalyBufWriteDataBegin(pBuf);
if (code != 0) goto _OVER;
for (int32_t i = 0; i < 2; ++i) {
code = taosAnalBufWriteColBegin(pBuf, i);
code = taosAnalyBufWriteColBegin(pBuf, i);
if (code != 0) goto _OVER;
}
_OVER:
if (code != 0) {
(void)taosAnalBufClose(pBuf);
taosAnalBufDestroy(pBuf);
(void)taosAnalyBufClose(pBuf);
taosAnalyBufDestroy(pBuf);
}
return code;
}
@ -656,7 +663,7 @@ static void destroyForecastInfo(void* param) {
blockDataDestroy(pInfo->pRes);
pInfo->pRes = NULL;
cleanupExprSupp(&pInfo->scalarSup);
taosAnalBufDestroy(&pInfo->forecastSupp.analBuf);
taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
taosMemoryFreeClear(param);
}

View File

@ -1223,7 +1223,7 @@ static int32_t translateForecast(SFunctionNode* pFunc, char* pErrBuf, int32_t le
}
SValueNode* pValue = (SValueNode*)pOption;
if (!taosAnalGetOptStr(pValue->literal, "algo", NULL, 0) != 0) {
if (!taosAnalyGetOptStr(pValue->literal, "algo", NULL, 0) != 0) {
return invaildFuncParaValueErrMsg(pErrBuf, len, "FORECAST option should include algo field");
}

View File

@ -6110,7 +6110,7 @@ static int32_t translateAnomalyWindow(STranslateContext* pCxt, SSelectStmt* pSel
SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pSelect->pWindow;
int32_t code = checkAnomalyExpr(pCxt, pAnomaly->pExpr);
if (TSDB_CODE_SUCCESS == code) {
if (!taosAnalGetOptStr(pAnomaly->anomalyOpt, "algo", NULL, 0) != 0) {
if (!taosAnalyGetOptStr(pAnomaly->anomalyOpt, "algo", NULL, 0) != 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT,
"ANOMALY_WINDOW option should include algo field");
}

View File

@ -69,6 +69,9 @@ if $data00 != 1 then
return -1
endi
print ================= too many rows error
sql_error select forecast(c6, 'algo=holtwinters, rows=1025') from ct1;
print ================= try every loaded anomaly detection algorithm
sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma');