diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 85eb963129..d0af84ecfb 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -39,14 +39,14 @@ typedef struct { } SAnalyticsUrl; typedef enum { - ANAL_BUF_TYPE_JSON = 0, - ANAL_BUF_TYPE_JSON_COL = 1, - ANAL_BUF_TYPE_OTHERS, + ANALYTICS_BUF_TYPE_JSON = 0, + ANALYTICS_BUF_TYPE_JSON_COL = 1, + ANALYTICS_BUF_TYPE_OTHERS, } EAnalBufType; typedef enum { - ANAL_HTTP_TYPE_GET = 0, - ANAL_HTTP_TYPE_POST, + ANALYTICS_HTTP_TYPE_GET = 0, + ANALYTICS_HTTP_TYPE_POST, } EAnalHttpType; typedef struct { @@ -61,11 +61,11 @@ typedef struct { char fileName[TSDB_FILENAME_LEN]; int32_t numOfCols; SAnalyticsColBuf *pCols; -} SAnalBuf; +} SAnalyticBuf; int32_t taosAnalyticsInit(); void taosAnalyticsCleanup(); -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf); +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf); int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); @@ -73,18 +73,18 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optV int64_t taosAnalGetVersion(); void taosAnalUpdate(int64_t newVer, SHashObj *pHash); -int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols); -int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal); -int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal); -int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal); -int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName); -int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf); -int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex); -int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue); -int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex); -int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf); -int32_t taosAnalBufClose(SAnalBuf *pBuf); -void taosAnalBufDestroy(SAnalBuf *pBuf); +int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols); +int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal); +int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal); +int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal); +int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName); +int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf); +int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex); +int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue); +int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex); +int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf); +int32_t taosAnalBufClose(SAnalyticBuf *pBuf); +void taosAnalBufDestroy(SAnalyticBuf *pBuf); const char *taosAnalAlgoStr(EAnalAlgoType algoType); EAnalAlgoType taosAnalAlgoInt(const char *algoName); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 0b617c7ce3..867f8c8efc 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -322,7 +322,7 @@ typedef struct SAlterDnodeStmt { typedef struct { ENodeType type; - char url[TSDB_ANAL_ANODE_URL_LEN + 3]; + char url[TSDB_ANALYTIC_ANODE_URL_LEN + 3]; } SCreateAnodeStmt; typedef struct { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 48852e5552..89bc27a1fa 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -334,7 +334,7 @@ typedef struct SWindowLogicNode { int64_t windowSliding; SNodeList* pTsmaSubplans; SNode* pAnomalyExpr; - char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -740,7 +740,7 @@ typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode; typedef struct SAnomalyWindowPhysiNode { SWindowPhysiNode window; SNode* pAnomalyKey; - char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; } SAnomalyWindowPhysiNode; typedef struct SSortPhysiNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 763882ab3a..7af74a347a 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -351,7 +351,7 @@ typedef struct SAnomalyWindowNode { ENodeType type; // QUERY_NODE_ANOMALY_WINDOW SNode* pCol; // timestamp primary key SNode* pExpr; - char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; } SAnomalyWindowNode; typedef enum EFillMode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e33af33d0e..6cedaeeef1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -491,13 +491,14 @@ int32_t taosGetErrSize(); #define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE TAOS_DEF_ERROR_CODE(0, 0x0438) // analysis -#define TSDB_CODE_ANAL_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440) -#define TSDB_CODE_ANAL_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441) -#define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442) -#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443) -#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444) -#define TSDB_CODE_ANAL_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445) -#define TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446) +#define TSDB_CODE_ANA_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440) +#define TSDB_CODE_ANA_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441) +#define TSDB_CODE_ANA_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442) +#define TSDB_CODE_ANA_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443) +#define TSDB_CODE_ANA_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444) +#define TSDB_CODE_ANA_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445) +#define TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446) +#define TSDB_CODE_ANA_WN_DATA TAOS_DEF_ERROR_CODE(0, 0x0447) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/include/util/tdef.h b/include/util/tdef.h index 4e1fb21838..823c4bbe4b 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -335,12 +335,13 @@ typedef enum ELogicConditionType { #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 -#define TSDB_ANAL_ANODE_URL_LEN 128 -#define TSDB_ANAL_ALGO_NAME_LEN 64 -#define TSDB_ANAL_ALGO_TYPE_LEN 24 -#define TSDB_ANAL_ALGO_KEY_LEN (TSDB_ANAL_ALGO_NAME_LEN + 9) -#define TSDB_ANAL_ALGO_URL_LEN (TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1) -#define TSDB_ANAL_ALGO_OPTION_LEN 256 + +#define TSDB_ANALYTIC_ANODE_URL_LEN 128 +#define TSDB_ANALYTIC_ALGO_NAME_LEN 64 +#define TSDB_ANALYTIC_ALGO_TYPE_LEN 24 +#define TSDB_ANALYTIC_ALGO_KEY_LEN (TSDB_ANALYTIC_ALGO_NAME_LEN + 9) +#define TSDB_ANALYTIC_ALGO_URL_LEN (TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN + 1) +#define TSDB_ANALYTIC_ALGO_OPTION_LEN 256 #define TSDB_MAX_EP_NUM 10 diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 12b789f14e..bfe82aa7ae 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -402,7 +402,7 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = { static const SSysDbTableSchema anodesSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "url", .bytes = TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "url", .bytes = TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, @@ -410,8 +410,8 @@ static const SSysDbTableSchema anodesSchema[] = { static const SSysDbTableSchema anodesFullSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "type", .bytes = TSDB_ANAL_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "algo", .bytes = TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "type", .bytes = TSDB_ANALYTIC_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "algo", .bytes = TSDB_ANALYTIC_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, }; static const SSysDbTableSchema tsmaSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index bc8830505e..814ec4a626 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2169,7 +2169,7 @@ int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAl SAnalyticsUrl *pUrl = pIter; size_t nameLen = 0; const char *name = taosHashGetKey(pIter, &nameLen); - if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) { + if (nameLen > 0 && nameLen <= TSDB_ANALYTIC_ALGO_KEY_LEN && pUrl->urlLen > 0) { numOfAlgos++; } pIter = taosHashIterate(pRsp->hash, pIter); @@ -2224,7 +2224,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal int32_t numOfAlgos = 0; int32_t nameLen; int32_t type; - char name[TSDB_ANAL_ALGO_KEY_LEN]; + char name[TSDB_ANALYTIC_ALGO_KEY_LEN]; SAnalyticsUrl url = {0}; TAOS_CHECK_EXIT(tStartDecode(&decoder)); @@ -2233,7 +2233,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal for (int32_t f = 0; f < numOfAlgos; ++f) { TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nameLen)); - if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_NAME_LEN) { + if (nameLen > 0 && nameLen <= TSDB_ANALYTIC_ALGO_NAME_LEN) { TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, name)); } diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 87bfe9f7af..c64208600a 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -309,7 +309,7 @@ static int32_t mndCreateAnode(SMnode *pMnode, SRpcMsg *pReq, SMCreateAnodeReq *p anodeObj.updateTime = anodeObj.createdTime; anodeObj.version = 0; anodeObj.urlLen = pCreate->urlLen; - if (anodeObj.urlLen > TSDB_ANAL_ANODE_URL_LEN) { + if (anodeObj.urlLen > TSDB_ANALYTIC_ANODE_URL_LEN) { code = TSDB_CODE_MND_ANODE_TOO_LONG_URL; goto _OVER; } @@ -491,23 +491,24 @@ static int32_t mndSetDropAnodeRedoLogs(STrans *pTrans, SAnodeObj *pObj) { int32_t code = 0; SSdbRaw *pRedoRaw = mndAnodeActionEncode(pObj); if (pRedoRaw == NULL) { - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - TAOS_RETURN(code); + code = terrno; + return code; } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)); - TAOS_RETURN(code); + + return code; } static int32_t mndSetDropAnodeCommitLogs(STrans *pTrans, SAnodeObj *pObj) { int32_t code = 0; SSdbRaw *pCommitRaw = mndAnodeActionEncode(pObj); if (pCommitRaw == NULL) { - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - TAOS_RETURN(code); + code = terrno; + return code; } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)); TAOS_RETURN(code); @@ -521,25 +522,25 @@ static int32_t mndSetDropAnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SAnode } static int32_t mndDropAnode(SMnode *pMnode, SRpcMsg *pReq, SAnodeObj *pObj) { - int32_t code = -1; + int32_t code = 0; + int32_t lino = 0; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-anode"); - if (pTrans == NULL) { - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - goto _OVER; - } + TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno); + mndTransSetSerial(pTrans); + mInfo("trans:%d, to drop anode:%d", pTrans->id, pObj->id); - mInfo("trans:%d, used to drop anode:%d", pTrans->id, pObj->id); - TAOS_CHECK_GOTO(mndSetDropAnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER); - TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); + code = mndSetDropAnodeInfoToTrans(pMnode, pTrans, pObj, false); + mndReleaseAnode(pMnode, pObj); - code = 0; + TSDB_CHECK_CODE(code, lino, _OVER); + + code = mndTransPrepare(pMnode, pTrans); _OVER: mndTransDrop(pTrans); - TAOS_RETURN(code); + return code; } static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq) { @@ -560,20 +561,20 @@ static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq) { pObj = mndAcquireAnode(pMnode, dropReq.anodeId); if (pObj == NULL) { - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; + code = terrno; goto _OVER; } code = mndDropAnode(pMnode, pReq, pObj); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("anode:%d, failed to drop since %s", dropReq.anodeId, tstrerror(code)); } - mndReleaseAnode(pMnode, pObj); tFreeSMDropAnodeReq(&dropReq); TAOS_RETURN(code); } @@ -584,7 +585,7 @@ static int32_t mndRetrieveAnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB int32_t numOfRows = 0; int32_t cols = 0; SAnodeObj *pObj = NULL; - char buf[TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE]; + char buf[TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE]; char status[64]; int32_t code = 0; @@ -642,7 +643,7 @@ static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfRows = 0; int32_t cols = 0; SAnodeObj *pObj = NULL; - char buf[TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE]; + char buf[TSDB_ANALYTIC_ALGO_NAME_LEN + VARSTR_HEADER_SIZE]; int32_t code = 0; while (numOfRows < rows) { @@ -693,7 +694,7 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) { int32_t code = 0; int32_t protocol = 0; double tmp = 0; - char buf[TSDB_ANAL_ALGO_NAME_LEN + 1] = {0}; + char buf[TSDB_ANALYTIC_ALGO_NAME_LEN + 1] = {0}; code = tjsonGetDoubleValue(pJson, "protocol", &tmp); if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; @@ -753,10 +754,10 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) { } static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) { - char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0}; - snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", url, "list"); + char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; + snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); if (pJson == NULL) return terrno; int32_t code = mndDecodeAlgoList(pJson, pObj); @@ -769,10 +770,10 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe int32_t code = 0; int32_t protocol = 0; double tmp = 0; - char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0}; - snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", pObj->url, "status"); + char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0}; + snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status"); - SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL); + SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL); if (pJson == NULL) return terrno; code = tjsonGetDoubleValue(pJson, "protocol", &tmp); @@ -808,7 +809,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { SAnodeObj *pObj = NULL; SAnalyticsUrl url; int32_t nameLen; - char name[TSDB_ANAL_ALGO_KEY_LEN]; + char name[TSDB_ANALYTIC_ALGO_KEY_LEN]; SRetrieveAnalAlgoReq req = {0}; SRetrieveAnalAlgoRsp rsp = {0}; @@ -847,13 +848,13 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { goto _OVER; } } - url.url = taosMemoryMalloc(TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1); + url.url = taosMemoryMalloc(TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN + 1); if (url.url == NULL) { sdbRelease(pSdb, pAnode); goto _OVER; } - url.urlLen = 1 + tsnprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url, + url.urlLen = 1 + tsnprintf(url.url, TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN, "%s/%s", pAnode->url, taosAnalAlgoUrlStr(url.type)); if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalyticsUrl)) != 0) { taosMemoryFree(url.url); diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 94cc5d9129..3bc9c806b0 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -44,9 +44,9 @@ typedef struct { SExprSupp scalarSup; int32_t tsSlotId; STimeWindowAggSupp twAggSup; - char algoName[TSDB_ANAL_ALGO_NAME_LEN]; - char algoUrl[TSDB_ANAL_ALGO_URL_LEN]; - char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; + char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; + char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; SAnomalyWindowSupp anomalySup; SWindowRowsSup anomalyWinRowSup; SColumn anomalyCol; @@ -75,13 +75,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); - code = TSDB_CODE_ANAL_ALGO_NOT_FOUND; + code = TSDB_CODE_ANA_ALGO_NOT_FOUND; goto _error; } if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); - code = TSDB_CODE_ANAL_ALGO_NOT_LOAD; + code = TSDB_CODE_ANA_ALGO_NOT_LOAD; goto _error; } @@ -262,7 +262,7 @@ static void anomalyDestroyOperatorInfo(void* param) { static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) { if (pInfo->anomalySup.cachedRows > ANAL_ANOMALY_WINDOW_MAX_ROWS) { - return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS; + return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; } SSDataBlock* pDst = NULL; @@ -287,7 +287,7 @@ static int32_t anomalyFindWindow(SAnomalyWindowSupp* pSupp, TSKEY key) { return -1; } -static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) { +static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows, const char* pId) { int32_t code = 0; int32_t rows = 0; STimeWindow win = {0}; @@ -295,8 +295,23 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) { taosArrayClear(pWindows); tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code); - if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; - if (rows <= 0) return 0; + if (code < 0) { + return TSDB_CODE_INVALID_JSON_FORMAT; + } + + if (rows < 0) { + char pMsg[1024] = {0}; + code = tjsonGetStringValue(pJson, "msg", pMsg); + if (code) { + qError("%s failed to get error msg from rsp, unknown error", pId); + } else { + qError("%s failed to exec forecast, msg:%s", pId, pMsg); + } + + return TSDB_CODE_ANA_WN_DATA; + } else if (rows == 0) { + return TSDB_CODE_SUCCESS; + } SJson* res = tjsonGetObjectItem(pJson, "res"); if (res == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; @@ -313,7 +328,10 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) { SJson* start = tjsonGetArrayItem(row, 0); SJson* end = tjsonGetArrayItem(row, 1); - if (start == NULL || end == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + if (start == NULL || end == NULL) { + qError("%s invalid res from analytic sys, code:%s", pId, tstrerror(TSDB_CODE_INVALID_JSON_FORMAT)); + return TSDB_CODE_INVALID_JSON_FORMAT; + } tjsonGetObjectValueBigInt(start, &win.skey); tjsonGetObjectValueBigInt(end, &win.ekey); @@ -322,52 +340,57 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) { win.ekey = win.skey + 1; } - if (taosArrayPush(pWindows, &win) == NULL) return TSDB_CODE_OUT_OF_BUFFER; + if (taosArrayPush(pWindows, &win) == NULL) { + qError("%s out of memory in generating anomaly_window", pId); + return TSDB_CODE_OUT_OF_BUFFER; + } } int32_t numOfWins = taosArrayGetSize(pWindows); - qDebug("anomaly window recevied, total:%d", numOfWins); + qDebug("%s anomaly window recevied, total:%d", pId, numOfWins); for (int32_t i = 0; i < numOfWins; ++i) { STimeWindow* pWindow = taosArrayGet(pWindows, i); - qDebug("anomaly win:%d [%" PRId64 ", %" PRId64 ")", i, pWindow->skey, pWindow->ekey); + qDebug("%s anomaly win:%d [%" PRId64 ", %" PRId64 ")", pId, i, pWindow->skey, pWindow->ekey); } - return 0; + return code; } static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { SAnomalyWindowOperatorInfo* pInfo = pOperator->info; SAnomalyWindowSupp* pSupp = &pInfo->anomalySup; SJson* pJson = NULL; - SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON}; + SAnalyticBuf analBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON}; char dataBuf[64] = {0}; int32_t code = 0; int64_t ts = 0; + int32_t lino = 0; + const char* pId = GET_TASKID(pOperator->pTaskInfo); - // int64_t ts = taosGetTimestampMs(); snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts, pSupp->groupId); code = tsosAnalBufOpen(&analBuf, 2); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); const char* prec = TSDB_TIME_PRECISION_MILLI_STR; if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufWriteColMeta(&analBuf, 1, pInfo->anomalyCol.type, "val"); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufWriteDataBegin(&analBuf); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks); // timestamp code = taosAnalBufWriteColBegin(&analBuf, 0); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); + for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i); if (pBlock == NULL) break; @@ -375,15 +398,17 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { if (pTsCol == NULL) break; for (int32_t j = 0; j < pBlock->info.rows; ++j) { code = taosAnalBufWriteColData(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); } } + code = taosAnalBufWriteColEnd(&analBuf, 0); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); // data code = taosAnalBufWriteColBegin(&analBuf, 1); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); + for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i); if (pBlock == NULL) break; @@ -392,48 +417,47 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { for (int32_t j = 0; j < pBlock->info.rows; ++j) { code = taosAnalBufWriteColData(&analBuf, 1, pValCol->info.type, colDataGetData(pValCol, j)); - if (code != 0) goto _OVER; - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); } } code = taosAnalBufWriteColEnd(&analBuf, 1); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufWriteDataEnd(&analBuf); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufWriteOptStr(&analBuf, "prec", prec); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK; bool hasWncheck = taosAnalGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck); if (!hasWncheck) { qDebug("anomaly_window wncheck not found from %s, use default:%" PRId64, pInfo->anomalyOpt, wncheck); } + code = taosAnalBufWriteOptInt(&analBuf, "wncheck", wncheck); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); code = taosAnalBufClose(&analBuf); - if (code != 0) goto _OVER; + QUERY_CHECK_CODE(code, lino, _OVER); - pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANAL_HTTP_TYPE_POST, &analBuf); + pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf); if (pJson == NULL) { code = terrno; goto _OVER; } - code = anomalyParseJson(pJson, pSupp->windows); - if (code != 0) goto _OVER; + code = anomalyParseJson(pJson, pSupp->windows, pId); _OVER: if (code != 0) { - qError("failed to analysis window since %s", tstrerror(code)); + qError("%s failed to analysis window since %s, lino:%d", pId, tstrerror(code), lino); } taosAnalBufDestroy(&analBuf); diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 20dc9e28ba..bf1efc54ca 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -29,9 +29,9 @@ #ifdef USE_ANALYTICS typedef struct { - char algoName[TSDB_ANAL_ALGO_NAME_LEN]; - char algoUrl[TSDB_ANAL_ALGO_URL_LEN]; - char algoOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; + char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; + char algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; int64_t maxTs; int64_t minTs; int64_t numOfRows; @@ -47,7 +47,7 @@ typedef struct { int16_t inputValSlot; int8_t inputValType; int8_t inputPrecision; - SAnalBuf analBuf; + SAnalyticBuf analBuf; } SForecastSupp; typedef struct SForecastOperatorInfo { @@ -74,12 +74,12 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) { - return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS; + return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS; } int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SAnalBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analBuf; qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows); pSupp->numOfBlocks++; @@ -108,7 +108,7 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { } static int32_t forecastCloseBuf(SForecastSupp* pSupp) { - SAnalBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analBuf; int32_t code = 0; for (int32_t i = 0; i < 2; ++i) { @@ -180,8 +180,8 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { return code; } -static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) { - SAnalBuf* pBuf = &pSupp->analBuf; +static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) { + SAnalyticBuf* pBuf = &pSupp->analBuf; int32_t resCurRow = pBlock->info.rows; int8_t tmpI8; int16_t tmpI16; @@ -192,28 +192,45 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) { int32_t code = 0; SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot); - if (NULL == pResValCol) return TSDB_CODE_OUT_OF_RANGE; + if (NULL == pResValCol) { + return terrno; + } SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL); SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL); SColumnInfoData* pResHighCol = (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL); - SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANAL_HTTP_TYPE_POST, pBuf); - if (pJson == NULL) return terrno; + SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf); + if (pJson == NULL) { + return terrno; + } int32_t rows = 0; tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code); - if (code < 0) goto _OVER; - if (rows <= 0) goto _OVER; + if (rows < 0 && code == 0) { + char pMsg[1024] = {0}; + code = tjsonGetStringValue(pJson, "msg", pMsg); + if (code != 0) { + qError("%s failed to get msg from rsp, unknown error", pId); + } else { + qError("%s failed to exec forecast, msg:%s", pId, pMsg); + } + + tjsonDelete(pJson); + return TSDB_CODE_ANA_WN_DATA; + } + + if (code < 0) { + goto _OVER; + } SJson* res = tjsonGetObjectItem(pJson, "res"); if (res == NULL) goto _OVER; int32_t ressize = tjsonGetArraySize(res); bool returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1); - if (returnConf) { - if (ressize != 4) goto _OVER; - } else if (ressize != 2) { + + if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) { goto _OVER; } @@ -313,41 +330,25 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) { resCurRow++; } - // for (int32_t i = rows; i < pSupp->optRows; ++i) { - // colDataSetNNULL(pResValCol, rows, (pSupp->optRows - rows)); - // if (pResTsCol != NULL) { - // colDataSetNNULL(pResTsCol, rows, (pSupp->optRows - rows)); - // } - // if (pResLowCol != NULL) { - // colDataSetNNULL(pResLowCol, rows, (pSupp->optRows - rows)); - // } - // if (pResHighCol != NULL) { - // colDataSetNNULL(pResHighCol, rows, (pSupp->optRows - rows)); - // } - // } - - // if (rows == pSupp->optRows) { - // pResValCol->hasNull = false; - // } - pBlock->info.rows += rows; if (pJson != NULL) tjsonDelete(pJson); return 0; _OVER: - if (pJson != NULL) tjsonDelete(pJson); + tjsonDelete(pJson); if (code == 0) { code = TSDB_CODE_INVALID_JSON_FORMAT; } - qError("failed to perform forecast finalize since %s", tstrerror(code)); - return TSDB_CODE_INVALID_JSON_FORMAT; + + qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code)); + return code; } -static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock) { +static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SAnalBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analBuf; code = forecastCloseBuf(pSupp); QUERY_CHECK_CODE(code, lino, _end); @@ -355,10 +356,10 @@ static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBl code = forecastEnsureBlockCapacity(pResBlock, 1); QUERY_CHECK_CODE(code, lino, _end); - code = forecastAnalysis(pSupp, pResBlock); + code = forecastAnalysis(pSupp, pResBlock, pId); QUERY_CHECK_CODE(code, lino, _end); - uInfo("block:%d, forecast finalize", pSupp->numOfBlocks); + uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks); _end: pSupp->numOfBlocks = 0; @@ -373,9 +374,10 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SForecastOperatorInfo* pInfo = pOperator->info; SSDataBlock* pResBlock = pInfo->pRes; SForecastSupp* pSupp = &pInfo->forecastSupp; - SAnalBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analBuf; int64_t st = taosGetTimestampUs(); int32_t numOfBlocks = pSupp->numOfBlocks; + const char* pId = GET_TASKID(pOperator->pTaskInfo); blockDataCleanup(pResBlock); @@ -389,45 +391,46 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pSupp->groupId = pBlock->info.id.groupId; numOfBlocks++; pSupp->cachedRows += pBlock->info.rows; - qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks, + qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks, pBlock->info.rows, pSupp->cachedRows); code = forecastCacheBlock(pSupp, pBlock); QUERY_CHECK_CODE(code, lino, _end); } else { - qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); - code = forecastAggregateBlocks(pSupp, pResBlock); + qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks); + code = forecastAggregateBlocks(pSupp, pResBlock, pId); QUERY_CHECK_CODE(code, lino, _end); pSupp->groupId = pBlock->info.id.groupId; numOfBlocks = 1; pSupp->cachedRows = pBlock->info.rows; - qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, pBlock->info.rows, - pSupp->cachedRows); + qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, + pBlock->info.rows, pSupp->cachedRows); code = forecastCacheBlock(pSupp, pBlock); QUERY_CHECK_CODE(code, lino, _end); } if (pResBlock->info.rows > 0) { (*ppRes) = pResBlock; - qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pResBlock->info.id.groupId, numOfBlocks); + qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pResBlock->info.id.groupId, numOfBlocks); return code; } } if (numOfBlocks > 0) { - qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks); - code = forecastAggregateBlocks(pSupp, pResBlock); + qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, numOfBlocks); + code = forecastAggregateBlocks(pSupp, pResBlock, pId); QUERY_CHECK_CODE(code, lino, _end); } int64_t cost = taosGetTimestampUs() - st; - qDebug("all groups finished, cost:%" PRId64 "us", cost); + qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost); _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } + (*ppRes) = (pResBlock->info.rows == 0) ? NULL : pResBlock; return code; } @@ -498,7 +501,7 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) { pSupp->inputPrecision = pTsNode->node.resType.precision; pSupp->inputValSlot = pValNode->slotId; pSupp->inputValType = pValNode->node.resType.type; - tstrncpy(pSupp->algoOpt, "algo=arima", TSDB_ANAL_ALGO_OPTION_LEN); + tstrncpy(pSupp->algoOpt, "algo=arima", TSDB_ANALYTIC_ALGO_OPTION_LEN); } else { return TSDB_CODE_PLAN_INTERNAL_ERROR; } @@ -516,22 +519,22 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) { if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) { qError("failed to get forecast algorithm name from %s", pSupp->algoOpt); - return TSDB_CODE_ANAL_ALGO_NOT_FOUND; + return TSDB_CODE_ANA_ALGO_NOT_FOUND; } if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) { qError("failed to get forecast algorithm url from %s", pSupp->algoName); - return TSDB_CODE_ANAL_ALGO_NOT_LOAD; + return TSDB_CODE_ANA_ALGO_NOT_LOAD; } return 0; } static int32_t forecastCreateBuf(SForecastSupp* pSupp) { - SAnalBuf* pBuf = &pSupp->analBuf; + SAnalyticBuf* pBuf = &pSupp->analBuf; int64_t ts = 0; // taosGetTimestampMs(); - pBuf->bufType = ANAL_BUF_TYPE_JSON_COL; + pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL; snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts); int32_t code = tsosAnalBufOpen(pBuf, 2); if (code != 0) goto _OVER; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 245346273f..1a5e3444c0 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1377,7 +1377,7 @@ SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const STok CHECK_MAKE_NODE(pAnomaly->pCol); pAnomaly->pExpr = pExpr; if (pFuncOpt == NULL) { - tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANAL_ALGO_OPTION_LEN); + tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANALYTIC_ALGO_OPTION_LEN); } else { (void)trimString(pFuncOpt->z, pFuncOpt->n, pAnomaly->anomalyOpt, sizeof(pAnomaly->anomalyOpt)); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9bea3491c3..fcb6361a6b 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -9652,7 +9652,7 @@ static int32_t translateDropUser(STranslateContext* pCxt, SDropUserStmt* pStmt) static int32_t translateCreateAnode(STranslateContext* pCxt, SCreateAnodeStmt* pStmt) { SMCreateAnodeReq createReq = {0}; createReq.urlLen = strlen(pStmt->url) + 1; - if (createReq.urlLen > TSDB_ANAL_ANODE_URL_LEN) { + if (createReq.urlLen > TSDB_ANALYTIC_ANODE_URL_LEN) { return TSDB_CODE_MND_ANODE_TOO_LONG_URL; } diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index 99d91700a2..68bbbb7e99 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -34,7 +34,7 @@ typedef struct { } SCurlResp; static SAlgoMgmt tsAlgos = {0}; -static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen); +static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); const char *taosAnalAlgoStr(EAnalAlgoType type) { switch (type) { @@ -127,28 +127,44 @@ void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { - char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); + char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; + char *pStart = NULL; + char *pEnd = NULL; - char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANAL_ALGO_SPLIT); - if (pos1 != NULL) { - if (optMaxLen > 0) { - int32_t copyLen = optMaxLen; - if (pos2 != NULL) { - copyLen = (int32_t)(pos2 - pos1 - strlen(optName)); - copyLen = MIN(copyLen, optMaxLen); - } - tstrncpy(optValue, pos1 + bufLen, copyLen); - } - return true; - } else { + pStart = strstr(option, optName); + if (pStart == NULL) { return false; } + + pEnd = strstr(pStart, ANAL_ALGO_SPLIT); + if (optMaxLen > 0) { + if (pEnd > pStart) { + int32_t len = (int32_t)(pEnd - pStart); + len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN); + tstrncpy(buf, pStart, len); + } else { + int32_t len = MIN(tListLen(buf), strlen(pStart) + 1); + tstrncpy(buf, pStart, len); + } + + char *pRight = strstr(buf, "="); + if (pRight == NULL) { + return false; + } else { + pRight += 1; + } + + int32_t unused = strtrim(pRight); + + int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1); + tstrncpy(optValue, pRight, vLen); + } + + return true; } bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { - char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; + char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); char *pos1 = strstr(option, buf); @@ -163,7 +179,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { int32_t code = 0; - char name[TSDB_ANAL_ALGO_KEY_LEN] = {0}; + char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0}; int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); char *unused = strntolower(name, name, nameLen); @@ -175,7 +191,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); } else { url[0] = 0; - terrno = TSDB_CODE_ANAL_ALGO_NOT_FOUND; + terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND; code = terrno; uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); } @@ -276,16 +292,16 @@ _OVER: return code; } -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { int32_t code = -1; char *pCont = NULL; int64_t contentLen; SJson *pJson = NULL; SCurlResp curlRsp = {0}; - if (type == ANAL_HTTP_TYPE_GET) { + if (type == ANALYTICS_HTTP_TYPE_GET) { if (taosCurlGetRequest(url, &curlRsp) != 0) { - terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS; + terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; goto _OVER; } } else { @@ -295,20 +311,20 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBu goto _OVER; } if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { - terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS; + terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; goto _OVER; } } if (curlRsp.data == NULL || curlRsp.dataLen == 0) { - terrno = TSDB_CODE_ANAL_URL_RSP_IS_NULL; + terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL; goto _OVER; } pJson = tjsonParse(curlRsp.data); if (pJson == NULL) { if (curlRsp.data[0] == '<') { - terrno = TSDB_CODE_ANAL_ANODE_RETURN_ERROR; + terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR; } else { terrno = TSDB_CODE_INVALID_JSON_FORMAT; } @@ -360,7 +376,7 @@ _OVER: return code; } -static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { +static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { char buf[64] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { @@ -369,7 +385,7 @@ static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, i return 0; } -static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { +static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { char buf[128] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { @@ -378,7 +394,7 @@ static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, c return 0; } -static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { +static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { char buf[128] = {0}; int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { @@ -387,7 +403,7 @@ static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, return 0; } -static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t bufLen) { +static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) { if (bufLen <= 0) { bufLen = strlen(buf); } @@ -397,9 +413,9 @@ static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t return 0; } -static int32_t taosAnalJsonBufWriteStart(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } +static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } -static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { +static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); if (pBuf->filePtr == NULL) { return terrno; @@ -409,7 +425,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; pBuf->numOfCols = numOfCols; - if (pBuf->bufType == ANAL_BUF_TYPE_JSON) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { return taosAnalJsonBufWriteStart(pBuf); } @@ -426,7 +442,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return taosAnalJsonBufWriteStart(pBuf); } -static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { +static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { char buf[128] = {0}; bool first = (colIndex == 0); bool last = (colIndex == pBuf->numOfCols - 1); @@ -452,16 +468,16 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int return 0; } -static int32_t taosAnalJsonBufWriteDataBegin(SAnalBuf *pBuf) { +static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); } -static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { +static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { if (bufLen <= 0) { bufLen = strlen(buf); } - if (pBuf->bufType == ANAL_BUF_TYPE_JSON) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { return terrno; } @@ -474,11 +490,11 @@ static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, in return 0; } -static int32_t taosAnalJsonBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { +static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); } -static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { +static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { if (colIndex == pBuf->numOfCols - 1) { return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); @@ -487,7 +503,7 @@ static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { } } -static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { +static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { char buf[64]; int32_t bufLen = 0; @@ -541,12 +557,12 @@ static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); } -static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) { +static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { int32_t code = 0; char *pCont = NULL; int64_t contLen = 0; - if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) { SAnalyticsColBuf *pCol = &pBuf->pCols[i]; @@ -570,14 +586,14 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); } -static int32_t taosAnalJsonBufWriteEnd(SAnalBuf *pBuf) { +static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) { int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); if (code != 0) return code; return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); } -int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) { +int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) { int32_t code = taosAnalJsonBufWriteEnd(pBuf); if (code != 0) return code; @@ -588,7 +604,7 @@ int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) { if (code != 0) return code; } - if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) { SAnalyticsColBuf *pCol = &pBuf->pCols[i]; if (pCol->filePtr != NULL) { @@ -603,14 +619,14 @@ int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) { return 0; } -void taosAnalBufDestroy(SAnalBuf *pBuf) { +void taosAnalBufDestroy(SAnalyticBuf *pBuf) { if (pBuf->fileName[0] != 0) { if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); // taosRemoveFile(pBuf->fileName); pBuf->fileName[0] = 0; } - if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { for (int32_t i = 0; i < pBuf->numOfCols; ++i) { SAnalyticsColBuf *pCol = &pBuf->pCols[i]; if (pCol->fileName[0] != 0) { @@ -627,102 +643,102 @@ void taosAnalBufDestroy(SAnalBuf *pBuf) { pBuf->numOfCols = 0; } -int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return tsosAnalJsonBufOpen(pBuf, numOfCols); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteDataBegin(pBuf); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteColBegin(pBuf, colIndex); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteColEnd(pBuf, colIndex); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufWriteDataEnd(pBuf); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -int32_t taosAnalBufClose(SAnalBuf *pBuf) { - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { +int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufClose(pBuf); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } -static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen) { +static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) { *ppCont = NULL; *pContLen = 0; - if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); } else { - return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + return TSDB_CODE_ANA_BUF_INVALID_TYPE; } } @@ -730,7 +746,7 @@ static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContL int32_t taosAnalyticsInit() { return 0; } void taosAnalyticsCleanup() {} -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; } +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } @@ -738,18 +754,18 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optV int64_t taosAnalGetVersion() { return 0; } void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} -int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return 0; } -int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { return 0; } -int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { return 0; } -int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { return 0; } -int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } -int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { return 0; } -int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } -int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { return 0; } -int32_t taosAnalBufClose(SAnalBuf *pBuf) { return 0; } -void taosAnalBufDestroy(SAnalBuf *pBuf) {} +int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } +int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } +int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } +int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } +int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } +int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } +int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } +int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } +int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } +void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 00f72123dc..9e8a85d301 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -361,13 +361,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algori TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis service response is NULL") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis service can't access") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_RETURN_ERROR, "Analysis failed since anode return error") -TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_URL_RSP_IS_NULL, "Analysis service response is NULL") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_URL_CANT_ACCESS, "Analysis service can't access") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ALGO_NOT_FOUND, "Analysis algorithm is missing") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ALGO_NOT_LOAD, "Analysis algorithm not loaded") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_BUF_INVALID_TYPE, "Analysis invalid buffer type") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_RETURN_ERROR, "Analysis failed since anode return error") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode") +TAOS_DEFINE_ERROR(TSDB_CODE_ANA_WN_DATA, "white-noise data not processed") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")