From 19f09d679d31b818c47354d549562328024909cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Mar 2025 23:05:14 +0800 Subject: [PATCH 01/10] feat(stream): add timeout parameter. --- include/common/tanalytics.h | 5 ++- source/common/src/tanalytics.c | 10 ++--- source/dnode/mnode/impl/src/mndAnode.c | 4 +- .../libs/executor/src/anomalywindowoperator.c | 17 ++++++- source/libs/executor/src/forecastoperator.c | 45 +++++++++++++------ 5 files changed, 57 insertions(+), 24 deletions(-) diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 344093245b..bcab4490cb 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -30,6 +30,7 @@ extern "C" { #define ANAL_FORECAST_DEFAULT_WNCHECK 1 #define ANAL_FORECAST_MAX_ROWS 40000 #define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 +#define ANALY_FC_DEFAULT_TIMEOUT 60000 typedef struct { EAnalAlgoType type; @@ -47,7 +48,7 @@ typedef enum { typedef enum { ANALYTICS_HTTP_TYPE_GET = 0, ANALYTICS_HTTP_TYPE_POST, -} EAnalHttpType; +} EAnalyHttpType; typedef struct { TdFilePtr filePtr; @@ -65,7 +66,7 @@ typedef struct { int32_t taosAnalyticsInit(); void taosAnalyticsCleanup(); -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf); +SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout); int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index 0ed67eed0a..60c809b265 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -276,7 +276,7 @@ _OVER: return code; } -static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { +static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) { struct curl_slist *headers = NULL; CURL *curl = NULL; CURLcode code = 0; @@ -292,7 +292,7 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER; @@ -311,7 +311,7 @@ _OVER: return code; } -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { +SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) { int32_t code = -1; char *pCont = NULL; int64_t contentLen; @@ -329,7 +329,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf terrno = code; goto _OVER; } - if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { + if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen, timeout) != 0) { terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; goto _OVER; } @@ -767,7 +767,7 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC int32_t taosAnalyticsInit() { return 0; } void taosAnalyticsCleanup() {} -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } +SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 9f5635a74b..89d116d52d 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -783,7 +783,7 @@ static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) { char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0); if (pJson == NULL) return terrno; int32_t code = mndDecodeAlgoList(pJson, pObj); @@ -799,7 +799,7 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0); if (pJson == NULL) return terrno; code = tjsonGetDoubleValue(pJson, "protocol", &tmp); diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 3124fa0b57..8af8e2434b 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -47,6 +47,7 @@ typedef struct { char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; + int64_t timeout; SAnomalyWindowSupp anomalySup; SWindowRowsSup anomalyWinRowSup; SColumn anomalyCol; @@ -89,6 +90,20 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p goto _error; } + bool hasTimeout = taosAnalGetOptInt(pAnomalyNode->anomalyOpt, "timeout", &pInfo->timeout); + if (!hasTimeout) { + qDebug("not set the timeout val, set default:%d", ANALY_FC_DEFAULT_TIMEOUT); + pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + if (pInfo->timeout <= 500 || pInfo->timeout > 600*1000) { + qDebug("timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", + pInfo->timeout, ANALY_FC_DEFAULT_TIMEOUT); + pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + qDebug("timeout val is set to: %d" PRId64 "ms", pInfo->timeout); + } + } + pOperator->exprSupp.hasWindowOrGroup = true; pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId; tstrncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt)); @@ -451,7 +466,7 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { code = taosAnalBufClose(&analBuf); QUERY_CHECK_CODE(code, lino, _OVER); - pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf); + pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf, pInfo->timeout); if (pJson == NULL) { code = terrno; goto _OVER; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 02b122830c..0fbe3948f3 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -38,6 +38,7 @@ typedef struct { int64_t optRows; int64_t cachedRows; int32_t numOfBlocks; + int64_t timeout; int16_t resTsSlot; int16_t resValSlot; int16_t resLowSlot; @@ -191,12 +192,12 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) { SAnalyticBuf* pBuf = &pSupp->analBuf; int32_t resCurRow = pBlock->info.rows; - int8_t tmpI8; - int16_t tmpI16; - int32_t tmpI32; - int64_t tmpI64; - float tmpFloat; - double tmpDouble; + int8_t tmpI8 = 0; + int16_t tmpI16 = 0; + int32_t tmpI32 = 0; + int64_t tmpI64 = 0; + float tmpFloat = 0; + double tmpDouble = 0; int32_t code = 0; SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot); @@ -204,12 +205,13 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const return terrno; } - SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL); - SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL); + SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL); + SColumnInfoData* pResLowCol = + ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL); SColumnInfoData* pResHighCol = (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL); - SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf); + SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout); if (pJson == NULL) { return terrno; } @@ -520,18 +522,32 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) { return 0; } -static int32_t forecastParseAlgo(SForecastSupp* pSupp) { +static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) { pSupp->maxTs = 0; pSupp->minTs = INT64_MAX; pSupp->numOfRows = 0; if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) { - qError("failed to get forecast algorithm name from %s", pSupp->algoOpt); + qError("%s failed to get forecast algorithm name from %s", id, pSupp->algoOpt); return TSDB_CODE_ANA_ALGO_NOT_FOUND; } + bool hasTimeout = taosAnalGetOptInt(pSupp->algoOpt, "timeout", &pSupp->timeout); + if (!hasTimeout) { + qDebug("%s not set the timeout val, set default:%d", id, ANALY_FC_DEFAULT_TIMEOUT); + pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + if (pSupp->timeout <= 500 || pSupp->timeout > 600*1000) { + qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", + id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT); + pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + qDebug("%s timeout val is set to: %d" PRId64 "ms", id, pSupp->timeout); + } + } + if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) { - qError("failed to get forecast algorithm url from %s", pSupp->algoName); + qError("%s failed to get forecast algorithm url from %s", id, pSupp->algoName); return TSDB_CODE_ANA_ALGO_NOT_LOAD; } @@ -582,6 +598,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo goto _error; } + const char* pId = pTaskInfo->id.str; SForecastSupp* pSupp = &pInfo->forecastSupp; SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode; SExprSupp* pExprSup = &pOperator->exprSupp; @@ -613,7 +630,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo code = forecastParseOutput(pSupp, pExprSup); QUERY_CHECK_CODE(code, lino, _error); - code = forecastParseAlgo(pSupp); + code = forecastParseAlgo(pSupp, pId); QUERY_CHECK_CODE(code, lino, _error); code = forecastCreateBuf(pSupp); @@ -637,7 +654,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo *pOptrInfo = pOperator; - qDebug("forecast env is initialized, option:%s", pSupp->algoOpt); + qDebug("%s forecast env is initialized, option:%s", pId, pSupp->algoOpt); return TSDB_CODE_SUCCESS; _error: From f85347b311d1a3f15fca0f87e1e3d40d7aef9283 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Mar 2025 23:44:21 +0800 Subject: [PATCH 02/10] fix(analysis): fix syntax error. --- source/libs/executor/src/forecastoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 0fbe3948f3..1115a85903 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -542,7 +542,7 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) { id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT); pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; } else { - qDebug("%s timeout val is set to: %d" PRId64 "ms", id, pSupp->timeout); + qDebug("%s timeout val is set to: %" PRId64 "ms", id, pSupp->timeout); } } From 8c035a930ae761ba0b444165b173e0b50693d96e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 13:40:33 +0800 Subject: [PATCH 03/10] refactor(analytics): do some internal refactor. --- include/common/tanalytics.h | 1 + source/libs/executor/src/forecastoperator.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index bcab4490cb..59f195ec4f 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -31,6 +31,7 @@ extern "C" { #define ANAL_FORECAST_MAX_ROWS 40000 #define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 #define ANALY_FC_DEFAULT_TIMEOUT 60000 +#define ANALY_FC_MAX_TIMEOUT (600*1000) typedef struct { EAnalAlgoType type; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 1115a85903..de39e63a36 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -537,7 +537,7 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) { qDebug("%s not set the timeout val, set default:%d", id, ANALY_FC_DEFAULT_TIMEOUT); pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; } else { - if (pSupp->timeout <= 500 || pSupp->timeout > 600*1000) { + if (pSupp->timeout <= 500 || pSupp->timeout > ANALY_FC_MAX_TIMEOUT) { qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT); pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; From 0e37b45e060122001709320e8c6cc85a25bc792d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 13:56:39 +0800 Subject: [PATCH 04/10] fix(analytics): fix syntax error. --- source/libs/executor/src/anomalywindowoperator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 8af8e2434b..ab5a478179 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -96,11 +96,11 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; } else { if (pInfo->timeout <= 500 || pInfo->timeout > 600*1000) { - qDebug("timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", + qDebug("timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%dms", pInfo->timeout, ANALY_FC_DEFAULT_TIMEOUT); pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; } else { - qDebug("timeout val is set to: %d" PRId64 "ms", pInfo->timeout); + qDebug("timeout val is set to: %" PRId64 "ms", pInfo->timeout); } } From 450561226a23bbb3f6cc4717e66e3350ee17635a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 14:02:21 +0800 Subject: [PATCH 05/10] fix(analytics): fix syntax error. --- source/libs/executor/src/forecastoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index de39e63a36..7fd10ca676 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -538,7 +538,7 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) { pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; } else { if (pSupp->timeout <= 500 || pSupp->timeout > ANALY_FC_MAX_TIMEOUT) { - qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", + qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%dms", id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT); pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; } else { From 2eea385485f11214ea4a540f430a4badb3c4912c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 14:30:48 +0800 Subject: [PATCH 06/10] fix(analytics): check return value. --- source/dnode/mnode/impl/src/mndAnode.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 89d116d52d..a24aa2654b 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -651,11 +651,17 @@ void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) { if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) { if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) { - taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]); + void* p = taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]); + if (p == NULL) { + mError("failed to add retrieved anomaly-detection algorithms, code:%s", tstrerror(terrno)); + } } if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) { - taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]); + void* p = taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]); + if (p == NULL) { + mError("failed to add retrieved forecast algorithms, code:%s", tstrerror(terrno)); + } } } From a4aba46a283bef25b2c0b69a7aac73c9e529022a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 17:59:14 +0800 Subject: [PATCH 07/10] fix(analytics): fix syntax error. --- source/common/src/tanalytics.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index 60c809b265..b48e749db8 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -767,7 +767,7 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC int32_t taosAnalyticsInit() { return 0; } void taosAnalyticsCleanup() {} -SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } +SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf) { return NULL; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } From 5af14a905f75f1433948befb69c887dc2363d4ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 19:01:57 +0800 Subject: [PATCH 08/10] fix(analytics): fix syntax error. --- source/common/src/tanalytics.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index b48e749db8..3e804f42f5 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -767,7 +767,7 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC int32_t taosAnalyticsInit() { return 0; } void taosAnalyticsCleanup() {} -SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf) { return NULL; } +SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) { return NULL; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } From 2c92cca6037bad70988448bc99bac8ab6ce56200 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 23:31:02 +0800 Subject: [PATCH 09/10] test(analytics): add test case and set the unit to be second. --- include/common/tanalytics.h | 4 ++-- source/libs/executor/src/anomalywindowoperator.c | 16 ++++++++-------- source/libs/executor/src/forecastoperator.c | 16 ++++++++-------- tests/script/tsim/analytics/basic0.sim | 8 ++++++++ 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 59f195ec4f..891a9ead07 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -30,8 +30,8 @@ extern "C" { #define ANAL_FORECAST_DEFAULT_WNCHECK 1 #define ANAL_FORECAST_MAX_ROWS 40000 #define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 -#define ANALY_FC_DEFAULT_TIMEOUT 60000 -#define ANALY_FC_MAX_TIMEOUT (600*1000) +#define ANALY_DEFAULT_TIMEOUT 60 +#define ANALY_MAX_TIMEOUT 600 typedef struct { EAnalAlgoType type; diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index ab5a478179..93c3eed867 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -92,15 +92,15 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p bool hasTimeout = taosAnalGetOptInt(pAnomalyNode->anomalyOpt, "timeout", &pInfo->timeout); if (!hasTimeout) { - qDebug("not set the timeout val, set default:%d", ANALY_FC_DEFAULT_TIMEOUT); - pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; + qDebug("not set the timeout val, set default:%d", ANALY_DEFAULT_TIMEOUT); + pInfo->timeout = ANALY_DEFAULT_TIMEOUT; } else { - if (pInfo->timeout <= 500 || pInfo->timeout > 600*1000) { - qDebug("timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%dms", - pInfo->timeout, ANALY_FC_DEFAULT_TIMEOUT); - pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; + if (pInfo->timeout <= 0 || pInfo->timeout > ANALY_MAX_TIMEOUT) { + qDebug("timeout val:%" PRId64 "s is invalid (greater than 10min or less than 1s), use default:%dms", + pInfo->timeout, ANALY_DEFAULT_TIMEOUT); + pInfo->timeout = ANALY_DEFAULT_TIMEOUT; } else { - qDebug("timeout val is set to: %" PRId64 "ms", pInfo->timeout); + qDebug("timeout val is set to: %" PRId64 "s", pInfo->timeout); } } @@ -466,7 +466,7 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { code = taosAnalBufClose(&analBuf); QUERY_CHECK_CODE(code, lino, _OVER); - pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf, pInfo->timeout); + pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf, pInfo->timeout * 1000); if (pJson == NULL) { code = terrno; goto _OVER; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 7fd10ca676..d79d7fcce1 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -211,7 +211,7 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const SColumnInfoData* pResHighCol = (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL); - SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout); + SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout * 1000); if (pJson == NULL) { return terrno; } @@ -534,15 +534,15 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) { bool hasTimeout = taosAnalGetOptInt(pSupp->algoOpt, "timeout", &pSupp->timeout); if (!hasTimeout) { - qDebug("%s not set the timeout val, set default:%d", id, ANALY_FC_DEFAULT_TIMEOUT); - pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; + qDebug("%s not set the timeout val, set default:%d", id, ANALY_DEFAULT_TIMEOUT); + pSupp->timeout = ANALY_DEFAULT_TIMEOUT; } else { - if (pSupp->timeout <= 500 || pSupp->timeout > ANALY_FC_MAX_TIMEOUT) { - qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%dms", - id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT); - pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; + if (pSupp->timeout <= 0 || pSupp->timeout > ANALY_MAX_TIMEOUT) { + qDebug("%s timeout val:%" PRId64 "s is invalid (greater than 10min or less than 1s), use default:%dms", + id, pSupp->timeout, ANALY_DEFAULT_TIMEOUT); + pSupp->timeout = ANALY_DEFAULT_TIMEOUT; } else { - qDebug("%s timeout val is set to: %" PRId64 "ms", id, pSupp->timeout); + qDebug("%s timeout val is set to: %" PRId64 "s", id, pSupp->timeout); } } diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 3ac49b1fc3..3d2931fca6 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -88,6 +88,14 @@ sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2'); sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 +print ==================== invalid timeout parameter +sql_error select forecast(c1, 'algo=holtwinters, timeout=6000') from ct1; +sql_error select forecast(c1, 'algo=holtwinters, timeout=0') from ct1; + +print =========================== valid timeout +sql select forecast(c1, 'algo=holtwinters, timeout=120') from ct1; + + sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 From 04c2ecce690587fd42ab106b67293c866409b645 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Mar 2025 13:10:41 +0800 Subject: [PATCH 10/10] fix(analytics): revise the test cases. --- tests/script/tsim/analytics/basic0.sim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 3d2931fca6..e0dc1b6e1a 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -88,9 +88,9 @@ sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2'); sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 -print ==================== invalid timeout parameter -sql_error select forecast(c1, 'algo=holtwinters, timeout=6000') from ct1; -sql_error select forecast(c1, 'algo=holtwinters, timeout=0') from ct1; +print ==================== invalid timeout parameter, will reset the parameters. +sql select forecast(c1, 'algo=holtwinters, timeout=6000') from ct1; +sql select forecast(c1, 'algo=holtwinters, timeout=0') from ct1; print =========================== valid timeout sql select forecast(c1, 'algo=holtwinters, timeout=120') from ct1;