feat(stream): add timeout parameter.

This commit is contained in:
Haojun Liao 2025-03-03 23:05:14 +08:00
parent dfd1beaead
commit 19f09d679d
5 changed files with 57 additions and 24 deletions

View File

@ -30,6 +30,7 @@ extern "C" {
#define ANAL_FORECAST_DEFAULT_WNCHECK 1
#define ANAL_FORECAST_MAX_ROWS 40000
#define ANAL_ANOMALY_WINDOW_MAX_ROWS 40000
#define ANALY_FC_DEFAULT_TIMEOUT 60000
typedef struct {
EAnalAlgoType type;
@ -47,7 +48,7 @@ typedef enum {
typedef enum {
ANALYTICS_HTTP_TYPE_GET = 0,
ANALYTICS_HTTP_TYPE_POST,
} EAnalHttpType;
} EAnalyHttpType;
typedef struct {
TdFilePtr filePtr;
@ -65,7 +66,7 @@ typedef struct {
int32_t taosAnalyticsInit();
void taosAnalyticsCleanup();
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf);
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout);
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);

View File

@ -276,7 +276,7 @@ _OVER:
return code;
}
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) {
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) {
struct curl_slist *headers = NULL;
CURL *curl = NULL;
CURLcode code = 0;
@ -292,7 +292,7 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char
if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER;
if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER;
@ -311,7 +311,7 @@ _OVER:
return code;
}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) {
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
int32_t code = -1;
char *pCont = NULL;
int64_t contentLen;
@ -329,7 +329,7 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf
terrno = code;
goto _OVER;
}
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) {
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen, timeout) != 0) {
terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
goto _OVER;
}
@ -767,7 +767,7 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC
int32_t taosAnalyticsInit() { return 0; }
void taosAnalyticsCleanup() {}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
SJson *taosAnalySendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }

View File

@ -783,7 +783,7 @@ static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) {
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0);
if (pJson == NULL) return terrno;
int32_t code = mndDecodeAlgoList(pJson, pObj);
@ -799,7 +799,7 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalySendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL, 0);
if (pJson == NULL) return terrno;
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);

View File

@ -47,6 +47,7 @@ typedef struct {
char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
int64_t timeout;
SAnomalyWindowSupp anomalySup;
SWindowRowsSup anomalyWinRowSup;
SColumn anomalyCol;
@ -89,6 +90,20 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
goto _error;
}
bool hasTimeout = taosAnalGetOptInt(pAnomalyNode->anomalyOpt, "timeout", &pInfo->timeout);
if (!hasTimeout) {
qDebug("not set the timeout val, set default:%d", ANALY_FC_DEFAULT_TIMEOUT);
pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT;
} else {
if (pInfo->timeout <= 500 || pInfo->timeout > 600*1000) {
qDebug("timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms",
pInfo->timeout, ANALY_FC_DEFAULT_TIMEOUT);
pInfo->timeout = ANALY_FC_DEFAULT_TIMEOUT;
} else {
qDebug("timeout val is set to: %d" PRId64 "ms", pInfo->timeout);
}
}
pOperator->exprSupp.hasWindowOrGroup = true;
pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId;
tstrncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt));
@ -451,7 +466,7 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
code = taosAnalBufClose(&analBuf);
QUERY_CHECK_CODE(code, lino, _OVER);
pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf);
pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf, pInfo->timeout);
if (pJson == NULL) {
code = terrno;
goto _OVER;

View File

@ -38,6 +38,7 @@ typedef struct {
int64_t optRows;
int64_t cachedRows;
int32_t numOfBlocks;
int64_t timeout;
int16_t resTsSlot;
int16_t resValSlot;
int16_t resLowSlot;
@ -191,12 +192,12 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) {
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
SAnalyticBuf* pBuf = &pSupp->analBuf;
int32_t resCurRow = pBlock->info.rows;
int8_t tmpI8;
int16_t tmpI16;
int32_t tmpI32;
int64_t tmpI64;
float tmpFloat;
double tmpDouble;
int8_t tmpI8 = 0;
int16_t tmpI16 = 0;
int32_t tmpI32 = 0;
int64_t tmpI64 = 0;
float tmpFloat = 0;
double tmpDouble = 0;
int32_t code = 0;
SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
@ -204,12 +205,13 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const
return terrno;
}
SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
SColumnInfoData* pResLowCol =
((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
SColumnInfoData* pResHighCol =
(pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf);
SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout);
if (pJson == NULL) {
return terrno;
}
@ -520,18 +522,32 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) {
return 0;
}
static int32_t forecastParseAlgo(SForecastSupp* pSupp) {
static int32_t forecastParseAlgo(SForecastSupp* pSupp, const char* id) {
pSupp->maxTs = 0;
pSupp->minTs = INT64_MAX;
pSupp->numOfRows = 0;
if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) {
qError("failed to get forecast algorithm name from %s", pSupp->algoOpt);
qError("%s failed to get forecast algorithm name from %s", id, pSupp->algoOpt);
return TSDB_CODE_ANA_ALGO_NOT_FOUND;
}
bool hasTimeout = taosAnalGetOptInt(pSupp->algoOpt, "timeout", &pSupp->timeout);
if (!hasTimeout) {
qDebug("%s not set the timeout val, set default:%d", id, ANALY_FC_DEFAULT_TIMEOUT);
pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT;
} else {
if (pSupp->timeout <= 500 || pSupp->timeout > 600*1000) {
qDebug("%s timeout val:%" PRId64 "ms is invalid (greater than 10min or less than 0.5s), use default:%" PRId64 "ms",
id, pSupp->timeout, ANALY_FC_DEFAULT_TIMEOUT);
pSupp->timeout = ANALY_FC_DEFAULT_TIMEOUT;
} else {
qDebug("%s timeout val is set to: %d" PRId64 "ms", id, pSupp->timeout);
}
}
if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) {
qError("failed to get forecast algorithm url from %s", pSupp->algoName);
qError("%s failed to get forecast algorithm url from %s", id, pSupp->algoName);
return TSDB_CODE_ANA_ALGO_NOT_LOAD;
}
@ -582,6 +598,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
goto _error;
}
const char* pId = pTaskInfo->id.str;
SForecastSupp* pSupp = &pInfo->forecastSupp;
SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
SExprSupp* pExprSup = &pOperator->exprSupp;
@ -613,7 +630,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
code = forecastParseOutput(pSupp, pExprSup);
QUERY_CHECK_CODE(code, lino, _error);
code = forecastParseAlgo(pSupp);
code = forecastParseAlgo(pSupp, pId);
QUERY_CHECK_CODE(code, lino, _error);
code = forecastCreateBuf(pSupp);
@ -637,7 +654,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
*pOptrInfo = pOperator;
qDebug("forecast env is initialized, option:%s", pSupp->algoOpt);
qDebug("%s forecast env is initialized, option:%s", pId, pSupp->algoOpt);
return TSDB_CODE_SUCCESS;
_error: