From adbea71230d7184beb0ac0ba4b345fac663b8422 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Oct 2024 16:00:07 +0800 Subject: [PATCH] refact: adjust forecast paras such as wncheck --- include/common/tanal.h | 10 +- include/util/taoserror.h | 1 + include/util/tlog.h | 4 +- source/dnode/mnode/impl/src/mndAnode.c | 2 + .../libs/executor/src/anomalywindowoperator.c | 20 +++- source/libs/executor/src/forecastoperator.c | 107 ++++++++---------- source/util/src/tanal.c | 18 +-- source/util/src/terror.c | 1 + 8 files changed, 81 insertions(+), 82 deletions(-) diff --git a/include/common/tanal.h b/include/common/tanal.h index 59a28ddbe3..431d9ac89e 100644 --- a/include/common/tanal.h +++ b/include/common/tanal.h @@ -25,11 +25,9 @@ extern "C" { #endif -#define ANAL_FORECAST_DEFAULT_PERIOD 10 -#define ANAL_FORECAST_DEFAULT_ROWS 10 -#define ANAL_FORECAST_DEFAULT_CONF 95 -#define ANAL_FORECAST_DEFAULT_ALPHA 0.05 -#define ANAL_FORECAST_DEFAULT_PARAM "diff" +#define ANAL_FORECAST_DEFAULT_ROWS 10 +#define ANAL_FORECAST_DEFAULT_CONF 95 +#define ANAL_FORECAST_DEFAULT_WNCHECK 1 typedef struct { EAnalAlgoType type; @@ -69,7 +67,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pB int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); -bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue); +bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue); int64_t taosAnalGetVersion(); void taosAnalUpdate(int64_t newVer, SHashObj *pHash); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2620342b22..b1cf573479 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -493,6 +493,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442) #define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443) #define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444) +#define TSDB_CODE_ANAL_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/include/util/tlog.h b/include/util/tlog.h index e80e94de32..6b0270523e 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -123,8 +123,8 @@ void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile); #define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", DEBUG_INFO, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", DEBUG_DEBUG, uDebugFlag, __VA_ARGS__); }} #define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", DEBUG_TRACE, uDebugFlag, __VA_ARGS__); }} -#define uDebugL(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL ", DEBUG_DEBUG, uDebugFlag, __VA_ARGS__); }} -#define uInfoL(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLongString("UTL ", DEBUG_INFO, uDebugFlag, __VA_ARGS__); }} +#define uDebugL(...){ if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL ", DEBUG_DEBUG, uDebugFlag, __VA_ARGS__); }} +#define uInfoL(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLongString("UTL ", DEBUG_INFO, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); } diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 469e16c7ea..17e3e84c81 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -702,8 +702,10 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) { code = tjsonGetDoubleValue(pJson, "version", &tmp); pObj->version = (int32_t)(tmp * 1000); +#if 0 if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; if (pObj->version <= 0) return TSDB_CODE_MND_ANODE_INVALID_VERSION; +#endif SJson *details = tjsonGetObjectItem(pJson, "details"); if (details == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 25d3dbd423..64ad872543 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -342,12 +342,6 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; - code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName); - if (code != 0) goto _OVER; - - code = taosAnalBufWriteOptStr(&analBuf, "prec", prec); - if (code != 0) goto _OVER; - code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); if (code != 0) goto _OVER; @@ -399,6 +393,20 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt); if (code != 0) goto _OVER; + code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteOptStr(&analBuf, "prec", prec); + if (code != 0) goto _OVER; + + int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK; + bool hasWncheck = taosAnalGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck); + if (!hasWncheck) { + qDebug("anomaly_window wncheck not found from %s, use default:%" PRId64, pInfo->anomalyOpt, wncheck); + } + code = taosAnalBufWriteOptInt(&analBuf, "wncheck", wncheck); + if (code != 0) goto _OVER; + code = taosAnalBufClose(&analBuf); if (code != 0) goto _OVER; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 0c800eeab5..243330a151 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -36,8 +36,8 @@ typedef struct { int64_t minTs; int64_t numOfRows; uint64_t groupId; + int64_t optRows; int32_t numOfBlocks; - int32_t optRows; int16_t resTsSlot; int16_t resValSlot; int16_t resLowSlot; @@ -114,24 +114,61 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { code = taosAnalBufWriteDataEnd(pBuf); if (code != 0) return code; + code = taosAnalBufWriteOptStr(pBuf, "option", pSupp->algoOpt); + if (code != 0) return code; + + code = taosAnalBufWriteOptStr(pBuf, "algo", pSupp->algoName); + if (code != 0) return code; + + const char* prec = TSDB_TIME_PRECISION_MILLI_STR; + if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; + if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; + code = taosAnalBufWriteOptStr(pBuf, "prec", prec); + if (code != 0) return code; + + int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK; + bool hasWncheck = taosAnalGetOptInt(pSupp->algoOpt, "wncheck", &wncheck); + if (!hasWncheck) { + qDebug("forecast wncheck not found from %s, use default:%" PRId64, pSupp->algoOpt, wncheck); + } + code = taosAnalBufWriteOptInt(pBuf, "wncheck", wncheck); + if (code != 0) return code; + + bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1); + code = taosAnalBufWriteOptInt(pBuf, "return_conf", !noConf); + if (code != 0) return code; + + pSupp->optRows = ANAL_FORECAST_DEFAULT_ROWS; + bool hasRows = taosAnalGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows); + if (!hasRows) { + qDebug("forecast rows not found from %s, use default:%" PRId64, pSupp->algoOpt, pSupp->optRows); + } + code = taosAnalBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows); + if (code != 0) return code; + + int64_t conf = ANAL_FORECAST_DEFAULT_CONF; + bool hasConf = taosAnalGetOptInt(pSupp->algoOpt, "conf", &conf); + if (!hasConf) { + qDebug("forecast conf not found from %s, use default:%" PRId64, pSupp->algoOpt, conf); + } + code = taosAnalBufWriteOptInt(pBuf, "conf", conf); + if (code != 0) return code; + int32_t len = strlen(pSupp->algoOpt); int64_t every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows + 1); int64_t start = pSupp->maxTs + every; - bool hasStart = taosAnalGetOptStr(pSupp->algoOpt, "start", NULL, 0); + bool hasStart = taosAnalGetOptInt(pSupp->algoOpt, "start", &start); if (!hasStart) { qDebug("forecast start not found from %s, use %" PRId64, pSupp->algoOpt, start); - code = taosAnalBufWriteOptInt(pBuf, "start", start); - if (code != 0) return code; } + code = taosAnalBufWriteOptInt(pBuf, "start", start); + if (code != 0) return code; - bool hasEvery = taosAnalGetOptStr(pSupp->algoOpt, "every", NULL, 0); + bool hasEvery = taosAnalGetOptInt(pSupp->algoOpt, "every", &every); if (!hasEvery) { qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every); - code = taosAnalBufWriteOptInt(pBuf, "every", every); - if (code != 0) return code; } - - code = taosAnalBufWriteOptStr(pBuf, "option", pSupp->algoOpt); + code = taosAnalBufWriteOptInt(pBuf, "every", every); if (code != 0) return code; code = taosAnalBufClose(pBuf); @@ -490,58 +527,6 @@ static int32_t forecastCreateBuf(SForecastSupp* pSupp) { int32_t code = tsosAnalBufOpen(pBuf, 2); if (code != 0) goto _OVER; - code = taosAnalBufWriteOptStr(pBuf, "algo", pSupp->algoName); - if (code != 0) goto _OVER; - - bool returnConf = (pSupp->resHighSlot == -1 || pSupp->resLowSlot == -1); - code = taosAnalBufWriteOptStr(pBuf, "return_conf", returnConf ? "true" : "false"); - if (code != 0) goto _OVER; - - bool hasAlpha = taosAnalGetOptStr(pSupp->algoOpt, "alpha", NULL, 0); - if (!hasAlpha) { - qDebug("forecast alpha not found from %s, use default:%f", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_ALPHA); - code = taosAnalBufWriteOptFloat(pBuf, "alpha", ANAL_FORECAST_DEFAULT_ALPHA); - if (code != 0) goto _OVER; - } - - char tmpOpt[32] = {0}; - bool hasParam = taosAnalGetOptStr(pSupp->algoOpt, "param", tmpOpt, sizeof(tmpOpt)); - if (!hasParam) { - qDebug("forecast param not found from %s, use default:%s", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_PARAM); - code = taosAnalBufWriteOptStr(pBuf, "param", ANAL_FORECAST_DEFAULT_PARAM); - if (code != 0) goto _OVER; - } - - bool hasPeriod = taosAnalGetOptInt(pSupp->algoOpt, "period", NULL); - if (!hasPeriod) { - qDebug("forecast period not found from %s, use default:%d", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_PERIOD); - code = taosAnalBufWriteOptInt(pBuf, "period", ANAL_FORECAST_DEFAULT_PERIOD); - if (code != 0) goto _OVER; - } - - bool hasRows = taosAnalGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows); - if (!hasRows) { - pSupp->optRows = ANAL_FORECAST_DEFAULT_ROWS; - qDebug("forecast rows not found from %s, use default:%d", pSupp->algoOpt, pSupp->optRows); - code = taosAnalBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows); - if (code != 0) goto _OVER; - } - - const char* prec = TSDB_TIME_PRECISION_MILLI_STR; - if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; - if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; - code = taosAnalBufWriteOptStr(pBuf, "prec", prec); - if (code != 0) goto _OVER; - - if (returnConf) { - bool hasConf = taosAnalGetOptStr(pSupp->algoOpt, "conf", NULL, 0); - if (!hasConf) { - qDebug("forecast conf not found from %s, use default:%d", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_CONF); - code = taosAnalBufWriteOptInt(pBuf, "conf", ANAL_FORECAST_DEFAULT_CONF); - if (code != 0) goto _OVER; - } - } - code = taosAnalBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); if (code != 0) goto _OVER; diff --git a/source/util/src/tanal.c b/source/util/src/tanal.c index 34be2af3e7..92eee28ba8 100644 --- a/source/util/src/tanal.c +++ b/source/util/src/tanal.c @@ -137,7 +137,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, if (optMaxLen > 0) { int32_t copyLen = optMaxLen; if (pos2 != NULL) { - copyLen = (int32_t)(pos2 - pos1 - strlen(optName) + 1); + copyLen = (int32_t)(pos2 - pos1 - strlen(optName)); copyLen = MIN(copyLen, optMaxLen); } tstrncpy(optValue, pos1 + bufLen, copyLen); @@ -148,14 +148,14 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, } } -bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { +bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); char *pos1 = strstr(option, buf); char *pos2 = strstr(option, ANAL_ALGO_SPLIT); if (pos1 != NULL) { - *optValue = taosStr2Int32(pos1 + bufLen + 1, NULL, 10); + *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); return true; } else { return false; @@ -204,7 +204,7 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void if (pRsp->data != NULL) { (void)memcpy(pRsp->data, pCont, pRsp->dataLen); pRsp->data[pRsp->dataLen] = 0; - uDebug("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data); + uDebugL("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data); return pRsp->dataLen; } else { pRsp->dataLen = 0; @@ -260,7 +260,7 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER; - uDebug("curl post request will sent, url:%s len:%d", url, bufLen); + uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf); code = curl_easy_perform(curl); if (code != CURLE_OK) { uError("failed to perform curl action, code:%d", code); @@ -305,7 +305,11 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBu pJson = tjsonParse(curlRsp.data); if (pJson == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + if (curlRsp.data[0] == '<') { + terrno = TSDB_CODE_ANAL_ANODE_RETURN_ERROR; + } else { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + } goto _OVER; } @@ -728,7 +732,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pB int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } -bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { return true; } +bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } int64_t taosAnalGetVersion() { return 0; } void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ad43426efa..933ef908fa 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't ac TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_RETURN_ERROR, "Analysis failed since anode return error") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")