Merge pull request #28873 from taosdata/fix/liaohj

fix(analytics): do some internal refactor and fix the error in parse options.
This commit is contained in:
Shengliang Guan 2024-11-22 09:06:13 +08:00 committed by GitHub
commit 36deebdde9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 322 additions and 275 deletions

View File

@ -39,14 +39,14 @@ typedef struct {
} SAnalyticsUrl;
typedef enum {
ANAL_BUF_TYPE_JSON = 0,
ANAL_BUF_TYPE_JSON_COL = 1,
ANAL_BUF_TYPE_OTHERS,
ANALYTICS_BUF_TYPE_JSON = 0,
ANALYTICS_BUF_TYPE_JSON_COL = 1,
ANALYTICS_BUF_TYPE_OTHERS,
} EAnalBufType;
typedef enum {
ANAL_HTTP_TYPE_GET = 0,
ANAL_HTTP_TYPE_POST,
ANALYTICS_HTTP_TYPE_GET = 0,
ANALYTICS_HTTP_TYPE_POST,
} EAnalHttpType;
typedef struct {
@ -61,11 +61,11 @@ typedef struct {
char fileName[TSDB_FILENAME_LEN];
int32_t numOfCols;
SAnalyticsColBuf *pCols;
} SAnalBuf;
} SAnalyticBuf;
int32_t taosAnalyticsInit();
void taosAnalyticsCleanup();
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf);
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf);
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);
@ -73,18 +73,18 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optV
int64_t taosAnalGetVersion();
void taosAnalUpdate(int64_t newVer, SHashObj *pHash);
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols);
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal);
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal);
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal);
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName);
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf);
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue);
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf);
int32_t taosAnalBufClose(SAnalBuf *pBuf);
void taosAnalBufDestroy(SAnalBuf *pBuf);
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols);
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal);
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal);
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal);
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName);
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf);
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue);
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf);
int32_t taosAnalBufClose(SAnalyticBuf *pBuf);
void taosAnalBufDestroy(SAnalyticBuf *pBuf);
const char *taosAnalAlgoStr(EAnalAlgoType algoType);
EAnalAlgoType taosAnalAlgoInt(const char *algoName);

View File

@ -322,7 +322,7 @@ typedef struct SAlterDnodeStmt {
typedef struct {
ENodeType type;
char url[TSDB_ANAL_ANODE_URL_LEN + 3];
char url[TSDB_ANALYTIC_ANODE_URL_LEN + 3];
} SCreateAnodeStmt;
typedef struct {

View File

@ -334,7 +334,7 @@ typedef struct SWindowLogicNode {
int64_t windowSliding;
SNodeList* pTsmaSubplans;
SNode* pAnomalyExpr;
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
} SWindowLogicNode;
typedef struct SFillLogicNode {
@ -740,7 +740,7 @@ typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode;
typedef struct SAnomalyWindowPhysiNode {
SWindowPhysiNode window;
SNode* pAnomalyKey;
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
} SAnomalyWindowPhysiNode;
typedef struct SSortPhysiNode {

View File

@ -351,7 +351,7 @@ typedef struct SAnomalyWindowNode {
ENodeType type; // QUERY_NODE_ANOMALY_WINDOW
SNode* pCol; // timestamp primary key
SNode* pExpr;
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
} SAnomalyWindowNode;
typedef enum EFillMode {

View File

@ -491,13 +491,14 @@ int32_t taosGetErrSize();
#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE TAOS_DEF_ERROR_CODE(0, 0x0438)
// analysis
#define TSDB_CODE_ANAL_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440)
#define TSDB_CODE_ANAL_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441)
#define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442)
#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443)
#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444)
#define TSDB_CODE_ANAL_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445)
#define TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446)
#define TSDB_CODE_ANA_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440)
#define TSDB_CODE_ANA_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441)
#define TSDB_CODE_ANA_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442)
#define TSDB_CODE_ANA_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443)
#define TSDB_CODE_ANA_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444)
#define TSDB_CODE_ANA_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445)
#define TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446)
#define TSDB_CODE_ANA_WN_DATA TAOS_DEF_ERROR_CODE(0, 0x0447)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -335,12 +335,13 @@ typedef enum ELogicConditionType {
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_LOG_VAR_LEN 32
#define TSDB_ANAL_ANODE_URL_LEN 128
#define TSDB_ANAL_ALGO_NAME_LEN 64
#define TSDB_ANAL_ALGO_TYPE_LEN 24
#define TSDB_ANAL_ALGO_KEY_LEN (TSDB_ANAL_ALGO_NAME_LEN + 9)
#define TSDB_ANAL_ALGO_URL_LEN (TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1)
#define TSDB_ANAL_ALGO_OPTION_LEN 256
#define TSDB_ANALYTIC_ANODE_URL_LEN 128
#define TSDB_ANALYTIC_ALGO_NAME_LEN 64
#define TSDB_ANALYTIC_ALGO_TYPE_LEN 24
#define TSDB_ANALYTIC_ALGO_KEY_LEN (TSDB_ANALYTIC_ALGO_NAME_LEN + 9)
#define TSDB_ANALYTIC_ALGO_URL_LEN (TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN + 1)
#define TSDB_ANALYTIC_ALGO_OPTION_LEN 256
#define TSDB_MAX_EP_NUM 10

View File

@ -402,7 +402,7 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
static const SSysDbTableSchema anodesSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "url", .bytes = TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "url", .bytes = TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
@ -410,8 +410,8 @@ static const SSysDbTableSchema anodesSchema[] = {
static const SSysDbTableSchema anodesFullSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "type", .bytes = TSDB_ANAL_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "algo", .bytes = TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "type", .bytes = TSDB_ANALYTIC_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "algo", .bytes = TSDB_ANALYTIC_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
static const SSysDbTableSchema tsmaSchema[] = {

View File

@ -2169,7 +2169,7 @@ int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAl
SAnalyticsUrl *pUrl = pIter;
size_t nameLen = 0;
const char *name = taosHashGetKey(pIter, &nameLen);
if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) {
if (nameLen > 0 && nameLen <= TSDB_ANALYTIC_ALGO_KEY_LEN && pUrl->urlLen > 0) {
numOfAlgos++;
}
pIter = taosHashIterate(pRsp->hash, pIter);
@ -2224,7 +2224,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal
int32_t numOfAlgos = 0;
int32_t nameLen;
int32_t type;
char name[TSDB_ANAL_ALGO_KEY_LEN];
char name[TSDB_ANALYTIC_ALGO_KEY_LEN];
SAnalyticsUrl url = {0};
TAOS_CHECK_EXIT(tStartDecode(&decoder));
@ -2233,7 +2233,7 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal
for (int32_t f = 0; f < numOfAlgos; ++f) {
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nameLen));
if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_NAME_LEN) {
if (nameLen > 0 && nameLen <= TSDB_ANALYTIC_ALGO_NAME_LEN) {
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, name));
}

View File

@ -309,7 +309,7 @@ static int32_t mndCreateAnode(SMnode *pMnode, SRpcMsg *pReq, SMCreateAnodeReq *p
anodeObj.updateTime = anodeObj.createdTime;
anodeObj.version = 0;
anodeObj.urlLen = pCreate->urlLen;
if (anodeObj.urlLen > TSDB_ANAL_ANODE_URL_LEN) {
if (anodeObj.urlLen > TSDB_ANALYTIC_ANODE_URL_LEN) {
code = TSDB_CODE_MND_ANODE_TOO_LONG_URL;
goto _OVER;
}
@ -491,23 +491,24 @@ static int32_t mndSetDropAnodeRedoLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pRedoRaw = mndAnodeActionEncode(pObj);
if (pRedoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
code = terrno;
return code;
}
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
TAOS_RETURN(code);
return code;
}
static int32_t mndSetDropAnodeCommitLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mndAnodeActionEncode(pObj);
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
code = terrno;
return code;
}
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
TAOS_RETURN(code);
@ -521,25 +522,25 @@ static int32_t mndSetDropAnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SAnode
}
static int32_t mndDropAnode(SMnode *pMnode, SRpcMsg *pReq, SAnodeObj *pObj) {
int32_t code = -1;
int32_t code = 0;
int32_t lino = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-anode");
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);
mndTransSetSerial(pTrans);
mInfo("trans:%d, to drop anode:%d", pTrans->id, pObj->id);
mInfo("trans:%d, used to drop anode:%d", pTrans->id, pObj->id);
TAOS_CHECK_GOTO(mndSetDropAnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = mndSetDropAnodeInfoToTrans(pMnode, pTrans, pObj, false);
mndReleaseAnode(pMnode, pObj);
code = 0;
TSDB_CHECK_CODE(code, lino, _OVER);
code = mndTransPrepare(pMnode, pTrans);
_OVER:
mndTransDrop(pTrans);
TAOS_RETURN(code);
return code;
}
static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq) {
@ -560,20 +561,20 @@ static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq) {
pObj = mndAcquireAnode(pMnode, dropReq.anodeId);
if (pObj == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
code = terrno;
goto _OVER;
}
code = mndDropAnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("anode:%d, failed to drop since %s", dropReq.anodeId, tstrerror(code));
}
mndReleaseAnode(pMnode, pObj);
tFreeSMDropAnodeReq(&dropReq);
TAOS_RETURN(code);
}
@ -584,7 +585,7 @@ static int32_t mndRetrieveAnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0;
int32_t cols = 0;
SAnodeObj *pObj = NULL;
char buf[TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE];
char buf[TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE];
char status[64];
int32_t code = 0;
@ -642,7 +643,7 @@ static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfRows = 0;
int32_t cols = 0;
SAnodeObj *pObj = NULL;
char buf[TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE];
char buf[TSDB_ANALYTIC_ALGO_NAME_LEN + VARSTR_HEADER_SIZE];
int32_t code = 0;
while (numOfRows < rows) {
@ -693,7 +694,7 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) {
int32_t code = 0;
int32_t protocol = 0;
double tmp = 0;
char buf[TSDB_ANAL_ALGO_NAME_LEN + 1] = {0};
char buf[TSDB_ANALYTIC_ALGO_NAME_LEN + 1] = {0};
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
@ -753,10 +754,10 @@ static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) {
}
static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) {
char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", url, "list");
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", url, "list");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
if (pJson == NULL) return terrno;
int32_t code = mndDecodeAlgoList(pJson, pObj);
@ -769,10 +770,10 @@ static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status, int32_t statusLe
int32_t code = 0;
int32_t protocol = 0;
double tmp = 0;
char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", pObj->url, "status");
char anodeUrl[TSDB_ANALYTIC_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANALYTIC_ANODE_URL_LEN, "%s/%s", pObj->url, "status");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL);
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANALYTICS_HTTP_TYPE_GET, NULL);
if (pJson == NULL) return terrno;
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);
@ -808,7 +809,7 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
SAnodeObj *pObj = NULL;
SAnalyticsUrl url;
int32_t nameLen;
char name[TSDB_ANAL_ALGO_KEY_LEN];
char name[TSDB_ANALYTIC_ALGO_KEY_LEN];
SRetrieveAnalAlgoReq req = {0};
SRetrieveAnalAlgoRsp rsp = {0};
@ -847,13 +848,13 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
goto _OVER;
}
}
url.url = taosMemoryMalloc(TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1);
url.url = taosMemoryMalloc(TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN + 1);
if (url.url == NULL) {
sdbRelease(pSdb, pAnode);
goto _OVER;
}
url.urlLen = 1 + tsnprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url,
url.urlLen = 1 + tsnprintf(url.url, TSDB_ANALYTIC_ANODE_URL_LEN + TSDB_ANALYTIC_ALGO_TYPE_LEN, "%s/%s", pAnode->url,
taosAnalAlgoUrlStr(url.type));
if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalyticsUrl)) != 0) {
taosMemoryFree(url.url);

View File

@ -44,9 +44,9 @@ typedef struct {
SExprSupp scalarSup;
int32_t tsSlotId;
STimeWindowAggSupp twAggSup;
char algoName[TSDB_ANAL_ALGO_NAME_LEN];
char algoUrl[TSDB_ANAL_ALGO_URL_LEN];
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
char anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
SAnomalyWindowSupp anomalySup;
SWindowRowsSup anomalyWinRowSup;
SColumn anomalyCol;
@ -75,13 +75,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt);
code = TSDB_CODE_ANAL_ALGO_NOT_FOUND;
code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
goto _error;
}
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName);
code = TSDB_CODE_ANAL_ALGO_NOT_LOAD;
code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
goto _error;
}
@ -262,7 +262,7 @@ static void anomalyDestroyOperatorInfo(void* param) {
static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) {
if (pInfo->anomalySup.cachedRows > ANAL_ANOMALY_WINDOW_MAX_ROWS) {
return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS;
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
}
SSDataBlock* pDst = NULL;
@ -287,7 +287,7 @@ static int32_t anomalyFindWindow(SAnomalyWindowSupp* pSupp, TSKEY key) {
return -1;
}
static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) {
static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows, const char* pId) {
int32_t code = 0;
int32_t rows = 0;
STimeWindow win = {0};
@ -295,8 +295,23 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) {
taosArrayClear(pWindows);
tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
if (rows <= 0) return 0;
if (code < 0) {
return TSDB_CODE_INVALID_JSON_FORMAT;
}
if (rows < 0) {
char pMsg[1024] = {0};
code = tjsonGetStringValue(pJson, "msg", pMsg);
if (code) {
qError("%s failed to get error msg from rsp, unknown error", pId);
} else {
qError("%s failed to exec forecast, msg:%s", pId, pMsg);
}
return TSDB_CODE_ANA_WN_DATA;
} else if (rows == 0) {
return TSDB_CODE_SUCCESS;
}
SJson* res = tjsonGetObjectItem(pJson, "res");
if (res == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
@ -313,7 +328,10 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) {
SJson* start = tjsonGetArrayItem(row, 0);
SJson* end = tjsonGetArrayItem(row, 1);
if (start == NULL || end == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
if (start == NULL || end == NULL) {
qError("%s invalid res from analytic sys, code:%s", pId, tstrerror(TSDB_CODE_INVALID_JSON_FORMAT));
return TSDB_CODE_INVALID_JSON_FORMAT;
}
tjsonGetObjectValueBigInt(start, &win.skey);
tjsonGetObjectValueBigInt(end, &win.ekey);
@ -322,52 +340,57 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) {
win.ekey = win.skey + 1;
}
if (taosArrayPush(pWindows, &win) == NULL) return TSDB_CODE_OUT_OF_BUFFER;
if (taosArrayPush(pWindows, &win) == NULL) {
qError("%s out of memory in generating anomaly_window", pId);
return TSDB_CODE_OUT_OF_BUFFER;
}
}
int32_t numOfWins = taosArrayGetSize(pWindows);
qDebug("anomaly window recevied, total:%d", numOfWins);
qDebug("%s anomaly window recevied, total:%d", pId, numOfWins);
for (int32_t i = 0; i < numOfWins; ++i) {
STimeWindow* pWindow = taosArrayGet(pWindows, i);
qDebug("anomaly win:%d [%" PRId64 ", %" PRId64 ")", i, pWindow->skey, pWindow->ekey);
qDebug("%s anomaly win:%d [%" PRId64 ", %" PRId64 ")", pId, i, pWindow->skey, pWindow->ekey);
}
return 0;
return code;
}
static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
SAnomalyWindowSupp* pSupp = &pInfo->anomalySup;
SJson* pJson = NULL;
SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON};
SAnalyticBuf analBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON};
char dataBuf[64] = {0};
int32_t code = 0;
int64_t ts = 0;
int32_t lino = 0;
const char* pId = GET_TASKID(pOperator->pTaskInfo);
// int64_t ts = taosGetTimestampMs();
snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts,
pSupp->groupId);
code = tsosAnalBufOpen(&analBuf, 2);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteColMeta(&analBuf, 1, pInfo->anomalyCol.type, "val");
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteDataBegin(&analBuf);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks);
// timestamp
code = taosAnalBufWriteColBegin(&analBuf, 0);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i);
if (pBlock == NULL) break;
@ -375,15 +398,17 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
if (pTsCol == NULL) break;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
code = taosAnalBufWriteColData(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
}
}
code = taosAnalBufWriteColEnd(&analBuf, 0);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
// data
code = taosAnalBufWriteColBegin(&analBuf, 1);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i);
if (pBlock == NULL) break;
@ -392,48 +417,47 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
code = taosAnalBufWriteColData(&analBuf, 1, pValCol->info.type, colDataGetData(pValCol, j));
if (code != 0) goto _OVER;
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
}
}
code = taosAnalBufWriteColEnd(&analBuf, 1);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteDataEnd(&analBuf);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufWriteOptStr(&analBuf, "prec", prec);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
int64_t wncheck = ANAL_FORECAST_DEFAULT_WNCHECK;
bool hasWncheck = taosAnalGetOptInt(pInfo->anomalyOpt, "wncheck", &wncheck);
if (!hasWncheck) {
qDebug("anomaly_window wncheck not found from %s, use default:%" PRId64, pInfo->anomalyOpt, wncheck);
}
code = taosAnalBufWriteOptInt(&analBuf, "wncheck", wncheck);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
code = taosAnalBufClose(&analBuf);
if (code != 0) goto _OVER;
QUERY_CHECK_CODE(code, lino, _OVER);
pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANAL_HTTP_TYPE_POST, &analBuf);
pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analBuf);
if (pJson == NULL) {
code = terrno;
goto _OVER;
}
code = anomalyParseJson(pJson, pSupp->windows);
if (code != 0) goto _OVER;
code = anomalyParseJson(pJson, pSupp->windows, pId);
_OVER:
if (code != 0) {
qError("failed to analysis window since %s", tstrerror(code));
qError("%s failed to analysis window since %s, lino:%d", pId, tstrerror(code), lino);
}
taosAnalBufDestroy(&analBuf);

View File

@ -29,9 +29,9 @@
#ifdef USE_ANALYTICS
typedef struct {
char algoName[TSDB_ANAL_ALGO_NAME_LEN];
char algoUrl[TSDB_ANAL_ALGO_URL_LEN];
char algoOpt[TSDB_ANAL_ALGO_OPTION_LEN];
char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
char algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];
int64_t maxTs;
int64_t minTs;
int64_t numOfRows;
@ -47,7 +47,7 @@ typedef struct {
int16_t inputValSlot;
int8_t inputValType;
int8_t inputPrecision;
SAnalBuf analBuf;
SAnalyticBuf analBuf;
} SForecastSupp;
typedef struct SForecastOperatorInfo {
@ -74,12 +74,12 @@ static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
if (pSupp->cachedRows > ANAL_FORECAST_MAX_ROWS) {
return TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS;
return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analBuf;
qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows);
pSupp->numOfBlocks++;
@ -108,7 +108,7 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
}
static int32_t forecastCloseBuf(SForecastSupp* pSupp) {
SAnalBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analBuf;
int32_t code = 0;
for (int32_t i = 0; i < 2; ++i) {
@ -180,8 +180,8 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) {
return code;
}
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) {
SAnalBuf* pBuf = &pSupp->analBuf;
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
SAnalyticBuf* pBuf = &pSupp->analBuf;
int32_t resCurRow = pBlock->info.rows;
int8_t tmpI8;
int16_t tmpI16;
@ -192,28 +192,45 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) {
int32_t code = 0;
SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
if (NULL == pResValCol) return TSDB_CODE_OUT_OF_RANGE;
if (NULL == pResValCol) {
return terrno;
}
SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
SColumnInfoData* pResHighCol =
(pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANAL_HTTP_TYPE_POST, pBuf);
if (pJson == NULL) return terrno;
SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf);
if (pJson == NULL) {
return terrno;
}
int32_t rows = 0;
tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
if (code < 0) goto _OVER;
if (rows <= 0) goto _OVER;
if (rows < 0 && code == 0) {
char pMsg[1024] = {0};
code = tjsonGetStringValue(pJson, "msg", pMsg);
if (code != 0) {
qError("%s failed to get msg from rsp, unknown error", pId);
} else {
qError("%s failed to exec forecast, msg:%s", pId, pMsg);
}
tjsonDelete(pJson);
return TSDB_CODE_ANA_WN_DATA;
}
if (code < 0) {
goto _OVER;
}
SJson* res = tjsonGetObjectItem(pJson, "res");
if (res == NULL) goto _OVER;
int32_t ressize = tjsonGetArraySize(res);
bool returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);
if (returnConf) {
if (ressize != 4) goto _OVER;
} else if (ressize != 2) {
if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
goto _OVER;
}
@ -313,41 +330,25 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) {
resCurRow++;
}
// for (int32_t i = rows; i < pSupp->optRows; ++i) {
// colDataSetNNULL(pResValCol, rows, (pSupp->optRows - rows));
// if (pResTsCol != NULL) {
// colDataSetNNULL(pResTsCol, rows, (pSupp->optRows - rows));
// }
// if (pResLowCol != NULL) {
// colDataSetNNULL(pResLowCol, rows, (pSupp->optRows - rows));
// }
// if (pResHighCol != NULL) {
// colDataSetNNULL(pResHighCol, rows, (pSupp->optRows - rows));
// }
// }
// if (rows == pSupp->optRows) {
// pResValCol->hasNull = false;
// }
pBlock->info.rows += rows;
if (pJson != NULL) tjsonDelete(pJson);
return 0;
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
tjsonDelete(pJson);
if (code == 0) {
code = TSDB_CODE_INVALID_JSON_FORMAT;
}
qError("failed to perform forecast finalize since %s", tstrerror(code));
return TSDB_CODE_INVALID_JSON_FORMAT;
qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
return code;
}
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock) {
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnalBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analBuf;
code = forecastCloseBuf(pSupp);
QUERY_CHECK_CODE(code, lino, _end);
@ -355,10 +356,10 @@ static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBl
code = forecastEnsureBlockCapacity(pResBlock, 1);
QUERY_CHECK_CODE(code, lino, _end);
code = forecastAnalysis(pSupp, pResBlock);
code = forecastAnalysis(pSupp, pResBlock, pId);
QUERY_CHECK_CODE(code, lino, _end);
uInfo("block:%d, forecast finalize", pSupp->numOfBlocks);
uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);
_end:
pSupp->numOfBlocks = 0;
@ -373,9 +374,10 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SForecastOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pResBlock = pInfo->pRes;
SForecastSupp* pSupp = &pInfo->forecastSupp;
SAnalBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analBuf;
int64_t st = taosGetTimestampUs();
int32_t numOfBlocks = pSupp->numOfBlocks;
const char* pId = GET_TASKID(pOperator->pTaskInfo);
blockDataCleanup(pResBlock);
@ -389,45 +391,46 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks++;
pSupp->cachedRows += pBlock->info.rows;
qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks,
qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks,
pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
code = forecastAggregateBlocks(pSupp, pResBlock);
qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks);
code = forecastAggregateBlocks(pSupp, pResBlock, pId);
QUERY_CHECK_CODE(code, lino, _end);
pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks = 1;
pSupp->cachedRows = pBlock->info.rows;
qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, pBlock->info.rows,
pSupp->cachedRows);
qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
pBlock->info.rows, pSupp->cachedRows);
code = forecastCacheBlock(pSupp, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pResBlock->info.rows > 0) {
(*ppRes) = pResBlock;
qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pResBlock->info.id.groupId, numOfBlocks);
qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pResBlock->info.id.groupId, numOfBlocks);
return code;
}
}
if (numOfBlocks > 0) {
qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks);
code = forecastAggregateBlocks(pSupp, pResBlock);
qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, numOfBlocks);
code = forecastAggregateBlocks(pSupp, pResBlock, pId);
QUERY_CHECK_CODE(code, lino, _end);
}
int64_t cost = taosGetTimestampUs() - st;
qDebug("all groups finished, cost:%" PRId64 "us", cost);
qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (pResBlock->info.rows == 0) ? NULL : pResBlock;
return code;
}
@ -498,7 +501,7 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) {
pSupp->inputPrecision = pTsNode->node.resType.precision;
pSupp->inputValSlot = pValNode->slotId;
pSupp->inputValType = pValNode->node.resType.type;
tstrncpy(pSupp->algoOpt, "algo=arima", TSDB_ANAL_ALGO_OPTION_LEN);
tstrncpy(pSupp->algoOpt, "algo=arima", TSDB_ANALYTIC_ALGO_OPTION_LEN);
} else {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
@ -516,22 +519,22 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) {
if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) {
qError("failed to get forecast algorithm name from %s", pSupp->algoOpt);
return TSDB_CODE_ANAL_ALGO_NOT_FOUND;
return TSDB_CODE_ANA_ALGO_NOT_FOUND;
}
if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) {
qError("failed to get forecast algorithm url from %s", pSupp->algoName);
return TSDB_CODE_ANAL_ALGO_NOT_LOAD;
return TSDB_CODE_ANA_ALGO_NOT_LOAD;
}
return 0;
}
static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
SAnalBuf* pBuf = &pSupp->analBuf;
SAnalyticBuf* pBuf = &pSupp->analBuf;
int64_t ts = 0; // taosGetTimestampMs();
pBuf->bufType = ANAL_BUF_TYPE_JSON_COL;
pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts);
int32_t code = tsosAnalBufOpen(pBuf, 2);
if (code != 0) goto _OVER;

View File

@ -1377,7 +1377,7 @@ SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const STok
CHECK_MAKE_NODE(pAnomaly->pCol);
pAnomaly->pExpr = pExpr;
if (pFuncOpt == NULL) {
tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANAL_ALGO_OPTION_LEN);
tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANALYTIC_ALGO_OPTION_LEN);
} else {
(void)trimString(pFuncOpt->z, pFuncOpt->n, pAnomaly->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
}

View File

@ -9652,7 +9652,7 @@ static int32_t translateDropUser(STranslateContext* pCxt, SDropUserStmt* pStmt)
static int32_t translateCreateAnode(STranslateContext* pCxt, SCreateAnodeStmt* pStmt) {
SMCreateAnodeReq createReq = {0};
createReq.urlLen = strlen(pStmt->url) + 1;
if (createReq.urlLen > TSDB_ANAL_ANODE_URL_LEN) {
if (createReq.urlLen > TSDB_ANALYTIC_ANODE_URL_LEN) {
return TSDB_CODE_MND_ANODE_TOO_LONG_URL;
}

View File

@ -34,7 +34,7 @@ typedef struct {
} SCurlResp;
static SAlgoMgmt tsAlgos = {0};
static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen);
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
const char *taosAnalAlgoStr(EAnalAlgoType type) {
switch (type) {
@ -127,28 +127,44 @@ void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {
}
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
char *pStart = NULL;
char *pEnd = NULL;
char *pos1 = strstr(option, buf);
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
if (pos1 != NULL) {
if (optMaxLen > 0) {
int32_t copyLen = optMaxLen;
if (pos2 != NULL) {
copyLen = (int32_t)(pos2 - pos1 - strlen(optName));
copyLen = MIN(copyLen, optMaxLen);
}
tstrncpy(optValue, pos1 + bufLen, copyLen);
}
return true;
} else {
pStart = strstr(option, optName);
if (pStart == NULL) {
return false;
}
pEnd = strstr(pStart, ANAL_ALGO_SPLIT);
if (optMaxLen > 0) {
if (pEnd > pStart) {
int32_t len = (int32_t)(pEnd - pStart);
len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN);
tstrncpy(buf, pStart, len);
} else {
int32_t len = MIN(tListLen(buf), strlen(pStart) + 1);
tstrncpy(buf, pStart, len);
}
char *pRight = strstr(buf, "=");
if (pRight == NULL) {
return false;
} else {
pRight += 1;
}
int32_t unused = strtrim(pRight);
int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1);
tstrncpy(optValue, pRight, vLen);
}
return true;
}
bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) {
char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0};
char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
char *pos1 = strstr(option, buf);
@ -163,7 +179,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
int32_t code = 0;
char name[TSDB_ANAL_ALGO_KEY_LEN] = {0};
char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);
char *unused = strntolower(name, name, nameLen);
@ -175,7 +191,7 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
} else {
url[0] = 0;
terrno = TSDB_CODE_ANAL_ALGO_NOT_FOUND;
terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
code = terrno;
uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
}
@ -276,16 +292,16 @@ _OVER:
return code;
}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) {
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) {
int32_t code = -1;
char *pCont = NULL;
int64_t contentLen;
SJson *pJson = NULL;
SCurlResp curlRsp = {0};
if (type == ANAL_HTTP_TYPE_GET) {
if (type == ANALYTICS_HTTP_TYPE_GET) {
if (taosCurlGetRequest(url, &curlRsp) != 0) {
terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS;
terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
goto _OVER;
}
} else {
@ -295,20 +311,20 @@ SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBu
goto _OVER;
}
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) {
terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS;
terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
goto _OVER;
}
}
if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
terrno = TSDB_CODE_ANAL_URL_RSP_IS_NULL;
terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL;
goto _OVER;
}
pJson = tjsonParse(curlRsp.data);
if (pJson == NULL) {
if (curlRsp.data[0] == '<') {
terrno = TSDB_CODE_ANAL_ANODE_RETURN_ERROR;
terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
} else {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
}
@ -360,7 +376,7 @@ _OVER:
return code;
}
static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) {
static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
char buf[64] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
@ -369,7 +385,7 @@ static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, i
return 0;
}
static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) {
static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
char buf[128] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
@ -378,7 +394,7 @@ static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, c
return 0;
}
static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) {
static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
char buf[128] = {0};
int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
@ -387,7 +403,7 @@ static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName,
return 0;
}
static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t bufLen) {
static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
if (bufLen <= 0) {
bufLen = strlen(buf);
}
@ -397,9 +413,9 @@ static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t
return 0;
}
static int32_t taosAnalJsonBufWriteStart(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); }
static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); }
static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pBuf->filePtr == NULL) {
return terrno;
@ -409,7 +425,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY;
pBuf->numOfCols = numOfCols;
if (pBuf->bufType == ANAL_BUF_TYPE_JSON) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) {
return taosAnalJsonBufWriteStart(pBuf);
}
@ -426,7 +442,7 @@ static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
return taosAnalJsonBufWriteStart(pBuf);
}
static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
char buf[128] = {0};
bool first = (colIndex == 0);
bool last = (colIndex == pBuf->numOfCols - 1);
@ -452,16 +468,16 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int
return 0;
}
static int32_t taosAnalJsonBufWriteDataBegin(SAnalBuf *pBuf) {
static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
}
static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
if (bufLen <= 0) {
bufLen = strlen(buf);
}
if (pBuf->bufType == ANAL_BUF_TYPE_JSON) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) {
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
@ -474,11 +490,11 @@ static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, in
return 0;
}
static int32_t taosAnalJsonBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) {
static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
}
static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
if (colIndex == pBuf->numOfCols - 1) {
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex);
@ -487,7 +503,7 @@ static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
}
}
static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
char buf[64];
int32_t bufLen = 0;
@ -541,12 +557,12 @@ static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int
return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
}
static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) {
static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
int32_t code = 0;
char *pCont = NULL;
int64_t contLen = 0;
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalyticsColBuf *pCol = &pBuf->pCols[i];
@ -570,14 +586,14 @@ static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) {
return taosAnalJsonBufWriteStr(pBuf, "],\n", 0);
}
static int32_t taosAnalJsonBufWriteEnd(SAnalBuf *pBuf) {
static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) {
int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
if (code != 0) return code;
return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
}
int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) {
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
int32_t code = taosAnalJsonBufWriteEnd(pBuf);
if (code != 0) return code;
@ -588,7 +604,7 @@ int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) {
if (code != 0) return code;
}
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalyticsColBuf *pCol = &pBuf->pCols[i];
if (pCol->filePtr != NULL) {
@ -603,14 +619,14 @@ int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) {
return 0;
}
void taosAnalBufDestroy(SAnalBuf *pBuf) {
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {
if (pBuf->fileName[0] != 0) {
if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
// taosRemoveFile(pBuf->fileName);
pBuf->fileName[0] = 0;
}
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalyticsColBuf *pCol = &pBuf->pCols[i];
if (pCol->fileName[0] != 0) {
@ -627,102 +643,102 @@ void taosAnalBufDestroy(SAnalBuf *pBuf) {
pBuf->numOfCols = 0;
}
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return tsosAnalJsonBufOpen(pBuf, numOfCols);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteDataBegin(pBuf);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColBegin(pBuf, colIndex);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColEnd(pBuf, colIndex);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteDataEnd(pBuf);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufClose(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufClose(pBuf);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen) {
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
*ppCont = NULL;
*pContLen = 0;
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
return TSDB_CODE_ANA_BUF_INVALID_TYPE;
}
}
@ -730,7 +746,7 @@ static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContL
int32_t taosAnalyticsInit() { return 0; }
void taosAnalyticsCleanup() {}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; }
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; }
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
@ -738,18 +754,18 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optV
int64_t taosAnalGetVersion() { return 0; }
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {}
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return 0; }
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { return 0; }
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { return 0; }
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; }
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { return 0; }
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { return 0; }
int32_t taosAnalBufClose(SAnalBuf *pBuf) { return 0; }
void taosAnalBufDestroy(SAnalBuf *pBuf) {}
int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; }
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; }
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; }
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {}
const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; }
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }

View File

@ -361,13 +361,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algori
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis service response is NULL")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis service can't access")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_RETURN_ERROR, "Analysis failed since anode return error")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_URL_RSP_IS_NULL, "Analysis service response is NULL")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_URL_CANT_ACCESS, "Analysis service can't access")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ALGO_NOT_FOUND, "Analysis algorithm is missing")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ALGO_NOT_LOAD, "Analysis algorithm not loaded")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_BUF_INVALID_TYPE, "Analysis invalid buffer type")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_RETURN_ERROR, "Analysis failed since anode return error")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_WN_DATA, "white-noise data not processed")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")