From 19f09d679d31b818c47354d549562328024909cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Mar 2025 23:05:14 +0800 Subject: [PATCH] feat(stream): add timeout parameter. --- include/common/tanalytics.h | 5 ++- source/common/src/tanalytics.c | 10 ++--- source/dnode/mnode/impl/src/mndAnode.c | 4 +- .../libs/executor/src/anomalywindowoperator.c | 17 ++++++- source/libs/executor/src/forecastoperator.c | 45 +++++++++++++------ 5 files changed, 57 insertions(+), 24 deletions(-) diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 344093245b..bcab4490cb 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -30,6 +30,7 @@ extern "C" { #define ANAL_FORECAST_DEFAULT_WNCHECK 1 #define ANAL_FORECAST_MAX_ROWS 40000 #define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000 +#define ANALY_FC_DEFAULT_TIMEOUT 60000 typedef struct { EAnalAlgoType type; @@ -47,7 +48,7 @@ typedef enum { typedef enum { ANALYTICS_HTTP_TYPE_GET = 0, ANALYTICS_HTTP_TYPE_POST, -} EAnalHttpType; +} EAnalyHttpType; typedef struct { TdFilePtr filePtr; @@ -65,7 +66,7 @@ typedef struct { int32_t taosAnalyticsInit(); void taosAnalyticsCleanup(); -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf); +SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout); int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index 0ed67eed0a..60c809b265 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -276,7 +276,7 @@ _OVER: return code; } -static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { +static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) { struct curl_slist *headers = NULL; CURL *curl = NULL; CURLcode code = 0; @@ -292,7 +292,7 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER; if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER; @@ -311,7 +311,7 @@ _OVER: return code; } -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { +SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) { int32_t code = -1; char *pCont = NULL; int64_t contentLen; @@ -329,7 +329,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf terrno = code; goto _OVER; } - if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { + if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen, timeout) != 0) { terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; goto _OVER; } @@ -767,7 +767,7 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC int32_t taosAnalyticsInit() { return 0; } void taosAnalyticsCleanup() {} -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } +SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 9f5635a74b..89d116d52d 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -783,7 +783,7 @@ static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) { char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0); if (pJson == NULL) return terrno; int32_t code = mndDecodeAlgoList(pJson, pObj); @@ -799,7 +799,7 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0); if (pJson == NULL) return terrno; code = tjsonGetDoubleValue(pJson, "protocol", &tmp); diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 3124fa0b57..8af8e2434b 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -47,6 +47,7 @@ typedef struct { char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; + int64_t timeout; SAnomalyWindowSupp anomalySup; SWindowRowsSup anomalyWinRowSup; SColumn anomalyCol; @@ -89,6 +90,20 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p goto _error; } + bool hasTimeout = taosAnalGetOptInt(pAnomalyNode->anomalyOpt, "timeout", &pInfo->timeout); + if (!hasTimeout) { + qDebug("not set the timeout val, set default:%d", ANALY_FC_DEFAULT_TIMEOUT); + pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + if (pInfo->timeout <= 500 || pInfo->timeout > 600*1000) { + qDebug("timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", + pInfo->timeout, ANALY_FC_DEFAULT_TIMEOUT); + pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + qDebug("timeout val is set to: %d" PRId64 "ms", pInfo->timeout); + } + } + pOperator->exprSupp.hasWindowOrGroup = true; pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId; tstrncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt)); @@ -451,7 +466,7 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { code = taosAnalBufClose(&analBuf); QUERY_CHECK_CODE(code, lino, _OVER); - pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf); + pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf, pInfo->timeout); if (pJson == NULL) { code = terrno; goto _OVER; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 02b122830c..0fbe3948f3 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -38,6 +38,7 @@ typedef struct { int64_t optRows; int64_t cachedRows; int32_t numOfBlocks; + int64_t timeout; int16_t resTsSlot; int16_t resValSlot; int16_t resLowSlot; @@ -191,12 +192,12 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) { SAnalyticBuf* pBuf = &pSupp->analBuf; int32_t resCurRow = pBlock->info.rows; - int8_t tmpI8; - int16_t tmpI16; - int32_t tmpI32; - int64_t tmpI64; - float tmpFloat; - double tmpDouble; + int8_t tmpI8 = 0; + int16_t tmpI16 = 0; + int32_t tmpI32 = 0; + int64_t tmpI64 = 0; + float tmpFloat = 0; + double tmpDouble = 0; int32_t code = 0; SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot); @@ -204,12 +205,13 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const return terrno; } - SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL); - SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL); + SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL); + SColumnInfoData* pResLowCol = + ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL); SColumnInfoData* pResHighCol = (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL); - SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf); + SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout); if (pJson == NULL) { return terrno; } @@ -520,18 +522,32 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) { return 0; } -static int32_t forecastParseAlgo(SForecastSupp* pSupp) { +static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) { pSupp->maxTs = 0; pSupp->minTs = INT64_MAX; pSupp->numOfRows = 0; if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) { - qError("failed to get forecast algorithm name from %s", pSupp->algoOpt); + qError("%s failed to get forecast algorithm name from %s", id, pSupp->algoOpt); return TSDB_CODE_ANA_ALGO_NOT_FOUND; } + bool hasTimeout = taosAnalGetOptInt(pSupp->algoOpt, "timeout", &pSupp->timeout); + if (!hasTimeout) { + qDebug("%s not set the timeout val, set default:%d", id, ANALY_FC_DEFAULT_TIMEOUT); + pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + if (pSupp->timeout <= 500 || pSupp->timeout > 600*1000) { + qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms", + id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT); + pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT; + } else { + qDebug("%s timeout val is set to: %d" PRId64 "ms", id, pSupp->timeout); + } + } + if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) { - qError("failed to get forecast algorithm url from %s", pSupp->algoName); + qError("%s failed to get forecast algorithm url from %s", id, pSupp->algoName); return TSDB_CODE_ANA_ALGO_NOT_LOAD; } @@ -582,6 +598,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo goto _error; } + const char* pId = pTaskInfo->id.str; SForecastSupp* pSupp = &pInfo->forecastSupp; SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode; SExprSupp* pExprSup = &pOperator->exprSupp; @@ -613,7 +630,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo code = forecastParseOutput(pSupp, pExprSup); QUERY_CHECK_CODE(code, lino, _error); - code = forecastParseAlgo(pSupp); + code = forecastParseAlgo(pSupp, pId); QUERY_CHECK_CODE(code, lino, _error); code = forecastCreateBuf(pSupp); @@ -637,7 +654,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo *pOptrInfo = pOperator; - qDebug("forecast env is initialized, option:%s", pSupp->algoOpt); + qDebug("%s forecast env is initialized, option:%s", pId, pSupp->algoOpt); return TSDB_CODE_SUCCESS; _error: