diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 344093245b..42c3ce9391 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -25,11 +25,12 @@ extern "C" { #endif -#define ANAL_FORECAST_DEFAULT_ROWS 10 -#define ANAL_FORECAST_DEFAULT_CONF 95 -#define ANAL_FORECAST_DEFAULT_WNCHECK 1 -#define ANAL_FORECAST_MAX_ROWS 40000 -#define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 +#define ANALY_FORECAST_DEFAULT_ROWS 10 +#define ANALY_FORECAST_DEFAULT_CONF 95 +#define ANALY_FORECAST_DEFAULT_WNCHECK 1 +#define ANALY_FORECAST_MAX_HISTORY_ROWS 40000 +#define ANALY_MAX_FC_ROWS 1024 +#define ANALY_ANOMALY_WINDOW_MAX_ROWS 40000 typedef struct { EAnalAlgoType type; @@ -65,30 +66,30 @@ typedef struct { int32_t taosAnalyticsInit(); void taosAnalyticsCleanup(); -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf); +SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf); -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue); -int64_t taosAnalGetVersion(); -void taosAnalUpdate(int64_t newVer, SHashObj *pHash); +int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); +bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); +bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue); +int64_t taosAnalyGetVersion(); +void taosAnalyUpdate(int64_t newVer, SHashObj *pHash); -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols); -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal); -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal); -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal); -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName); -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf); -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex); -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue); -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex); -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf); -int32_t taosAnalBufClose(SAnalyticBuf *pBuf); -void taosAnalBufDestroy(SAnalyticBuf *pBuf); +int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols); +int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal); +int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal); +int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal); +int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName); +int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf); +int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex); +int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue); +int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex); +int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf); +int32_t taosAnalyBufClose(SAnalyticBuf *pBuf); +void taosAnalyBufDestroy(SAnalyticBuf *pBuf); const char *taosAnalysisAlgoType(EAnalAlgoType algoType); -EAnalAlgoType taosAnalAlgoInt(const char *algoName); -const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType); +EAnalAlgoType taosAnalyAlgoInt(const char *algoName); +const char *taosAnalyAlgoUrlStr(EAnalAlgoType algoType); #ifdef __cplusplus } diff --git a/include/util/tdef.h b/include/util/tdef.h index cd7a697e59..270e9df444 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -653,9 +653,9 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define AUDIT_OPERATION_LEN 20 typedef enum { - ANAL_ALGO_TYPE_ANOMALY_DETECT = 0, - ANAL_ALGO_TYPE_FORECAST = 1, - ANAL_ALGO_TYPE_END, + ANALY_ALGO_TYPE_ANOMALY_DETECT = 0, + ANALY_ALGO_TYPE_FORECAST = 1, + ANALY_ALGO_TYPE_END, } EAnalAlgoType; typedef enum { diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index 0ed67eed0a..deb68af3ea 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -34,38 +34,38 @@ typedef struct { } SCurlResp; static SAlgoMgmt tsAlgos = {0}; -static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); +static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); const char *taosAnalysisAlgoType(EAnalAlgoType type) { switch (type) { - case ANAL_ALGO_TYPE_ANOMALY_DETECT: + case ANALY_ALGO_TYPE_ANOMALY_DETECT: return "anomaly-detection"; - case ANAL_ALGO_TYPE_FORECAST: + case ANALY_ALGO_TYPE_FORECAST: return "forecast"; default: return "unknown"; } } -const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { +const char *taosAnalyAlgoUrlStr(EAnalAlgoType type) { switch (type) { - case ANAL_ALGO_TYPE_ANOMALY_DETECT: + case ANALY_ALGO_TYPE_ANOMALY_DETECT: return "anomaly-detect"; - case ANAL_ALGO_TYPE_FORECAST: + case ANALY_ALGO_TYPE_FORECAST: return "forecast"; default: return "unknown"; } } -EAnalAlgoType taosAnalAlgoInt(const char *name) { - for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { +EAnalAlgoType taosAnalyAlgoInt(const char *name) { + for (EAnalAlgoType i = 0; i < ANALY_ALGO_TYPE_END; ++i) { if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) { return i; } } - return ANAL_ALGO_TYPE_END; + return ANALY_ALGO_TYPE_END; } int32_t taosAnalyticsInit() { @@ -90,7 +90,7 @@ int32_t taosAnalyticsInit() { return 0; } -static void taosAnalFreeHash(SHashObj *hash) { +static void taosAnalyFreeHash(SHashObj *hash) { void *pIter = taosHashIterate(hash, NULL); while (pIter != NULL) { SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter; @@ -105,12 +105,12 @@ void taosAnalyticsCleanup() { if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) { uError("failed to destroy anal lock"); } - taosAnalFreeHash(tsAlgos.hash); + taosAnalyFreeHash(tsAlgos.hash); tsAlgos.hash = NULL; uInfo("analysis env is cleaned up"); } -void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { +void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) { if (newVer > tsAlgos.ver) { if (taosThreadMutexLock(&tsAlgos.lock) == 0) { SHashObj *hash = tsAlgos.hash; @@ -119,14 +119,14 @@ void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { uError("failed to unlock hash") } - taosAnalFreeHash(hash); + taosAnalyFreeHash(hash); } } else { - taosAnalFreeHash(pHash); + taosAnalyFreeHash(pHash); } } -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { +bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; char *pStart = NULL; char *pEnd = NULL; @@ -163,7 +163,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, return true; } -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { +bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) { char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); @@ -177,7 +177,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu } } -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { +int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { int32_t code = 0; char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0}; int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); @@ -205,7 +205,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, return code; } -int64_t taosAnalGetVersion() { return tsAlgos.ver; } +int64_t taosAnalyGetVersion() { return tsAlgos.ver; } static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) { SCurlResp *pRsp = userdata; @@ -311,7 +311,7 @@ _OVER: return code; } -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { +SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { int32_t code = -1; char *pCont = NULL; int64_t contentLen; @@ -324,7 +324,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf goto _OVER; } } else { - code = taosAnalBufGetCont(pBuf, &pCont, &contentLen); + code = taosAnalyBufGetCont(pBuf, &pCont, &contentLen); if (code != 0) { terrno = code; goto _OVER; @@ -356,7 +356,7 @@ _OVER: return pJson; } -static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { +static int32_t taosAnalyJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { int32_t code = 0; int64_t contLen; char *pCont = NULL; @@ -395,7 +395,7 @@ _OVER: return code; } -static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { +static int32_t taosAnalyJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { char buf[64] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { @@ -404,7 +404,7 @@ static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optNam return 0; } -static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { +static int32_t taosAnalyJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { char buf[128] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { @@ -413,7 +413,7 @@ static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optNam return 0; } -static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { +static int32_t taosAnalyJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { char buf[128] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { @@ -422,7 +422,7 @@ static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optN return 0; } -static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) { +static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) { if (bufLen <= 0) { bufLen = strlen(buf); } @@ -432,7 +432,7 @@ static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int3 return 0; } -static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } +static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalyJsonBufWriteStr(pBuf, "{\n", 0); } static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); @@ -445,7 +445,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { pBuf->numOfCols = numOfCols; if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { - return taosAnalJsonBufWriteStart(pBuf); + return taosAnalyJsonBufWriteStart(pBuf); } for (int32_t i = 0; i < numOfCols; ++i) { @@ -458,16 +458,16 @@ static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { } } - return taosAnalJsonBufWriteStart(pBuf); + return taosAnalyJsonBufWriteStart(pBuf); } -static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { +static int32_t taosAnalyJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { char buf[128] = {0}; bool first = (colIndex == 0); bool last = (colIndex == pBuf->numOfCols - 1); if (first) { - if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { + if (taosAnalyJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { return terrno; } } @@ -479,7 +479,7 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, } if (last) { - if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) { + if (taosAnalyJsonBufWriteStr(pBuf, "],\n", 0) != 0) { return terrno; } } @@ -487,11 +487,11 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, return 0; } -static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) { - return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); +static int32_t taosAnalyJsonBufWriteDataBegin(SAnalyticBuf *pBuf) { + return taosAnalyJsonBufWriteStr(pBuf, "\"data\": [\n", 0); } -static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { +static int32_t taosAnalyJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { if (bufLen <= 0) { bufLen = strlen(buf); } @@ -511,20 +511,20 @@ static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf return 0; } -static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { - return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); +static int32_t taosAnalyJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { + return taosAnalyJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); } -static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { +static int32_t taosAnalyJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { if (colIndex == pBuf->numOfCols - 1) { - return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); + return taosAnalyJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); } else { - return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); + return taosAnalyJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); } } -static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { +static int32_t taosAnalyJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { char buf[64]; int32_t bufLen = 0; @@ -575,10 +575,10 @@ static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, } pBuf->pCols[colIndex].numOfRows++; - return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); + return taosAnalyJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); } -static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { +static int32_t taosAnalyJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { int32_t code = 0; char *pCont = NULL; int64_t contLen = 0; @@ -593,10 +593,10 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { code = taosCloseFile(&pCol->filePtr); if (code != 0) return code; - code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); + code = taosAnalyJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); if (code != 0) return code; - code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen); + code = taosAnalyJsonBufWriteStr(pBuf, pCont, contLen); if (code != 0) return code; taosMemoryFreeClear(pCont); @@ -604,18 +604,18 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { } } - return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); + return taosAnalyJsonBufWriteStr(pBuf, "],\n", 0); } -static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) { - int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); +static int32_t taosAnalyJsonBufWriteEnd(SAnalyticBuf *pBuf) { + int32_t code = taosAnalyJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); if (code != 0) return code; - return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); + return taosAnalyJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); } int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) { - int32_t code = taosAnalJsonBufWriteEnd(pBuf); + int32_t code = taosAnalyJsonBufWriteEnd(pBuf); if (code != 0) return code; if (pBuf->filePtr != NULL) { @@ -640,7 +640,7 @@ int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) { return 0; } -void taosAnalBufDestroy(SAnalyticBuf *pBuf) { +void taosAnalyBufDestroy(SAnalyticBuf *pBuf) { if (pBuf->fileName[0] != 0) { if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); // taosRemoveFile(pBuf->fileName); @@ -664,7 +664,7 @@ void taosAnalBufDestroy(SAnalyticBuf *pBuf) { pBuf->numOfCols = 0; } -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { +int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return tsosAnalJsonBufOpen(pBuf, numOfCols); } else { @@ -672,79 +672,79 @@ int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { } } -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { +int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); + return taosAnalyJsonBufWriteOptStr(pBuf, optName, optVal); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { +int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); + return taosAnalyJsonBufWriteOptInt(pBuf, optName, optVal); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { +int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); + return taosAnalyJsonBufWriteOptFloat(pBuf, optName, optVal); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { +int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); + return taosAnalyJsonBufWriteColMeta(pBuf, colIndex, colType, colName); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { +int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteDataBegin(pBuf); + return taosAnalyJsonBufWriteDataBegin(pBuf); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { +int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColBegin(pBuf, colIndex); + return taosAnalyJsonBufWriteColBegin(pBuf, colIndex); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { +int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); + return taosAnalyJsonBufWriteColData(pBuf, colIndex, colType, colValue); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { +int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColEnd(pBuf, colIndex); + return taosAnalyJsonBufWriteColEnd(pBuf, colIndex); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { +int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteDataEnd(pBuf); + return taosAnalyJsonBufWriteDataEnd(pBuf); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { +int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) { if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufClose(pBuf); } else { @@ -752,12 +752,12 @@ int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { } } -static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) { +static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) { *ppCont = NULL; *pContLen = 0; if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); + return taosAnalyJsonBufGetCont(pBuf->fileName, ppCont, pContLen); } else { return TSDB_CODE_ANA_BUF_INVALID_TYPE; } @@ -767,28 +767,28 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC int32_t taosAnalyticsInit() { return 0; } void taosAnalyticsCleanup() {} -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } +SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } -int64_t taosAnalGetVersion() { return 0; } -void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} +int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } +bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } +bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } +int64_t taosAnalyGetVersion() { return 0; } +void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {} -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { +int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } +int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } +int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } +int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } +int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } -int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } -void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} +int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } +int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } +int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } +int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) { return 0; } +void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {} const char *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; } EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index b1a910d246..2556b5c936 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -94,7 +94,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) { int32_t code = 0; - int64_t oldVer = taosAnalGetVersion(); + int64_t oldVer = taosAnalyGetVersion(); if (oldVer == newVer) return; dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer); @@ -233,7 +233,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.statusSeq = pMgmt->statusSeq; req.ipWhiteVer = pMgmt->pData->ipWhiteVer; - req.analVer = taosAnalGetVersion(); + req.analVer = taosAnalyGetVersion(); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); if (contLen < 0) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 6fefd47a6f..779b324283 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -119,7 +119,7 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) { static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) { SRetrieveAnalAlgoRsp rsp = {0}; if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) { - taosAnalUpdate(rsp.ver, rsp.hash); + taosAnalyUpdate(rsp.ver, rsp.hash); rsp.hash = NULL; } tFreeRetrieveAnalAlgoRsp(&rsp); diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 9f5635a74b..c08d4aead4 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -649,13 +649,19 @@ void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) { break; } - if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) { - if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) { - taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]); + if (pObj->numOfAlgos >= ANALY_ALGO_TYPE_END) { + if (pObj->algos[ANALY_ALGO_TYPE_ANOMALY_DETECT] != NULL) { + void* p = taosArrayAddAll(pAd, pObj->algos[ANALY_ALGO_TYPE_ANOMALY_DETECT]); + if (p == NULL) { + mError("failed to add retrieved anomaly-detection algorithms, code:%s", tstrerror(terrno)); + } } - if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) { - taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]); + if (pObj->algos[ANALY_ALGO_TYPE_FORECAST] != NULL) { + void* p = taosArrayAddAll(pFc, pObj->algos[ANALY_ALGO_TYPE_FORECAST]); + if (p == NULL) { + mError("failed to add retrieved forecast algorithms, code:%s", tstrerror(terrno)); + } } } @@ -738,11 +744,11 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) { if (details == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; int32_t numOfDetails = tjsonGetArraySize(details); - pObj->algos = taosMemoryCalloc(ANAL_ALGO_TYPE_END, sizeof(SArray *)); + pObj->algos = taosMemoryCalloc(ANALY_ALGO_TYPE_END, sizeof(SArray *)); if (pObj->algos == NULL) return TSDB_CODE_OUT_OF_MEMORY; - pObj->numOfAlgos = ANAL_ALGO_TYPE_END; - for (int32_t i = 0; i < ANAL_ALGO_TYPE_END; ++i) { + pObj->numOfAlgos = ANALY_ALGO_TYPE_END; + for (int32_t i = 0; i < ANALY_ALGO_TYPE_END; ++i) { pObj->algos[i] = taosArrayInit(4, sizeof(SAnodeAlgo)); if (pObj->algos[i] == NULL) return TSDB_CODE_OUT_OF_MEMORY; } @@ -753,8 +759,8 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) { code = tjsonGetStringValue2(detail, "type", buf, sizeof(buf)); if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; - EAnalAlgoType type = taosAnalAlgoInt(buf); - if (type < 0 || type >= ANAL_ALGO_TYPE_END) return TSDB_CODE_MND_ANODE_INVALID_ALGO_TYPE; + EAnalAlgoType type = taosAnalyAlgoInt(buf); + if (type < 0 || type >= ANALY_ALGO_TYPE_END) return TSDB_CODE_MND_ANODE_INVALID_ALGO_TYPE; SJson *algos = tjsonGetObjectItem(detail, "algo"); if (algos == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; @@ -783,7 +789,7 @@ static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) { char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); if (pJson == NULL) return terrno; int32_t code = mndDecodeAlgoList(pJson, pObj); @@ -799,7 +805,7 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); if (pJson == NULL) return terrno; code = tjsonGetDoubleValue(pJson, "protocol", &tmp); @@ -881,7 +887,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { } url.urlLen = 1 + tsnprintf(url.url, TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN, "%s/%s", pAnode->url, - taosAnalAlgoUrlStr(url.type)); + taosAnalyAlgoUrlStr(url.type)); if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalyticsUrl)) != 0) { taosMemoryFree(url.url); sdbRelease(pSdb, pAnode); diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 3124fa0b57..46aae38ad4 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -77,13 +77,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p goto _error; } - if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { + if (!taosAnalyGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt); code = TSDB_CODE_ANA_ALGO_NOT_FOUND; goto _error; } - if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { + if (taosAnalyGetAlgoUrl(pInfo->algoName, ANALY_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName); code = TSDB_CODE_ANA_ALGO_NOT_LOAD; goto _error; @@ -265,7 +265,7 @@ static void anomalyDestroyOperatorInfo(void* param) { } static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) { - if (pInfo->anomalySup.cachedRows > ANAL_ANOMALY_WINDOW_MAX_ROWS) { + if (pInfo->anomalySup.cachedRows > ANALY_ANOMALY_WINDOW_MAX_ROWS) { return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; } @@ -364,35 +364,35 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { SAnomalyWindowOperatorInfo* pInfo = pOperator->info; SAnomalyWindowSupp* pSupp = &pInfo->anomalySup; SJson* pJson = NULL; - SAnalyticBuf analBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON}; + SAnalyticBuf analyBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON}; char dataBuf[64] = {0}; int32_t code = 0; int64_t ts = 0; int32_t lino = 0; const char* pId = GET_TASKID(pOperator->pTaskInfo); - snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts, + snprintf(analyBuf.fileName, sizeof(analyBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts, pSupp->groupId); - code = tsosAnalBufOpen(&analBuf, 2); + code = tsosAnalyBufOpen(&analyBuf, 2); QUERY_CHECK_CODE(code, lino, _OVER); const char* prec = TSDB_TIME_PRECISION_MILLI_STR; if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; - code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); + code = taosAnalyBufWriteColMeta(&analyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufWriteColMeta(&analBuf, 1, pInfo->anomalyCol.type, "val"); + code = taosAnalyBufWriteColMeta(&analyBuf, 1, pInfo->anomalyCol.type, "val"); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufWriteDataBegin(&analBuf); + code = taosAnalyBufWriteDataBegin(&analyBuf); QUERY_CHECK_CODE(code, lino, _OVER); int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks); // timestamp - code = taosAnalBufWriteColBegin(&analBuf, 0); + code = taosAnalyBufWriteColBegin(&analyBuf, 0); QUERY_CHECK_CODE(code, lino, _OVER); for (int32_t i = 0; i < numOfBlocks; ++i) { @@ -401,16 +401,16 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); if (pTsCol == NULL) break; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - code = taosAnalBufWriteColData(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]); + code = taosAnalyBufWriteColData(&analyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]); QUERY_CHECK_CODE(code, lino, _OVER); } } - code = taosAnalBufWriteColEnd(&analBuf, 0); + code = taosAnalyBufWriteColEnd(&analyBuf, 0); QUERY_CHECK_CODE(code, lino, _OVER); // data - code = taosAnalBufWriteColBegin(&analBuf, 1); + code = taosAnalyBufWriteColBegin(&analyBuf, 1); QUERY_CHECK_CODE(code, lino, _OVER); for (int32_t i = 0; i < numOfBlocks; ++i) { @@ -420,38 +420,38 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { if (pValCol == NULL) break; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - code = taosAnalBufWriteColData(&analBuf, 1, pValCol->info.type, colDataGetData(pValCol, j)); + code = taosAnalyBufWriteColData(&analyBuf, 1, pValCol->info.type, colDataGetData(pValCol, j)); QUERY_CHECK_CODE(code, lino, _OVER); } } - code = taosAnalBufWriteColEnd(&analBuf, 1); + code = taosAnalyBufWriteColEnd(&analyBuf, 1); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufWriteDataEnd(&analBuf); + code = taosAnalyBufWriteDataEnd(&analyBuf); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt); + code = taosAnalyBufWriteOptStr(&analyBuf, "option", pInfo->anomalyOpt); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName); + code = taosAnalyBufWriteOptStr(&analyBuf, "algo", pInfo->algoName); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufWriteOptStr(&analBuf, "prec", prec); + code = taosAnalyBufWriteOptStr(&analyBuf, "prec", prec); QUERY_CHECK_CODE(code, lino, _OVER); - int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK; - bool hasWncheck = taosAnalGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck); + int64_t wncheck = ANALY_FORECAST_DEFAULT_WNCHECK; + bool hasWncheck = taosAnalyGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck); if (!hasWncheck) { qDebug("anomaly_window wncheck not found from %s, use default:%" PRId64, pInfo->anomalyOpt, wncheck); } - code = taosAnalBufWriteOptInt(&analBuf, "wncheck", wncheck); + code = taosAnalyBufWriteOptInt(&analyBuf, "wncheck", wncheck); QUERY_CHECK_CODE(code, lino, _OVER); - code = taosAnalBufClose(&analBuf); + code = taosAnalyBufClose(&analyBuf); QUERY_CHECK_CODE(code, lino, _OVER); - pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf); + pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analyBuf); if (pJson == NULL) { code = terrno; goto _OVER; @@ -464,7 +464,7 @@ _OVER: qError("%s failed to analysis window since %s, lino:%d", pId, tstrerror(code), lino); } - taosAnalBufDestroy(&analBuf); + taosAnalyBufDestroy(&analyBuf); if (pJson != NULL) tjsonDelete(pJson); return code; } diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 02b122830c..e318530352 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -46,7 +46,7 @@ typedef struct { int16_t inputValSlot; int8_t inputValType; int8_t inputPrecision; - SAnalyticBuf analBuf; + SAnalyticBuf analyBuf; } SForecastSupp; typedef struct SForecastOperatorInfo { @@ -74,12 +74,12 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SAnalyticBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analyBuf; - if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { + if (pSupp->cachedRows > ANALY_FORECAST_MAX_HISTORY_ROWS) { code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows, - tstrerror(code), ANAL_FORECAST_MAX_ROWS); + tstrerror(code), ANALY_FORECAST_MAX_HISTORY_ROWS); return code; } @@ -99,13 +99,13 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con pSupp->maxTs = MAX(pSupp->maxTs, ts); pSupp->numOfRows++; - code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts); + code = taosAnalyBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts); if (TSDB_CODE_SUCCESS != code) { qError("%s failed to write ts in buf, code:%s", id, tstrerror(code)); return code; } - code = taosAnalBufWriteColData(pBuf, 1, valType, val); + code = taosAnalyBufWriteColData(pBuf, 1, valType, val); if (TSDB_CODE_SUCCESS != code) { qError("%s failed to write val in buf, code:%s", id, tstrerror(code)); return code; @@ -115,81 +115,88 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con return 0; } -static int32_t forecastCloseBuf(SForecastSupp* pSupp) { - SAnalyticBuf* pBuf = &pSupp->analBuf; +static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) { + SAnalyticBuf* pBuf = &pSupp->analyBuf; int32_t code = 0; for (int32_t i = 0; i < 2; ++i) { - code = taosAnalBufWriteColEnd(pBuf, i); + code = taosAnalyBufWriteColEnd(pBuf, i); if (code != 0) return code; } - code = taosAnalBufWriteDataEnd(pBuf); + code = taosAnalyBufWriteDataEnd(pBuf); if (code != 0) return code; - code = taosAnalBufWriteOptStr(pBuf, "option", pSupp->algoOpt); + code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->algoOpt); if (code != 0) return code; - code = taosAnalBufWriteOptStr(pBuf, "algo", pSupp->algoName); + code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName); if (code != 0) return code; const char* prec = TSDB_TIME_PRECISION_MILLI_STR; if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; - code = taosAnalBufWriteOptStr(pBuf, "prec", prec); + code = taosAnalyBufWriteOptStr(pBuf, "prec", prec); if (code != 0) return code; - int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK; - bool hasWncheck = taosAnalGetOptInt(pSupp->algoOpt, "wncheck", &wncheck); + int64_t wncheck = ANALY_FORECAST_DEFAULT_WNCHECK; + bool hasWncheck = taosAnalyGetOptInt(pSupp->algoOpt, "wncheck", &wncheck); if (!hasWncheck) { - qDebug("forecast wncheck not found from %s, use default:%" PRId64, pSupp->algoOpt, wncheck); + qDebug("%s forecast wncheck not found from %s, use default:%" PRId64, id, pSupp->algoOpt, wncheck); } - code = taosAnalBufWriteOptInt(pBuf, "wncheck", wncheck); + code = taosAnalyBufWriteOptInt(pBuf, "wncheck", wncheck); if (code != 0) return code; bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1); - code = taosAnalBufWriteOptInt(pBuf, "return_conf", !noConf); + code = taosAnalyBufWriteOptInt(pBuf, "return_conf", !noConf); if (code != 0) return code; - pSupp->optRows = ANAL_FORECAST_DEFAULT_ROWS; - bool hasRows = taosAnalGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows); + pSupp->optRows = ANALY_FORECAST_DEFAULT_ROWS; + bool hasRows = taosAnalyGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows); if (!hasRows) { - qDebug("forecast rows not found from %s, use default:%" PRId64, pSupp->algoOpt, pSupp->optRows); + qDebug("%s forecast rows not found from %s, use default:%" PRId64, id, pSupp->algoOpt, pSupp->optRows); } - code = taosAnalBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows); + + if (pSupp->optRows > ANALY_MAX_FC_ROWS) { + qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_MAX_FC_ROWS, + pSupp->optRows); + return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; + } + + code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows); if (code != 0) return code; - int64_t conf = ANAL_FORECAST_DEFAULT_CONF; - bool hasConf = taosAnalGetOptInt(pSupp->algoOpt, "conf", &conf); + int64_t conf = ANALY_FORECAST_DEFAULT_CONF; + bool hasConf = taosAnalyGetOptInt(pSupp->algoOpt, "conf", &conf); if (!hasConf) { - qDebug("forecast conf not found from %s, use default:%" PRId64, pSupp->algoOpt, conf); + qDebug("%s forecast conf not found from %s, use default:%" PRId64, id, pSupp->algoOpt, conf); } - code = taosAnalBufWriteOptInt(pBuf, "conf", conf); + code = taosAnalyBufWriteOptInt(pBuf, "conf", conf); if (code != 0) return code; int32_t len = strlen(pSupp->algoOpt); int64_t every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1); int64_t start = pSupp->maxTs + every; - bool hasStart = taosAnalGetOptInt(pSupp->algoOpt, "start", &start); + bool hasStart = taosAnalyGetOptInt(pSupp->algoOpt, "start", &start); if (!hasStart) { - qDebug("forecast start not found from %s, use %" PRId64, pSupp->algoOpt, start); + qDebug("%s forecast start not found from %s, use %" PRId64, id, pSupp->algoOpt, start); } - code = taosAnalBufWriteOptInt(pBuf, "start", start); + code = taosAnalyBufWriteOptInt(pBuf, "start", start); if (code != 0) return code; - bool hasEvery = taosAnalGetOptInt(pSupp->algoOpt, "every", &every); + bool hasEvery = taosAnalyGetOptInt(pSupp->algoOpt, "every", &every); if (!hasEvery) { - qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every); + qDebug("%s forecast every not found from %s, use %" PRId64, id, pSupp->algoOpt, every); } - code = taosAnalBufWriteOptInt(pBuf, "every", every); + code = taosAnalyBufWriteOptInt(pBuf, "every", every); if (code != 0) return code; - code = taosAnalBufClose(pBuf); + code = taosAnalyBufClose(pBuf); return code; } static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) { - SAnalyticBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analyBuf; int32_t resCurRow = pBlock->info.rows; int8_t tmpI8; int16_t tmpI16; @@ -209,7 +216,7 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const SColumnInfoData* pResHighCol = (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL); - SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf); + SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf); if (pJson == NULL) { return terrno; } @@ -356,9 +363,9 @@ _OVER: static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SAnalyticBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analyBuf; - code = forecastCloseBuf(pSupp); + code = forecastCloseBuf(pSupp, pId); QUERY_CHECK_CODE(code, lino, _end); code = forecastEnsureBlockCapacity(pResBlock, 1); @@ -371,7 +378,7 @@ static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBl _end: pSupp->numOfBlocks = 0; - taosAnalBufDestroy(&pSupp->analBuf); + taosAnalyBufDestroy(&pSupp->analyBuf); return code; } @@ -382,7 +389,7 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SForecastOperatorInfo* pInfo = pOperator->info; SSDataBlock* pResBlock = pInfo->pRes; SForecastSupp* pSupp = &pInfo->forecastSupp; - SAnalyticBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analyBuf; int64_t st = taosGetTimestampUs(); int32_t numOfBlocks = pSupp->numOfBlocks; const char* pId = GET_TASKID(pOperator->pTaskInfo); @@ -525,12 +532,12 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) { pSupp->minTs = INT64_MAX; pSupp->numOfRows = 0; - if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) { + if (!taosAnalyGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) { qError("failed to get forecast algorithm name from %s", pSupp->algoOpt); return TSDB_CODE_ANA_ALGO_NOT_FOUND; } - if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) { + if (taosAnalyGetAlgoUrl(pSupp->algoName, ANALY_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) { qError("failed to get forecast algorithm url from %s", pSupp->algoName); return TSDB_CODE_ANA_ALGO_NOT_LOAD; } @@ -539,32 +546,32 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) { } static int32_t forecastCreateBuf(SForecastSupp* pSupp) { - SAnalyticBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analyBuf; int64_t ts = 0; // taosGetTimestampMs(); pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL; snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts); - int32_t code = tsosAnalBufOpen(pBuf, 2); + int32_t code = tsosAnalyBufOpen(pBuf, 2); if (code != 0) goto _OVER; - code = taosAnalBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); + code = taosAnalyBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); if (code != 0) goto _OVER; - code = taosAnalBufWriteColMeta(pBuf, 1, pSupp->inputValType, "val"); + code = taosAnalyBufWriteColMeta(pBuf, 1, pSupp->inputValType, "val"); if (code != 0) goto _OVER; - code = taosAnalBufWriteDataBegin(pBuf); + code = taosAnalyBufWriteDataBegin(pBuf); if (code != 0) goto _OVER; for (int32_t i = 0; i < 2; ++i) { - code = taosAnalBufWriteColBegin(pBuf, i); + code = taosAnalyBufWriteColBegin(pBuf, i); if (code != 0) goto _OVER; } _OVER: if (code != 0) { - (void)taosAnalBufClose(pBuf); - taosAnalBufDestroy(pBuf); + (void)taosAnalyBufClose(pBuf); + taosAnalyBufDestroy(pBuf); } return code; } @@ -656,7 +663,7 @@ static void destroyForecastInfo(void* param) { blockDataDestroy(pInfo->pRes); pInfo->pRes = NULL; cleanupExprSupp(&pInfo->scalarSup); - taosAnalBufDestroy(&pInfo->forecastSupp.analBuf); + taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf); taosMemoryFreeClear(param); } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b42d739b40..180f83dbeb 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1223,7 +1223,7 @@ static int32_t translateForecast(SFunctionNode* pFunc, char* pErrBuf, int32_t le } SValueNode* pValue = (SValueNode*)pOption; - if (!taosAnalGetOptStr(pValue->literal, "algo", NULL, 0) != 0) { + if (!taosAnalyGetOptStr(pValue->literal, "algo", NULL, 0) != 0) { return invaildFuncParaValueErrMsg(pErrBuf, len, "FORECAST option should include algo field"); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3511636bd9..c5afb9e9f3 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6110,7 +6110,7 @@ static int32_t translateAnomalyWindow(STranslateContext* pCxt, SSelectStmt* pSel SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pSelect->pWindow; int32_t code = checkAnomalyExpr(pCxt, pAnomaly->pExpr); if (TSDB_CODE_SUCCESS == code) { - if (!taosAnalGetOptStr(pAnomaly->anomalyOpt, "algo", NULL, 0) != 0) { + if (!taosAnalyGetOptStr(pAnomaly->anomalyOpt, "algo", NULL, 0) != 0) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW option should include algo field"); } diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 3ac49b1fc3..999b2fff37 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -69,6 +69,9 @@ if $data00 != 1 then return -1 endi +print ================= too many rows error +sql_error select forecast(c6, 'algo=holtwinters, rows=1025') from ct1; + print ================= try every loaded anomaly detection algorithm sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma');