Merge pull request #29996 from taosdata/fix/droptask

feat(stream): add timeout parameter.
This commit is contained in:
Simon Guan 2025-03-05 18:49:23 +08:00 committed by GitHub
commit 6fa7a7bb99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 81 additions and 37 deletions

View File

@ -25,12 +25,13 @@
extern "C" {
#endif
#define ANALY_FORECAST_DEFAULT_ROWS 10
#define ANALY_FORECAST_DEFAULT_CONF 95
#define ANALY_FORECAST_DEFAULT_WNCHECK 1
#define ANALY_FORECAST_MAX_HISTORY_ROWS 40000
#define ANALY_MAX_FC_ROWS 1024
#define ANALY_ANOMALY_WINDOW_MAX_ROWS 40000
#define ANALY_FORECAST_DEFAULT_ROWS 10
#define ANALY_FORECAST_DEFAULT_CONF 95
#define ANALY_FORECAST_DEFAULT_WNCHECK 1
#define ANALY_FORECAST_MAX_ROWS 40000
#define ANALY_ANOMALY_WINDOW_MAX_ROWS 40000
#define ANALY_DEFAULT_TIMEOUT 60
#define ANALY_MAX_TIMEOUT 600
typedef struct {
EAnalAlgoType type;
@ -48,7 +49,7 @@ typedef enum {
typedef enum {
ANALYTICS_HTTP_TYPE_GET = 0,
ANALYTICS_HTTP_TYPE_POST,
} EAnalHttpType;
} EAnalyHttpType;
typedef struct {
TdFilePtr filePtr;
@ -66,7 +67,7 @@ typedef struct {
int32_t taosAnalyticsInit();
void taosAnalyticsCleanup();
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf);
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout);
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);

View File

@ -276,7 +276,7 @@ _OVER:
return code;
}
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) {
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) {
struct curl_slist *headers = NULL;
CURL *curl = NULL;
CURLcode code = 0;
@ -292,7 +292,7 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char
if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER;
@ -311,7 +311,7 @@ _OVER:
return code;
}
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) {
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
int32_t code = -1;
char *pCont = NULL;
int64_t contentLen;
@ -329,7 +329,7 @@ SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf
terrno = code;
goto _OVER;
}
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) {
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen, timeout) != 0) {
terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
goto _OVER;
}
@ -767,7 +767,7 @@ static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *p
int32_t taosAnalyticsInit() { return 0; }
void taosAnalyticsCleanup() {}
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) { return NULL; }
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }

View File

@ -789,7 +789,7 @@ static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) {
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list");
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0);
if (pJson == NULL) return terrno;
int32_t code = mndDecodeAlgoList(pJson, pObj);
@ -805,7 +805,7 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status");
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0);
if (pJson == NULL) return terrno;
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);

View File

@ -47,6 +47,7 @@ typedef struct {
char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
int64_t timeout;
SAnomalyWindowSupp anomalySup;
SWindowRowsSup anomalyWinRowSup;
SColumn anomalyCol;
@ -89,6 +90,20 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
goto _error;
}
bool hasTimeout = taosAnalyGetOptInt(pAnomalyNode->anomalyOpt, "timeout", &pInfo->timeout);
if (!hasTimeout) {
qDebug("not set the timeout val, set default:%d", ANALY_DEFAULT_TIMEOUT);
pInfo->timeout = ANALY_DEFAULT_TIMEOUT;
} else {
if (pInfo->timeout <= 0 || pInfo->timeout > ANALY_MAX_TIMEOUT) {
qDebug("timeout val:%" PRId64 "s is invalid (greater than 10min or less than 1s), use default:%dms",
pInfo->timeout, ANALY_DEFAULT_TIMEOUT);
pInfo->timeout = ANALY_DEFAULT_TIMEOUT;
} else {
qDebug("timeout val is set to: %" PRId64 "s", pInfo->timeout);
}
}
pOperator->exprSupp.hasWindowOrGroup = true;
pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId;
tstrncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt));
@ -451,7 +466,7 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
code = taosAnalyBufClose(&analyBuf);
QUERY_CHECK_CODE(code, lino, _OVER);
pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analyBuf);
pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analyBuf, pInfo->timeout * 1000);
if (pJson == NULL) {
code = terrno;
goto _OVER;

View File

@ -38,6 +38,7 @@ typedef struct {
int64_t optRows;
int64_t cachedRows;
int32_t numOfBlocks;
int64_t timeout;
int16_t resTsSlot;
int16_t resValSlot;
int16_t resLowSlot;
@ -76,10 +77,10 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con
int32_t lino = 0;
SAnalyticBuf* pBuf = &pSupp->analyBuf;
if (pSupp->cachedRows > ANALY_FORECAST_MAX_HISTORY_ROWS) {
if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
tstrerror(code), ANALY_FORECAST_MAX_HISTORY_ROWS);
tstrerror(code), ANALY_FORECAST_MAX_ROWS);
return code;
}
@ -157,8 +158,8 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
qDebug("%s forecast rows not found from %s, use default:%" PRId64, id, pSupp->algoOpt, pSupp->optRows);
}
if (pSupp->optRows > ANALY_MAX_FC_ROWS) {
qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_MAX_FC_ROWS,
if (pSupp->optRows > ANALY_FORECAST_MAX_ROWS) {
qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_MAX_ROWS,
pSupp->optRows);
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
}
@ -198,12 +199,12 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
SAnalyticBuf* pBuf = &pSupp->analyBuf;
int32_t resCurRow = pBlock->info.rows;
int8_t tmpI8;
int16_t tmpI16;
int32_t tmpI32;
int64_t tmpI64;
float tmpFloat;
double tmpDouble;
int8_t tmpI8 = 0;
int16_t tmpI16 = 0;
int32_t tmpI32 = 0;
int64_t tmpI64 = 0;
float tmpFloat = 0;
double tmpDouble = 0;
int32_t code = 0;
SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
@ -211,12 +212,13 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const
return terrno;
}
SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
SColumnInfoData* pResLowCol =
((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
SColumnInfoData* pResHighCol =
(pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf);
SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout * 1000);
if (pJson == NULL) {
return terrno;
}
@ -527,18 +529,32 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) {
return 0;
}
static int32_t forecastParseAlgo(SForecastSupp* pSupp) {
static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) {
pSupp->maxTs = 0;
pSupp->minTs = INT64_MAX;
pSupp->numOfRows = 0;
if (!taosAnalyGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) {
qError("failed to get forecast algorithm name from %s", pSupp->algoOpt);
qError("%s failed to get forecast algorithm name from %s", id, pSupp->algoOpt);
return TSDB_CODE_ANA_ALGO_NOT_FOUND;
}
bool hasTimeout = taosAnalyGetOptInt(pSupp->algoOpt, "timeout", &pSupp->timeout);
if (!hasTimeout) {
qDebug("%s not set the timeout val, set default:%d", id, ANALY_DEFAULT_TIMEOUT);
pSupp->timeout = ANALY_DEFAULT_TIMEOUT;
} else {
if (pSupp->timeout <= 0 || pSupp->timeout > ANALY_MAX_TIMEOUT) {
qDebug("%s timeout val:%" PRId64 "s is invalid (greater than 10min or less than 1s), use default:%dms",
id, pSupp->timeout, ANALY_DEFAULT_TIMEOUT);
pSupp->timeout = ANALY_DEFAULT_TIMEOUT;
} else {
qDebug("%s timeout val is set to: %" PRId64 "s", id, pSupp->timeout);
}
}
if (taosAnalyGetAlgoUrl(pSupp->algoName, ANALY_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) {
qError("failed to get forecast algorithm url from %s", pSupp->algoName);
qError("%s failed to get forecast algorithm url from %s", id, pSupp->algoName);
return TSDB_CODE_ANA_ALGO_NOT_LOAD;
}
@ -589,6 +605,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
goto _error;
}
const char* pId = pTaskInfo->id.str;
SForecastSupp* pSupp = &pInfo->forecastSupp;
SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
SExprSupp* pExprSup = &pOperator->exprSupp;
@ -620,7 +637,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
code = forecastParseOutput(pSupp, pExprSup);
QUERY_CHECK_CODE(code, lino, _error);
code = forecastParseAlgo(pSupp);
code = forecastParseAlgo(pSupp, pId);
QUERY_CHECK_CODE(code, lino, _error);
code = forecastCreateBuf(pSupp);
@ -644,7 +661,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
*pOptrInfo = pOperator;
qDebug("forecast env is initialized, option:%s", pSupp->algoOpt);
qDebug("%s forecast env is initialized, option:%s", pId, pSupp->algoOpt);
return TSDB_CODE_SUCCESS;
_error:

View File

@ -91,6 +91,14 @@ sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2');
sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
print ==================== invalid timeout parameter, will reset the parameters.
sql select forecast(c1, 'algo=holtwinters, timeout=6000') from ct1;
sql select forecast(c1, 'algo=holtwinters, timeout=0') from ct1;
print =========================== valid timeout
sql select forecast(c1, 'algo=holtwinters, timeout=120') from ct1;
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1

View File

@ -19,7 +19,8 @@ IF(TD_WEBSOCKET)
PATCH_COMMAND
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release --locked -p taos-ws-sys --features rustls
COMMAND cargo update
COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features rustls
INSTALL_COMMAND
COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
@ -37,7 +38,8 @@ IF(TD_WEBSOCKET)
PATCH_COMMAND
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND cargo build --release --locked -p taos-ws-sys --features rustls
COMMAND cargo update
COMMAND cargo build --release -p taos-ws-sys --features rustls
INSTALL_COMMAND
COMMAND cp target/release/taosws.dll ${CMAKE_BINARY_DIR}/build/lib
COMMAND cp target/release/taosws.dll.lib ${CMAKE_BINARY_DIR}/build/lib/taosws.lib
@ -56,7 +58,8 @@ IF(TD_WEBSOCKET)
PATCH_COMMAND
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND cargo build --release --locked -p taos-ws-sys --features rustls
COMMAND cargo update
COMMAND cargo build --release -p taos-ws-sys --features rustls
INSTALL_COMMAND
COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include