diff --git a/cmake/cmake.options b/cmake/cmake.options index 2158157780..e3b5782d85 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -144,6 +144,12 @@ option( OFF ) +option( + BUILD_WITH_ANALYSIS + "If build with analysis" + ON +) + ENDIF () IF(NOT TD_ENTERPRISE) @@ -151,8 +157,15 @@ MESSAGE("switch s3 off with community version") set(BUILD_S3 OFF) set(BUILD_WITH_S3 OFF) set(BUILD_WITH_COS OFF) +set(BUILD_WITH_ANALYSIS OFF) ENDIF () +IF(${BUILD_WITH_ANALYSIS}) + message("build with analysis") + set(BUILD_S3 ON) + set(BUILD_WITH_S3 ON) +ENDIF() + IF(${BUILD_S3}) IF(${BUILD_WITH_S3}) diff --git a/include/common/systable.h b/include/common/systable.h index 65b3b36af8..0acafbfc30 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -29,6 +29,8 @@ extern "C" { #define TSDB_INS_TABLE_QNODES "ins_qnodes" #define TSDB_INS_TABLE_BNODES "ins_bnodes" // no longer used #define TSDB_INS_TABLE_SNODES "ins_snodes" +#define TSDB_INS_TABLE_ANODES "ins_anodes" +#define TSDB_INS_TABLE_ANODES_FULL "ins_anodes_full" #define TSDB_INS_TABLE_ARBGROUPS "ins_arbgroups" #define TSDB_INS_TABLE_CLUSTER "ins_cluster" #define TSDB_INS_TABLE_DATABASES "ins_databases" diff --git a/include/common/tanal.h b/include/common/tanal.h new file mode 100644 index 0000000000..59a28ddbe3 --- /dev/null +++ b/include/common/tanal.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_ANAL_H_ +#define _TD_UTIL_ANAL_H_ + +#include "os.h" +#include "tdef.h" +#include "thash.h" +#include "tjson.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define ANAL_FORECAST_DEFAULT_PERIOD 10 +#define ANAL_FORECAST_DEFAULT_ROWS 10 +#define ANAL_FORECAST_DEFAULT_CONF 95 +#define ANAL_FORECAST_DEFAULT_ALPHA 0.05 +#define ANAL_FORECAST_DEFAULT_PARAM "diff" + +typedef struct { + EAnalAlgoType type; + int32_t anode; + int32_t urlLen; + char *url; +} SAnalUrl; + +typedef enum { + ANAL_BUF_TYPE_JSON = 0, + ANAL_BUF_TYPE_JSON_COL = 1, + ANAL_BUF_TYPE_OTHERS, +} EAnalBufType; + +typedef enum { + ANAL_HTTP_TYPE_GET = 0, + ANAL_HTTP_TYPE_POST, +} EAnalHttpType; + +typedef struct { + TdFilePtr filePtr; + char fileName[TSDB_FILENAME_LEN + 10]; + int64_t numOfRows; +} SAnalColBuf; + +typedef struct { + EAnalBufType bufType; + TdFilePtr filePtr; + char fileName[TSDB_FILENAME_LEN]; + int32_t numOfCols; + SAnalColBuf *pCols; +} SAnalBuf; + +int32_t taosAnalInit(); +void taosAnalCleanup(); +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf); + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); +bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue); +int64_t taosAnalGetVersion(); +void taosAnalUpdate(int64_t newVer, SHashObj *pHash); + +int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols); +int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal); +int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal); +int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal); +int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName); +int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf); +int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex); +int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue); +int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex); +int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf); +int32_t taosAnalBufClose(SAnalBuf *pBuf); +void taosAnalBufDestroy(SAnalBuf *pBuf); + +const char *taosAnalAlgoStr(EAnalAlgoType algoType); +EAnalAlgoType taosAnalAlgoInt(const char *algoName); +const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType); + +#ifdef __cplusplus +} +#endif +#endif /*_TD_UTIL_ANAL_H_*/ \ No newline at end of file diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a10f02c96..ae9df866c5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -159,6 +159,8 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_ARBGROUP, TSDB_MGMT_TABLE_ENCRYPTIONS, TSDB_MGMT_TABLE_USER_FULL, + TSDB_MGMT_TABLE_ANODE, + TSDB_MGMT_TABLE_ANODE_FULL, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -260,6 +262,7 @@ typedef enum ENodeType { QUERY_NODE_COUNT_WINDOW, QUERY_NODE_COLUMN_OPTIONS, QUERY_NODE_TSMA_OPTIONS, + QUERY_NODE_ANOMALY_WINDOW, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, @@ -345,6 +348,9 @@ typedef enum ENodeType { QUERY_NODE_CREATE_VIEW_STMT, QUERY_NODE_DROP_VIEW_STMT, QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE, + QUERY_NODE_CREATE_ANODE_STMT, + QUERY_NODE_DROP_ANODE_STMT, + QUERY_NODE_UPDATE_ANODE_STMT, // show statement nodes // see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET' @@ -386,6 +392,8 @@ typedef enum ENodeType { QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT, QUERY_NODE_SHOW_ENCRYPTIONS_STMT, QUERY_NODE_SHOW_TSMAS_STMT, + QUERY_NODE_SHOW_ANODES_STMT, + QUERY_NODE_SHOW_ANODES_FULL_STMT, QUERY_NODE_CREATE_TSMA_STMT, QUERY_NODE_SHOW_CREATE_TSMA_STMT, QUERY_NODE_DROP_TSMA_STMT, @@ -408,6 +416,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN, QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, + QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, // physical plan node QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, @@ -458,6 +467,9 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY, + QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY, + QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, } ENodeType; typedef struct { @@ -1092,6 +1104,22 @@ typedef struct { int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); +typedef struct { + int32_t dnodeId; + int64_t analVer; +} SRetrieveAnalAlgoReq; + +typedef struct { + int64_t ver; + SHashObj* hash; // algoname:algotype -> SAnalUrl +} SRetrieveAnalAlgoRsp; + +int32_t tSerializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq); +int32_t tDeserializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq); +int32_t tSerializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp); +int32_t tDeserializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp); +void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp* pRsp); + typedef struct { int8_t alterType; int8_t superUser; @@ -1766,6 +1794,7 @@ typedef struct { SArray* pVloads; // array of SVnodeLoad int32_t statusSeq; int64_t ipWhiteVer; + int64_t analVer; } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); @@ -1831,6 +1860,7 @@ typedef struct { SArray* pDnodeEps; // Array of SDnodeEp int32_t statusSeq; int64_t ipWhiteVer; + int64_t analVer; } SStatusRsp; int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); @@ -2377,6 +2407,30 @@ typedef struct { int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); +typedef struct { + int32_t urlLen; + int32_t sqlLen; + char* url; + char* sql; +} SMCreateAnodeReq; + +int32_t tSerializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq); +int32_t tDeserializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq); +void tFreeSMCreateAnodeReq(SMCreateAnodeReq* pReq); + +typedef struct { + int32_t anodeId; + int32_t sqlLen; + char* sql; +} SMDropAnodeReq, SMUpdateAnodeReq; + +int32_t tSerializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq); +int32_t tDeserializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq); +void tFreeSMDropAnodeReq(SMDropAnodeReq* pReq); +int32_t tSerializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq); +int32_t tDeserializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq); +void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq* pReq); + typedef struct { int32_t vgId; int32_t hbSeq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 40464dc29a..6540e7b135 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -125,6 +125,11 @@ TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL) + // mnode msg overload + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ANODE, "create-anode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_ANODE, "update-anode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_ANODE, "drop-anode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_ANAL_ALGO, "retrieve-anal-algo", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_DND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8 diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 519207377b..e5bacf85b2 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -62,6 +62,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_UNIQUE, FUNCTION_TYPE_STATE_COUNT, FUNCTION_TYPE_STATE_DURATION, + FUNCTION_TYPE_FORECAST, // math function FUNCTION_TYPE_ABS = 1000, @@ -149,6 +150,9 @@ typedef enum EFunctionType { FUNCTION_TYPE_TBUID, FUNCTION_TYPE_VGID, FUNCTION_TYPE_VGVER, + FUNCTION_TYPE_FORECAST_LOW, + FUNCTION_TYPE_FORECAST_HIGH, + FUNCTION_TYPE_FORECAST_ROWTS, // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, @@ -263,6 +267,7 @@ bool fmIsForbidSysTableFunc(int32_t funcId); bool fmIsIntervalInterpoFunc(int32_t funcId); bool fmIsInterpFunc(int32_t funcId); bool fmIsLastRowFunc(int32_t funcId); +bool fmIsForecastFunc(int32_t funcId); bool fmIsNotNullOutputFunc(int32_t funcId); bool fmIsSelectValueFunc(int32_t funcId); bool fmIsSystemInfoFunc(int32_t funcId); @@ -272,6 +277,7 @@ bool fmIsMultiRowsFunc(int32_t funcId); bool fmIsKeepOrderFunc(int32_t funcId); bool fmIsCumulativeFunc(int32_t funcId); bool fmIsInterpPseudoColumnFunc(int32_t funcId); +bool fmIsForecastPseudoColumnFunc(int32_t funcId); bool fmIsGroupKeyFunc(int32_t funcId); bool fmIsBlockDistFunc(int32_t funcId); bool fmIsIgnoreNullFunc(int32_t funcId); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 2a18e800b8..ba1e21b897 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -318,6 +318,21 @@ typedef struct SAlterDnodeStmt { char value[TSDB_DNODE_VALUE_LEN]; } SAlterDnodeStmt; +typedef struct { + ENodeType type; + char url[TSDB_ANAL_ANODE_URL_LEN]; +} SCreateAnodeStmt; + +typedef struct { + ENodeType type; + int32_t anodeId; +} SDropAnodeStmt; + +typedef struct { + ENodeType type; + int32_t anodeId; +} SUpdateAnodeStmt; + typedef struct SShowStmt { ENodeType type; SNode* pDbName; // SValueNode diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index a691433ee6..8e4a3ea32b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -204,6 +204,11 @@ typedef struct SInterpFuncLogicNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; +typedef struct SForecastFuncLogicNode { + SLogicNode node; + SNodeList* pFuncs; +} SForecastFuncLogicNode; + typedef struct SGroupCacheLogicNode { SLogicNode node; bool grpColsMayBeNull; @@ -274,7 +279,8 @@ typedef enum EWindowType { WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE, WINDOW_TYPE_EVENT, - WINDOW_TYPE_COUNT + WINDOW_TYPE_COUNT, + WINDOW_TYPE_ANOMALY } EWindowType; typedef enum EWindowAlgorithm { @@ -315,6 +321,8 @@ typedef struct SWindowLogicNode { int64_t windowCount; int64_t windowSliding; SNodeList* pTsmaSubplans; + SNode* pAnomalyExpr; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -507,6 +515,12 @@ typedef struct SInterpFuncPhysiNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; +typedef struct SForecastFuncPhysiNode { + SPhysiNode node; + SNodeList* pExprs; + SNodeList* pFuncs; +} SForecastFuncPhysiNode; + typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; @@ -704,6 +718,12 @@ typedef struct SCountWinodwPhysiNode { typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode; +typedef struct SAnomalyWindowPhysiNode { + SWindowPhysiNode window; + SNode* pAnomalyKey; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; +} SAnomalyWindowPhysiNode; + typedef struct SSortPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index f5567c735e..4763077ed9 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -347,6 +347,13 @@ typedef struct SCountWindowNode { int64_t windowSliding; } SCountWindowNode; +typedef struct SAnomalyWindowNode { + ENodeType type; // QUERY_NODE_ANOMALY_WINDOW + SNode* pCol; // timestamp primary key + SNode* pExpr; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; +} SAnomalyWindowNode; + typedef enum EFillMode { FILL_MODE_NONE = 1, FILL_MODE_VALUE, @@ -442,6 +449,8 @@ typedef struct SSelectStmt { bool hasTailFunc; bool hasInterpFunc; bool hasInterpPseudoColFunc; + bool hasForecastFunc; + bool hasForecastPseudoColFunc; bool hasLastRowFunc; bool hasLastFunc; bool hasTimeLineFunc; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index e0c7ff9a20..81a3952463 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -407,29 +407,29 @@ void* getTaskPoolWorkerCb(); #define IS_AUDIT_CTB_NAME(_ctbname) \ ((*(_ctbname) == 't') && (0 == strncmp(_ctbname, TSDB_AUDIT_CTB_OPERATION, TSDB_AUDIT_CTB_OPERATION_LEN))) -#define qFatal(...) \ - do { \ - if (qDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ - } \ +#define qFatal(...) \ + do { \ + if (qDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qError(...) \ - do { \ - if (qDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ - } \ +#define qError(...) \ + do { \ + if (qDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qWarn(...) \ - do { \ - if (qDebugFlag & DEBUG_WARN) { \ - taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ - } \ +#define qWarn(...) \ + do { \ + if (qDebugFlag & DEBUG_WARN) { \ + taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qInfo(...) \ - do { \ - if (qDebugFlag & DEBUG_INFO) { \ - taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ - } \ +#define qInfo(...) \ + do { \ + if (qDebugFlag & DEBUG_INFO) { \ + taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) #define qDebug(...) \ do { \ diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index b06b7c74c7..fd936dd087 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -139,6 +139,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 558902075b..603207d8c0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -474,8 +474,24 @@ int32_t taosGetErrSize(); #define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427) #define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428) #define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429) -#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A) +#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A) +// anode +#define TSDB_CODE_MND_ANODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0430) +#define TSDB_CODE_MND_ANODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0431) +#define TSDB_CODE_MND_ANODE_TOO_LONG_URL TAOS_DEF_ERROR_CODE(0, 0x0432) +#define TSDB_CODE_MND_ANODE_INVALID_PROTOCOL TAOS_DEF_ERROR_CODE(0, 0x0433) +#define TSDB_CODE_MND_ANODE_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0434) +#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO TAOS_DEF_ERROR_CODE(0, 0x0435) +#define TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME TAOS_DEF_ERROR_CODE(0, 0x0436) +#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE TAOS_DEF_ERROR_CODE(0, 0x0437) + +// analysis +#define TSDB_CODE_ANAL_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440) +#define TSDB_CODE_ANAL_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441) +#define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442) +#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443) +#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) @@ -868,6 +884,10 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) #define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) #define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681) +#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2682) +#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL TAOS_DEF_ERROR_CODE(0, 0x2683) +#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT TAOS_DEF_ERROR_CODE(0, 0x2684) +#define TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x2685) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tdef.h b/include/util/tdef.h index 46c84ab26a..768ff82ade 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -293,6 +293,12 @@ typedef enum ELogicConditionType { #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 +#define TSDB_ANAL_ANODE_URL_LEN 128 +#define TSDB_ANAL_ALGO_NAME_LEN 64 +#define TSDB_ANAL_ALGO_TYPE_LEN 24 +#define TSDB_ANAL_ALGO_KEY_LEN (TSDB_ANAL_ALGO_NAME_LEN + 9) +#define TSDB_ANAL_ALGO_URL_LEN (TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1) +#define TSDB_ANAL_ALGO_OPTION_LEN 256 #define TSDB_MAX_EP_NUM 10 @@ -603,6 +609,13 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define MONITOR_TAG_NAME_LEN 100 #define MONITOR_TAG_VALUE_LEN 300 #define MONITOR_METRIC_NAME_LEN 100 + +typedef enum { + ANAL_ALGO_TYPE_ANOMALY_DETECT = 0, + ANAL_ALGO_TYPE_FORECAST = 1, + ANAL_ALGO_TYPE_END, +} EAnalAlgoType; + #ifdef __cplusplus } #endif diff --git a/include/util/tjson.h b/include/util/tjson.h index b9ea72b4bb..50d1a4d438 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -68,6 +68,8 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem); SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName); int32_t tjsonGetObjectName(const SJson* pJson, char** pName); int32_t tjsonGetObjectValueString(const SJson* pJson, char** pStringValue); +void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal); +void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal); int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal); int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal); int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal); diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index eb3dd95e95..f01c8dcbb9 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -47,6 +47,10 @@ target_link_libraries( INTERFACE api ) +if(${BUILD_WITH_ANALYSIS}) + add_definitions(-DUSE_ANAL) +endif() + if(${BUILD_S3}) if(${BUILD_WITH_S3}) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 1561ab0a6b..74ef67ff1d 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" #include "libs/function/tudf.h" +#include "tanal.h" static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { int32_t code = 0; @@ -77,7 +78,11 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } if ((code = udfStartUdfd(pMgmt->pData->dnodeId)) != 0) { - dError("failed to start udfd"); + dError("failed to start udfd since %s", tstrerror(code)); + } + + if ((code = taosAnalInit()) != 0) { + dError("failed to init analysis env since %s", tstrerror(code)); } pOutput->pMgmt = pMgmt; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 7204cde8f7..d9aa4614b6 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -141,6 +141,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ANODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_UPDATE_ANODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ANODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; @@ -180,6 +183,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 2d0ad70adf..0c2bd2bc0f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -20,6 +20,7 @@ #include "libs/function/tudf.h" #include "tgrant.h" #include "tcompare.h" +#include "tanal.h" // clang-format on #define DM_INIT_AUDIT() \ @@ -214,6 +215,7 @@ void dmCleanup() { dError("failed to close udfc"); } udfStopUdfd(); + taosAnalCleanup(); taosStopCacheRefreshWorker(); (void)dmDiskClose(); DestroyRegexCache(); diff --git a/source/dnode/mnode/impl/inc/mndAnode.h b/source/dnode/mnode/impl/inc/mndAnode.h new file mode 100644 index 0000000000..63e8f9090e --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndAnode.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_ANODE_H_ +#define _TD_MND_ANODE_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitAnode(SMnode *pMnode); +void mndCleanupAnode(SMnode *pMnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_ANODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 60b732f817..742db8f450 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -78,6 +78,9 @@ typedef enum { MND_OPER_DROP_VIEW, MND_OPER_CONFIG_CLUSTER, MND_OPER_BALANCE_VGROUP_LEADER, + MND_OPER_CREATE_ANODE, + MND_OPER_UPDATE_ANODE, + MND_OPER_DROP_ANODE } EOperType; typedef enum { @@ -232,6 +235,24 @@ typedef struct { char machineId[TSDB_MACHINE_ID_LEN + 1]; } SDnodeObj; +typedef struct { + int32_t nameLen; + char* name; +} SAnodeAlgo; + +typedef struct { + int32_t id; + int64_t createdTime; + int64_t updateTime; + int32_t version; + int32_t urlLen; + int32_t numOfAlgos; + int32_t status; + SRWLatch lock; + char* url; + SArray** algos; +} SAnodeObj; + typedef struct { int32_t id; int64_t createdTime; diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c new file mode 100644 index 0000000000..7e02db0e90 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -0,0 +1,902 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndAnode.h" +#include "audit.h" +#include "mndDnode.h" +#include "mndPrivilege.h" +#include "mndShow.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "tanal.h" +#include "tjson.h" + +#ifdef USE_ANAL + +#define TSDB_ANODE_VER_NUMBER 1 +#define TSDB_ANODE_RESERVE_SIZE 64 + +static SSdbRaw *mndAnodeActionEncode(SAnodeObj *pObj); +static SSdbRow *mndAnodeActionDecode(SSdbRaw *pRaw); +static int32_t mndAnodeActionInsert(SSdb *pSdb, SAnodeObj *pObj); +static int32_t mndAnodeActionUpdate(SSdb *pSdb, SAnodeObj *pOld, SAnodeObj *pNew); +static int32_t mndAnodeActionDelete(SSdb *pSdb, SAnodeObj *pObj); +static int32_t mndProcessCreateAnodeReq(SRpcMsg *pReq); +static int32_t mndProcessUpdateAnodeReq(SRpcMsg *pReq); +static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq); +static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq); +static int32_t mndRetrieveAnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter); +static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextAnodeFull(SMnode *pMnode, void *pIter); +static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj); +static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status); + +int32_t mndInitAnode(SMnode *pMnode) { + SSdbTable table = { + .sdbType = SDB_ANODE, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndAnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndAnodeActionDecode, + .insertFp = (SdbInsertFp)mndAnodeActionInsert, + .updateFp = (SdbUpdateFp)mndAnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndAnodeActionDelete, + }; + + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessCreateAnodeReq); + mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUpdateAnodeReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessDropAnodeReq); + mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessAnalAlgoReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveAnodes); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndCancelGetNextAnode); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveAnodesFull); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndCancelGetNextAnodeFull); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupAnode(SMnode *pMnode) {} + +SAnodeObj *mndAcquireAnode(SMnode *pMnode, int32_t anodeId) { + SAnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_ANODE, &anodeId); + if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_ANODE_NOT_EXIST; + } + return pObj; +} + +void mndReleaseAnode(SMnode *pMnode, SAnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pObj); +} + +static SSdbRaw *mndAnodeActionEncode(SAnodeObj *pObj) { + int32_t code = 0; + int32_t lino = 0; + terrno = TSDB_CODE_OUT_OF_MEMORY; + + int32_t rawDataLen = sizeof(SAnodeObj) + TSDB_ANODE_RESERVE_SIZE + pObj->urlLen; + for (int32_t t = 0; t < pObj->numOfAlgos; ++t) { + SArray *algos = pObj->algos[t]; + for (int32_t a = 0; a < (int32_t)taosArrayGetSize(algos); ++a) { + SAnodeAlgo *algo = taosArrayGet(algos, a); + rawDataLen += (2 * sizeof(int32_t) + algo->nameLen); + } + rawDataLen += sizeof(int32_t); + } + + SSdbRaw *pRaw = sdbAllocRaw(SDB_ANODE, TSDB_ANODE_VER_NUMBER, rawDataLen); + if (pRaw == NULL) goto _OVER; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER) + SDB_SET_INT32(pRaw, dataPos, pObj->version, _OVER) + SDB_SET_INT32(pRaw, dataPos, pObj->urlLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, pObj->numOfAlgos, _OVER) + for (int32_t i = 0; i < pObj->numOfAlgos; ++i) { + SArray *algos = pObj->algos[i]; + SDB_SET_INT32(pRaw, dataPos, (int32_t)taosArrayGetSize(algos), _OVER) + for (int32_t j = 0; j < (int32_t)taosArrayGetSize(algos); ++j) { + SAnodeAlgo *algo = taosArrayGet(algos, j); + SDB_SET_INT32(pRaw, dataPos, algo->nameLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, algo->name, algo->nameLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, 0, _OVER) // reserved + } + } + + SDB_SET_RESERVE(pRaw, dataPos, TSDB_ANODE_RESERVE_SIZE, _OVER) + + terrno = 0; + +_OVER: + if (terrno != 0) { + mError("anode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("anode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj); + return pRaw; +} + +static SSdbRow *mndAnodeActionDecode(SSdbRaw *pRaw) { + int32_t code = 0; + int32_t lino = 0; + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SAnodeObj *pObj = NULL; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; + + if (sver != TSDB_ANODE_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto _OVER; + } + + pRow = sdbAllocRow(sizeof(SAnodeObj)); + if (pRow == NULL) goto _OVER; + + pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto _OVER; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pObj->version, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pObj->urlLen, _OVER) + + if (pObj->urlLen > 0) { + pObj->url = taosMemoryCalloc(pObj->urlLen, 1); + if (pObj->url == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER) + } + + SDB_GET_INT32(pRaw, dataPos, &pObj->numOfAlgos, _OVER) + if (pObj->numOfAlgos > 0) { + pObj->algos = taosMemoryCalloc(pObj->numOfAlgos, sizeof(SArray *)); + if (pObj->algos == NULL) { + goto _OVER; + } + } + + for (int32_t i = 0; i < pObj->numOfAlgos; ++i) { + int32_t numOfAlgos = 0; + SDB_GET_INT32(pRaw, dataPos, &numOfAlgos, _OVER) + + pObj->algos[i] = taosArrayInit(2, sizeof(SAnodeAlgo)); + if (pObj->algos[i] == NULL) goto _OVER; + + for (int32_t j = 0; j < numOfAlgos; ++j) { + SAnodeAlgo algoObj = {0}; + int32_t reserved = 0; + + SDB_GET_INT32(pRaw, dataPos, &algoObj.nameLen, _OVER) + if (algoObj.nameLen > 0) { + algoObj.name = taosMemoryCalloc(algoObj.nameLen, 1); + if (algoObj.name == NULL) goto _OVER; + } + + SDB_GET_BINARY(pRaw, dataPos, algoObj.name, algoObj.nameLen, _OVER) + SDB_GET_INT32(pRaw, dataPos, &reserved, _OVER); + + if (taosArrayPush(pObj->algos[i], &algoObj) == NULL) goto _OVER; + } + } + + SDB_GET_RESERVE(pRaw, dataPos, TSDB_ANODE_RESERVE_SIZE, _OVER) + + terrno = 0; + +_OVER: + if (terrno != 0) { + mError("anode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr()); + if (pObj != NULL) { + taosMemoryFreeClear(pObj->url); + } + taosMemoryFreeClear(pRow); + return NULL; + } + + mTrace("anode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj); + return pRow; +} + +static void mndFreeAnode(SAnodeObj *pObj) { + taosMemoryFreeClear(pObj->url); + for (int32_t i = 0; i < pObj->numOfAlgos; ++i) { + SArray *algos = pObj->algos[i]; + for (int32_t j = 0; j < (int32_t)taosArrayGetSize(algos); ++j) { + SAnodeAlgo *algo = taosArrayGet(algos, j); + taosMemoryFreeClear(algo->name); + } + taosArrayDestroy(algos); + } + taosMemoryFreeClear(pObj->algos); +} + +static int32_t mndAnodeActionInsert(SSdb *pSdb, SAnodeObj *pObj) { + mTrace("anode:%d, perform insert action, row:%p", pObj->id, pObj); + return 0; +} + +static int32_t mndAnodeActionDelete(SSdb *pSdb, SAnodeObj *pObj) { + mTrace("anode:%d, perform delete action, row:%p", pObj->id, pObj); + mndFreeAnode(pObj); + return 0; +} + +static int32_t mndAnodeActionUpdate(SSdb *pSdb, SAnodeObj *pOld, SAnodeObj *pNew) { + mTrace("anode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); + + taosWLockLatch(&pOld->lock); + int32_t numOfAlgos = pNew->numOfAlgos; + void *algos = pNew->algos; + pNew->numOfAlgos = pOld->numOfAlgos; + pNew->algos = pOld->algos; + pOld->numOfAlgos = numOfAlgos; + pOld->algos = algos; + pOld->updateTime = pNew->updateTime; + pOld->version = pNew->version; + taosWUnLockLatch(&pOld->lock); + return 0; +} + +static int32_t mndSetCreateAnodeRedoLogs(STrans *pTrans, SAnodeObj *pObj) { + int32_t code = 0; + SSdbRaw *pRedoRaw = mndAnodeActionEncode(pObj); + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING)); + TAOS_RETURN(code); +} + +static int32_t mndSetCreateAnodeUndoLogs(STrans *pTrans, SAnodeObj *pObj) { + int32_t code = 0; + SSdbRaw *pUndoRaw = mndAnodeActionEncode(pObj); + if (pUndoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED)); + TAOS_RETURN(code); +} + +static int32_t mndSetCreateAnodeCommitLogs(STrans *pTrans, SAnodeObj *pObj) { + int32_t code = 0; + SSdbRaw *pCommitRaw = mndAnodeActionEncode(pObj); + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); + TAOS_RETURN(code); +} + +static int32_t mndCreateAnode(SMnode *pMnode, SRpcMsg *pReq, SMCreateAnodeReq *pCreate) { + int32_t code = -1; + STrans *pTrans = NULL; + + SAnodeObj anodeObj = {0}; + anodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_ANODE); + anodeObj.createdTime = taosGetTimestampMs(); + anodeObj.updateTime = anodeObj.createdTime; + anodeObj.version = 0; + anodeObj.urlLen = pCreate->urlLen; + if (anodeObj.urlLen > TSDB_ANAL_ANODE_URL_LEN) { + code = TSDB_CODE_MND_ANODE_TOO_LONG_URL; + goto _OVER; + } + + anodeObj.url = taosMemoryCalloc(1, pCreate->urlLen); + if (anodeObj.url == NULL) goto _OVER; + (void)memcpy(anodeObj.url, pCreate->url, pCreate->urlLen); + + code = mndGetAnodeAlgoList(anodeObj.url, &anodeObj); + if (code != 0) goto _OVER; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-anode"); + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } + mndTransSetSerial(pTrans); + + mInfo("trans:%d, used to create anode:%s as anode:%d", pTrans->id, pCreate->url, anodeObj.id); + + TAOS_CHECK_GOTO(mndSetCreateAnodeRedoLogs(pTrans, &anodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateAnodeUndoLogs(pTrans, &anodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateAnodeCommitLogs(pTrans, &anodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); + + code = 0; + +_OVER: + mndFreeAnode(&anodeObj); + mndTransDrop(pTrans); + TAOS_RETURN(code); +} + +static SAnodeObj *mndAcquireAnodeByURL(SMnode *pMnode, char *url) { + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (1) { + SAnodeObj *pAnode = NULL; + pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pAnode); + if (pIter == NULL) break; + + if (strcasecmp(url, pAnode->url) == 0) { + sdbCancelFetch(pSdb, pIter); + return pAnode; + } + + sdbRelease(pSdb, pAnode); + } + + terrno = TSDB_CODE_MND_ANODE_NOT_EXIST; + return NULL; +} + +static int32_t mndProcessCreateAnodeReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SAnodeObj *pObj = NULL; + SMCreateAnodeReq createReq = {0}; + + TAOS_CHECK_GOTO(tDeserializeSMCreateAnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER); + + mInfo("anode:%s, start to create", createReq.url); + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_ANODE), NULL, _OVER); + + pObj = mndAcquireAnodeByURL(pMnode, createReq.url); + if (pObj != NULL) { + code = TSDB_CODE_MND_ANODE_ALREADY_EXIST; + goto _OVER; + } + + code = mndCreateAnode(pMnode, pReq, &createReq); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("anode:%s, failed to create since %s", createReq.url, tstrerror(code)); + } + + mndReleaseAnode(pMnode, pObj); + tFreeSMCreateAnodeReq(&createReq); + TAOS_RETURN(code); +} + +static int32_t mndUpdateAnode(SMnode *pMnode, SAnodeObj *pAnode, SRpcMsg *pReq) { + mInfo("anode:%d, start to update", pAnode->id); + int32_t code = -1; + STrans *pTrans = NULL; + SAnodeObj anodeObj = {0}; + anodeObj.id = pAnode->id; + anodeObj.updateTime = taosGetTimestampMs(); + + code = mndGetAnodeAlgoList(pAnode->url, &anodeObj); + if (code != 0) goto _OVER; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-anode"); + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } + mInfo("trans:%d, used to update anode:%d", pTrans->id, anodeObj.id); + + TAOS_CHECK_GOTO(mndSetCreateAnodeCommitLogs(pTrans, &anodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); + code = 0; + +_OVER: + mndFreeAnode(&anodeObj); + mndTransDrop(pTrans); + TAOS_RETURN(code); +} + +static int32_t mndUpdateAllAnodes(SMnode *pMnode, SRpcMsg *pReq) { + mInfo("update all anodes"); + SSdb *pSdb = pMnode->pSdb; + int32_t code = 0; + int32_t rows = 0; + int32_t numOfRows = sdbGetSize(pSdb, SDB_ANODE); + + void *pIter = NULL; + while (1) { + SAnodeObj *pObj = NULL; + ESdbStatus objStatus = 0; + pIter = sdbFetchAll(pSdb, SDB_ANODE, pIter, (void **)&pObj, &objStatus, true); + if (pIter == NULL) break; + + rows++; + void *transReq = NULL; + if (rows == numOfRows) transReq = pReq; + code = mndUpdateAnode(pMnode, pObj, transReq); + sdbRelease(pSdb, pObj); + + if (code != 0) break; + } + + if (code == 0 && rows == numOfRows) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + + return code; +} + +static int32_t mndProcessUpdateAnodeReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SAnodeObj *pObj = NULL; + SMUpdateAnodeReq updateReq = {0}; + + TAOS_CHECK_GOTO(tDeserializeSMUpdateAnodeReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER); + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_UPDATE_ANODE), NULL, _OVER); + + if (updateReq.anodeId == -1) { + code = mndUpdateAllAnodes(pMnode, pReq); + } else { + pObj = mndAcquireAnode(pMnode, updateReq.anodeId); + if (pObj == NULL) { + code = TSDB_CODE_MND_ANODE_NOT_EXIST; + goto _OVER; + } + code = mndUpdateAnode(pMnode, pObj, pReq); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + } + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (updateReq.anodeId != -1) { + mError("anode:%d, failed to update since %s", updateReq.anodeId, tstrerror(code)); + } + } + + mndReleaseAnode(pMnode, pObj); + tFreeSMUpdateAnodeReq(&updateReq); + TAOS_RETURN(code); +} + +static int32_t mndSetDropAnodeRedoLogs(STrans *pTrans, SAnodeObj *pObj) { + int32_t code = 0; + SSdbRaw *pRedoRaw = mndAnodeActionEncode(pObj); + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)); + TAOS_RETURN(code); +} + +static int32_t mndSetDropAnodeCommitLogs(STrans *pTrans, SAnodeObj *pObj) { + int32_t code = 0; + SSdbRaw *pCommitRaw = mndAnodeActionEncode(pObj); + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)); + TAOS_RETURN(code); +} + +static int32_t mndSetDropAnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SAnodeObj *pObj, bool force) { + if (pObj == NULL) return 0; + TAOS_CHECK_RETURN(mndSetDropAnodeRedoLogs(pTrans, pObj)); + TAOS_CHECK_RETURN(mndSetDropAnodeCommitLogs(pTrans, pObj)); + return 0; +} + +static int32_t mndDropAnode(SMnode *pMnode, SRpcMsg *pReq, SAnodeObj *pObj) { + int32_t code = -1; + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-anode"); + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } + mndTransSetSerial(pTrans); + + mInfo("trans:%d, used to drop anode:%d", pTrans->id, pObj->id); + TAOS_CHECK_GOTO(mndSetDropAnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); + + code = 0; + +_OVER: + mndTransDrop(pTrans); + TAOS_RETURN(code); +} + +static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SAnodeObj *pObj = NULL; + SMDropAnodeReq dropReq = {0}; + + TAOS_CHECK_GOTO(tDeserializeSMDropAnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER); + + mInfo("anode:%d, start to drop", dropReq.anodeId); + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_ANODE), NULL, _OVER); + + if (dropReq.anodeId <= 0) { + code = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + pObj = mndAcquireAnode(pMnode, dropReq.anodeId); + if (pObj == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } + + code = mndDropAnode(pMnode, pReq, pObj); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("anode:%d, failed to drop since %s", dropReq.anodeId, tstrerror(code)); + } + + mndReleaseAnode(pMnode, pObj); + tFreeSMDropAnodeReq(&dropReq); + TAOS_RETURN(code); +} + +static int32_t mndRetrieveAnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SAnodeObj *pObj = NULL; + char buf[TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE]; + char status[64]; + int32_t code = 0; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_ANODE, pShow->pIter, (void **)&pObj); + if (pShow->pIter == NULL) break; + + cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); + if (code != 0) goto _end; + + STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->url, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); + if (code != 0) goto _end; + + status[0] = 0; + if (mndGetAnodeStatus(pObj, status) == 0) { + STR_TO_VARSTR(buf, status); + } else { + STR_TO_VARSTR(buf, "offline"); + } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, buf, false); + if (code != 0) goto _end; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); + if (code != 0) goto _end; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false); + if (code != 0) goto _end; + + numOfRows++; + sdbRelease(pSdb, pObj); + } + +_end: + if (code != 0) sdbRelease(pSdb, pObj); + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetchByType(pSdb, pIter, SDB_ANODE); +} + +static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SAnodeObj *pObj = NULL; + char buf[TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE]; + int32_t code = 0; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_ANODE, pShow->pIter, (void **)&pObj); + if (pShow->pIter == NULL) break; + + for (int32_t t = 0; t < pObj->numOfAlgos; ++t) { + SArray *algos = pObj->algos[t]; + + for (int32_t a = 0; a < taosArrayGetSize(algos); ++a) { + SAnodeAlgo *algo = taosArrayGet(algos, a); + + cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); + if (code != 0) goto _end; + + STR_TO_VARSTR(buf, taosAnalAlgoStr(t)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, buf, false); + if (code != 0) goto _end; + + STR_TO_VARSTR(buf, algo->name); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, buf, false); + if (code != 0) goto _end; + + numOfRows++; + } + } + + sdbRelease(pSdb, pObj); + } + +_end: + if (code != 0) sdbRelease(pSdb, pObj); + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextAnodeFull(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetchByType(pSdb, pIter, SDB_ANODE); +} + +static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) { + int32_t code = 0; + int32_t protocol = 0; + double tmp = 0; + char buf[TSDB_ANAL_ALGO_NAME_LEN + 1] = {0}; + + code = tjsonGetDoubleValue(pJson, "protocol", &tmp); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + protocol = (int32_t)(tmp * 1000); + if (protocol != 100) return TSDB_CODE_MND_ANODE_INVALID_PROTOCOL; + + code = tjsonGetDoubleValue(pJson, "version", &tmp); + pObj->version = (int32_t)(tmp * 1000); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + if (pObj->version <= 0) return TSDB_CODE_MND_ANODE_INVALID_VERSION; + + SJson *details = tjsonGetObjectItem(pJson, "details"); + if (details == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + int32_t numOfDetails = tjsonGetArraySize(details); + + pObj->algos = taosMemoryCalloc(ANAL_ALGO_TYPE_END, sizeof(SArray *)); + if (pObj->algos == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + pObj->numOfAlgos = ANAL_ALGO_TYPE_END; + for (int32_t i = 0; i < ANAL_ALGO_TYPE_END; ++i) { + pObj->algos[i] = taosArrayInit(4, sizeof(SAnodeAlgo)); + if (pObj->algos[i] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t d = 0; d < numOfDetails; ++d) { + SJson *detail = tjsonGetArrayItem(details, d); + if (detail == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + + code = tjsonGetStringValue(detail, "type", buf); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + EAnalAlgoType type = taosAnalAlgoInt(buf); + if (type < 0 || type >= ANAL_ALGO_TYPE_END) continue; + + SJson *algos = tjsonGetObjectItem(detail, "algo"); + if (algos == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + int32_t numOfAlgos = tjsonGetArraySize(algos); + for (int32_t a = 0; a < numOfAlgos; ++a) { + SJson *algo = tjsonGetArrayItem(algos, a); + if (algo == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + + code = tjsonGetStringValue(algo, "name", buf); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + + SAnodeAlgo algoObj = {0}; + algoObj.nameLen = strlen(buf) + 1; + if (algoObj.nameLen > TSDB_ANAL_ALGO_NAME_LEN) return TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME; + if (algoObj.nameLen <= 1) return TSDB_CODE_OUT_OF_MEMORY; + algoObj.name = taosMemoryCalloc(algoObj.nameLen, 1); + tstrncpy(algoObj.name, buf, algoObj.nameLen); + + if (taosArrayPush(pObj->algos[type], &algoObj) == NULL) return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return 0; +} + +static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) { + char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0}; + snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", url, "list"); + + SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL); + if (pJson == NULL) return terrno; + + int32_t code = mndDecodeAlgoList(pJson, pObj); + if (pJson != NULL) tjsonDelete(pJson); + + TAOS_RETURN(code); +} + +static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status) { + int32_t code = 0; + int32_t protocol = 0; + double tmp = 0; + char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0}; + snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", pObj->url, "status"); + + SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL); + if (pJson == NULL) return terrno; + + code = tjsonGetDoubleValue(pJson, "protocol", &tmp); + if (code < 0) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } + protocol = (int32_t)(tmp * 1000); + if (protocol != 100) { + code = TSDB_CODE_MND_ANODE_INVALID_PROTOCOL; + goto _OVER; + } + + code = tjsonGetStringValue(pJson, "status", status); + if (code < 0) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } + if (strlen(status) == 0) { + code = TSDB_CODE_MND_ANODE_INVALID_PROTOCOL; + goto _OVER; + } + +_OVER: + if (pJson != NULL) tjsonDelete(pJson); + TAOS_RETURN(code); +} + +static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t code = -1; + SAnodeObj *pObj = NULL; + SAnalUrl url; + int32_t nameLen; + char name[TSDB_ANAL_ALGO_KEY_LEN]; + SRetrieveAnalAlgoReq req = {0}; + SRetrieveAnalAlgoRsp rsp = {0}; + + TAOS_CHECK_GOTO(tDeserializeRetrieveAnalAlgoReq(pReq->pCont, pReq->contLen, &req), NULL, _OVER); + + rsp.ver = sdbGetTableVer(pSdb, SDB_ANODE); + if (req.analVer != rsp.ver) { + mInfo("dnode:%d, update analysis old ver:%" PRId64 " to new ver:%" PRId64, req.dnodeId, req.analVer, rsp.ver); + rsp.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + if (rsp.hash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + void *pIter = NULL; + while (1) { + SAnodeObj *pAnode = NULL; + pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pAnode); + if (pIter == NULL) break; + + url.anode = pAnode->id; + for (int32_t t = 0; t < pAnode->numOfAlgos; ++t) { + SArray *algos = pAnode->algos[t]; + url.type = t; + + for (int32_t a = 0; a < taosArrayGetSize(algos); ++a) { + SAnodeAlgo *algo = taosArrayGet(algos, a); + nameLen = 1 + snprintf(name, sizeof(name) - 1, "%d:%s", url.type, algo->name); + + SAnalUrl *pOldUrl = taosHashAcquire(rsp.hash, name, nameLen); + if (pOldUrl == NULL || (pOldUrl != NULL && pOldUrl->anode < url.anode)) { + if (pOldUrl != NULL) { + taosMemoryFreeClear(pOldUrl->url); + if (taosHashRemove(rsp.hash, name, nameLen) != 0) { + sdbRelease(pSdb, pAnode); + goto _OVER; + } + } + url.url = taosMemoryMalloc(TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1); + if (url.url == NULL) { + sdbRelease(pSdb, pAnode); + goto _OVER; + } + + url.urlLen = 1 + snprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url, + taosAnalAlgoUrlStr(url.type)); + if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalUrl)) != 0) { + taosMemoryFree(url.url); + sdbRelease(pSdb, pAnode); + goto _OVER; + } + } + } + + sdbRelease(pSdb, pAnode); + } + } + } + + int32_t contLen = tSerializeRetrieveAnalAlgoRsp(NULL, 0, &rsp); + void *pHead = rpcMallocCont(contLen); + (void)tSerializeRetrieveAnalAlgoRsp(pHead, contLen, &rsp); + + pReq->info.rspLen = contLen; + pReq->info.rsp = pHead; + +_OVER: + tFreeRetrieveAnalAlgoRsp(&rsp); + TAOS_RETURN(code); +} + +#else + +static int32_t mndProcessUnsupportReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; } +static int32_t mndRetrieveUnsupport(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} + +int32_t mndInitAnode(SMnode *pMnode) { + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessUnsupportReq); + mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUnsupportReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessUnsupportReq); + mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessUnsupportReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveUnsupport); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveUnsupport); + return 0; +} + +void mndCleanupAnode(SMnode *pMnode) {} + +#endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index a94a471e4b..5e10583a0a 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -693,7 +693,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { int64_t clusterid = mndGetClusterId(pMnode); if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) { code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER; - mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 ", code:0x%x", + mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code); goto _OVER; } @@ -730,6 +730,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode); + int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE); int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pDnode, curMs); @@ -738,9 +739,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes; bool encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum; bool enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0); - bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || + bool analVerChanged = (analVer != statusReq.analVer); + bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged || pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged; - const STraceId *trace = &pReq->info.traceId; mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); @@ -863,6 +864,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { SStatusRsp statusRsp = {0}; statusRsp.statusSeq++; + statusRsp.analVer = analVer; statusRsp.dnodeVer = dnodeVer; statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3f160d8541..617fae4d3c 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndAcct.h" #include "mndArbGroup.h" +#include "mndAnode.h" #include "mndCluster.h" #include "mndCompact.h" #include "mndCompactDetail.h" @@ -605,6 +606,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode)); + TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-anode", mndInitAnode, mndCleanupAnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser)); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 55687c00ba..264fea3476 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -68,6 +68,10 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_QNODE; } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) { type = TSDB_MGMT_TABLE_SNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_ANODES, len) == 0) { + type = TSDB_MGMT_TABLE_ANODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_ANODES_FULL, len) == 0) { + type = TSDB_MGMT_TABLE_ANODE_FULL; } else if (strncasecmp(name, TSDB_INS_TABLE_ARBGROUPS, len) == 0) { type = TSDB_MGMT_TABLE_ARBGROUP; } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) { diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index c33b1d4366..f6d1587bb2 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -161,7 +161,8 @@ typedef enum { SDB_COMPACT_DETAIL = 25, SDB_GRANT = 26, // grant log SDB_ARBGROUP = 27, - SDB_MAX = 28 + SDB_ANODE = 28, + SDB_MAX = 29 } ESdbType; typedef struct SSdbRaw { diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 34a017a907..d98c3e5a72 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -25,6 +25,9 @@ #define SDB_RESERVE_SIZE 512 #define SDB_FILE_VER 1 +#define SDB_TABLE_SIZE_EXTRA SDB_MAX +#define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t)) + static int32_t sdbDeployData(SSdb *pSdb) { int32_t code = 0; mInfo("start to deploy sdb"); @@ -154,7 +157,38 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { } } - char reserve[SDB_RESERVE_SIZE] = {0}; + // for sdb compatibility + for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) { + int64_t maxId = 0; + ret = taosReadFile(pFile, &maxId, sizeof(int64_t)); + if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TAOS_RETURN(code); + } + if (ret != sizeof(int64_t)) { + code = TSDB_CODE_FILE_CORRUPTED; + TAOS_RETURN(code); + } + if (i < SDB_MAX) { + pSdb->maxId[i] = maxId; + } + + int64_t ver = 0; + ret = taosReadFile(pFile, &ver, sizeof(int64_t)); + if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TAOS_RETURN(code); + } + if (ret != sizeof(int64_t)) { + code = TSDB_CODE_FILE_CORRUPTED; + TAOS_RETURN(code); + } + if (i < SDB_MAX) { + pSdb->tableVer[i] = ver; + } + } + + char reserve[SDB_RESERVE_SIZE_EXTRA] = {0}; ret = taosReadFile(pFile, reserve, sizeof(reserve)); if (ret < 0) { return terrno; @@ -205,7 +239,26 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { } } - char reserve[SDB_RESERVE_SIZE] = {0}; + // for sdb compatibility + for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) { + int64_t maxId = 0; + if (i < SDB_MAX) { + maxId = pSdb->maxId[i]; + } + if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) { + return terrno; + } + + int64_t ver = 0; + if (i < SDB_MAX) { + ver = pSdb->tableVer[i]; + } + if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) { + return terrno; + } + } + + char reserve[SDB_RESERVE_SIZE_EXTRA] = {0}; if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) { return terrno; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index ea44a7c549..3f85ccb087 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -74,6 +74,8 @@ const char *sdbTableName(ESdbType type) { return "grant"; case SDB_ARBGROUP: return "arb_group"; + case SDB_ANODE: + return "anode"; default: return "undefine"; } diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 0df676c6e2..7dfc7080d6 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -133,6 +133,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); + int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); @@ -159,6 +161,8 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); + int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); // clang-format on @@ -190,6 +194,9 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdSt int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx); +void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId); +void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index f473626953..e68a91d97d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -44,22 +44,6 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** static void destroyEWindowOperatorInfo(void* param); static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); -// todo : move to util -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, - uint64_t groupId) { - pRowSup->startRowIndex = rowIndex; - pRowSup->numOfRows = 0; - pRowSup->win.skey = tsList[rowIndex]; - pRowSup->groupId = groupId; -} - -static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { - pRowSup->win.ekey = ts; - pRowSup->prevTs = ts; - pRowSup->numOfRows += 1; - pRowSup->groupId = groupId; -} - int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3817ef5b69..34ecda6ce7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -89,14 +89,14 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } -static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { +void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { pRowSup->win.ekey = ts; pRowSup->prevTs = ts; pRowSup->numOfRows += 1; pRowSup->groupId = groupId; } -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, +void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) { pRowSup->startRowIndex = rowIndex; pRowSup->numOfRows = 0; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0b2fb70eba..77905792b8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -138,6 +138,8 @@ int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx* pCtx); int32_t diffFunctionByRow(SArray* pCtx); +bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); + bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t derivativeFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index a50562d78d..3112245de9 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -58,6 +58,7 @@ extern "C" { #define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29) #define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found #define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31) +#define FUNC_MGT_FORECAST_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(32) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index a2aec77c2e..28e867965f 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -154,6 +154,7 @@ SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr); SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond); SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken); +SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt); SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill); SNode* createWindowOffsetNode(SAstCreateContext* pCxt, SNode* pStartOffset, SNode* pEndOffset); @@ -251,6 +252,9 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName); SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort); SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force, bool unsafe); SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue); +SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl); +SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode); +SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll); SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue); SNode* createRealTableNodeForIndexName(SAstCreateContext* pCxt, SToken* pDbName, SToken* pIndexName); SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 377009a07f..ff47c091b7 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -3972,6 +3972,10 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam return nonCalcScalarFunction(pInput, inputNum, pOutput); } +int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return nonCalcScalarFunction(pInput, inputNum, pOutput); +} + int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return avgScalarFunction(pInput, inputNum, pOutput); } diff --git a/source/util/src/tanal.c b/source/util/src/tanal.c new file mode 100644 index 0000000000..19d26e8a0a --- /dev/null +++ b/source/util/src/tanal.c @@ -0,0 +1,737 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tanal.h" +#include "tmsg.h" +#include "ttypes.h" +#include "tutil.h" + +#ifdef USE_ANAL +#include +#define ANAL_ALGO_SPLIT "," + +typedef struct { + int64_t ver; + SHashObj *hash; // algoname:algotype -> SAnalUrl + TdThreadMutex lock; +} SAlgoMgmt; + +typedef struct { + char *data; + int64_t dataLen; +} SCurlResp; + +static SAlgoMgmt tsAlgos = {0}; +static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen); + +const char *taosAnalAlgoStr(EAnalAlgoType type) { + switch (type) { + case ANAL_ALGO_TYPE_ANOMALY_DETECT: + return "anomaly-detection"; + case ANAL_ALGO_TYPE_FORECAST: + return "forecast"; + default: + return "unknown"; + } +} + +const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { + switch (type) { + case ANAL_ALGO_TYPE_ANOMALY_DETECT: + return "anomaly-detect"; + case ANAL_ALGO_TYPE_FORECAST: + return "forecast"; + default: + return "unknown"; + } +} + +EAnalAlgoType taosAnalAlgoInt(const char *name) { + for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { + if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { + return i; + } + } + + return ANAL_ALGO_TYPE_END; +} + +int32_t taosAnalInit() { + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + uError("failed to init curl"); + return -1; + } + + tsAlgos.ver = 0; + taosThreadMutexInit(&tsAlgos.lock, NULL); + tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + if (tsAlgos.hash == NULL) { + uError("failed to init algo hash"); + return -1; + } + + uInfo("analysis env is initialized"); + return 0; +} + +static void taosAnalFreeHash(SHashObj *hash) { + void *pIter = taosHashIterate(hash, NULL); + while (pIter != NULL) { + SAnalUrl *pUrl = (SAnalUrl *)pIter; + taosMemoryFree(pUrl->url); + pIter = taosHashIterate(hash, pIter); + } + taosHashCleanup(hash); +} + +void taosAnalCleanup() { + curl_global_cleanup(); + taosThreadMutexDestroy(&tsAlgos.lock); + taosAnalFreeHash(tsAlgos.hash); + tsAlgos.hash = NULL; + uInfo("analysis env is cleaned up"); +} + +void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { + if (newVer > tsAlgos.ver) { + taosThreadMutexLock(&tsAlgos.lock); + SHashObj *hash = tsAlgos.hash; + tsAlgos.ver = newVer; + tsAlgos.hash = pHash; + taosThreadMutexUnlock(&tsAlgos.lock); + taosAnalFreeHash(hash); + } else { + taosAnalFreeHash(pHash); + } +} + +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { + char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName); + + char *pos1 = strstr(option, buf); + char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + if (pos1 != NULL) { + if (optMaxLen > 0) { + int32_t copyLen = optMaxLen; + if (pos2 != NULL) { + copyLen = (int32_t)(pos2 - pos1 - strlen(optName) + 1); + copyLen = MIN(copyLen, optMaxLen); + } + tstrncpy(optValue, pos1 + bufLen, copyLen); + } + return true; + } else { + return false; + } +} + +bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { + char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName); + + char *pos1 = strstr(option, buf); + char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + if (pos1 != NULL) { + *optValue = taosStr2Int32(pos1 + bufLen + 1, NULL, 10); + return true; + } else { + return false; + } +} + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { + int32_t code = 0; + char name[TSDB_ANAL_ALGO_KEY_LEN] = {0}; + int32_t nameLen = 1 + snprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); + + taosThreadMutexLock(&tsAlgos.lock); + SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); + if (pUrl != NULL) { + tstrncpy(url, pUrl->url, urlLen); + uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); + } else { + url[0] = 0; + terrno = TSDB_CODE_ANAL_ALGO_NOT_FOUND; + code = terrno; + uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); + } + taosThreadMutexUnlock(&tsAlgos.lock); + + return code; +} + +int64_t taosAnalGetVersion() { return tsAlgos.ver; } + +static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) { + SCurlResp *pRsp = userdata; + if (contLen == 0 || nmemb == 0 || pCont == NULL) { + pRsp->dataLen = 0; + pRsp->data = NULL; + uError("curl response is received, len:%" PRId64, pRsp->dataLen); + return 0; + } + + pRsp->dataLen = (int64_t)contLen * (int64_t)nmemb; + pRsp->data = taosMemoryMalloc(pRsp->dataLen + 1); + + if (pRsp->data != NULL) { + (void)memcpy(pRsp->data, pCont, pRsp->dataLen); + pRsp->data[pRsp->dataLen] = 0; + uDebug("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data); + return pRsp->dataLen; + } else { + pRsp->dataLen = 0; + uError("failed to malloc curl response"); + return 0; + } +} + +static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) { + CURL *curl = NULL; + CURLcode code = 0; + + curl = curl_easy_init(); + if (curl == NULL) { + uError("failed to create curl handle"); + return -1; + } + + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100); + + uDebug("curl get request will sent, url:%s", url); + code = curl_easy_perform(curl); + if (code != CURLE_OK) { + uError("failed to perform curl action, code:%d", code); + } + +_OVER: + if (curl != NULL) curl_easy_cleanup(curl); + return code; +} + +static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { + struct curl_slist *headers = NULL; + CURL *curl = NULL; + CURLcode code = 0; + + curl = curl_easy_init(); + if (curl == NULL) { + uError("failed to create curl handle"); + return -1; + } + + headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000); + curl_easy_setopt(curl, CURLOPT_POST, 1); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf); + + uDebug("curl post request will sent, url:%s len:%d", url, bufLen); + code = curl_easy_perform(curl); + if (code != CURLE_OK) { + uError("failed to perform curl action, code:%d", code); + } + +_OVER: + if (curl != NULL) { + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + } + return code; +} + +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { + int32_t code = -1; + char *pCont = NULL; + int64_t contentLen; + SJson *pJson = NULL; + SCurlResp curlRsp = {0}; + + if (type == ANAL_HTTP_TYPE_GET) { + if (taosCurlGetRequest(url, &curlRsp) != 0) { + terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS; + goto _OVER; + } + } else { + code = taosAnalBufGetCont(pBuf, &pCont, &contentLen); + if (code != 0) { + terrno = code; + goto _OVER; + } + if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { + terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS; + goto _OVER; + } + } + + if (curlRsp.data == NULL || curlRsp.dataLen == 0) { + terrno = TSDB_CODE_ANAL_URL_RSP_IS_NULL; + goto _OVER; + } + + pJson = tjsonParse(curlRsp.data); + if (pJson == NULL) { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } + +_OVER: + if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data); + if (pCont != NULL) taosMemoryFree(pCont); + return pJson; +} + +static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { + int32_t code = 0; + int64_t contLen; + char *pCont = NULL; + TdFilePtr pFile = NULL; + + pFile = taosOpenFile(fileName, TD_FILE_READ); + if (pFile == NULL) { + code = terrno; + goto _OVER; + } + + code = taosFStatFile(pFile, &contLen, NULL); + if (code != 0) goto _OVER; + + pCont = taosMemoryMalloc(contLen + 1); + if (pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + if (taosReadFile(pFile, pCont, contLen) != contLen) { + code = terrno; + goto _OVER; + } + + pCont[contLen] = '\0'; + +_OVER: + if (code == 0) { + *ppCont = pCont; + *pContLen = contLen; + } else { + if (pCont != NULL) taosMemoryFree(pCont); + } + if (pFile != NULL) taosCloseFile(&pFile); + return code; +} + +static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { + char buf[64] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { + char buf[128] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { + char buf[128] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t bufLen) { + if (bufLen <= 0) { + bufLen = strlen(buf); + } + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteStart(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } + +static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { + pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + if (pBuf->filePtr == NULL) { + return terrno; + } + + pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalColBuf)); + if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; + pBuf->numOfCols = numOfCols; + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON) { + return taosAnalJsonBufWriteStart(pBuf); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); + pCol->filePtr = + taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + if (pCol->filePtr == NULL) { + return terrno; + } + } + + return taosAnalJsonBufWriteStart(pBuf); +} + +static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + char buf[128] = {0}; + bool first = (colIndex == 0); + bool last = (colIndex == pBuf->numOfCols - 1); + + if (first) { + if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { + return terrno; + } + } + + int32_t bufLen = snprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, + tDataTypes[colType].bytes, last ? "" : ","); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + + if (last) { + if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) { + return terrno; + } + } + + return 0; +} + +static int32_t taosAnalJsonBufWriteDataBegin(SAnalBuf *pBuf) { + return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); +} + +static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { + if (bufLen <= 0) { + bufLen = strlen(buf); + } + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON) { + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + } else { + if (taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen) != bufLen) { + return terrno; + } + } + + return 0; +} + +static int32_t taosAnalJsonBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { + return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); +} + +static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { + if (colIndex == pBuf->numOfCols - 1) { + return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); + + } else { + return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); + } +} + +static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + char buf[64]; + int32_t bufLen = 0; + + if (pBuf->pCols[colIndex].numOfRows != 0) { + buf[bufLen] = ','; + buf[bufLen + 1] = '\n'; + buf[bufLen + 2] = 0; + bufLen += 2; + } + + switch (colType) { + case TSDB_DATA_TYPE_BOOL: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0); + break; + case TSDB_DATA_TYPE_TINYINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue); + break; + case TSDB_DATA_TYPE_UTINYINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue); + break; + case TSDB_DATA_TYPE_SMALLINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue); + break; + case TSDB_DATA_TYPE_USMALLINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue); + break; + case TSDB_DATA_TYPE_INT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue); + break; + case TSDB_DATA_TYPE_UINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue); + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue); + break; + case TSDB_DATA_TYPE_UBIGINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue); + break; + case TSDB_DATA_TYPE_FLOAT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue)); + break; + case TSDB_DATA_TYPE_DOUBLE: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue)); + break; + default: + buf[bufLen] = '\0'; + } + + pBuf->pCols[colIndex].numOfRows++; + return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); +} + +static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) { + int32_t code = 0; + char *pCont = NULL; + int64_t contLen = 0; + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + + code = taosFsyncFile(pCol->filePtr); + if (code != 0) return code; + + code = taosCloseFile(&pCol->filePtr); + if (code != 0) return code; + + code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); + if (code != 0) return code; + + code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen); + if (code != 0) return code; + + taosMemoryFreeClear(pCont); + contLen = 0; + } + } + + return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); +} + +static int32_t taosAnalJsonBufWriteEnd(SAnalBuf *pBuf) { + int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); + if (code != 0) return code; + + return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); +} + +int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) { + int32_t code = taosAnalJsonBufWriteEnd(pBuf); + if (code != 0) return code; + + if (pBuf->filePtr != NULL) { + code = taosFsyncFile(pBuf->filePtr); + if (code != 0) return code; + code = taosCloseFile(&pBuf->filePtr); + if (code != 0) return code; + } + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + if (pCol->filePtr != NULL) { + code = taosFsyncFile(pCol->filePtr); + if (code != 0) return code; + code = taosCloseFile(&pCol->filePtr); + if (code != 0) return code; + } + } + } + + return 0; +} + +void taosAnalBufDestroy(SAnalBuf *pBuf) { + if (pBuf->fileName[0] != 0) { + if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); + // taosRemoveFile(pBuf->fileName); + pBuf->fileName[0] = 0; + } + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + if (pCol->fileName[0] != 0) { + if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); + taosRemoveFile(pCol->fileName); + pCol->fileName[0] = 0; + } + } + } + + taosMemoryFreeClear(pBuf->pCols); + pBuf->numOfCols = 0; +} + +int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return tsosAnalJsonBufOpen(pBuf, numOfCols); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteDataBegin(pBuf); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColBegin(pBuf, colIndex); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColEnd(pBuf, colIndex); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteDataEnd(pBuf); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufClose(SAnalBuf *pBuf) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufClose(pBuf); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen) { + *ppCont = NULL; + *pContLen = 0; + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +#else + +int32_t taosAnalInit() { return 0; } +void taosAnalCleanup() {} +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; } + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } +bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { return true; } +int64_t taosAnalGetVersion() { return 0; } +void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} + +int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return 0; } +int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { return 0; } +int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { return 0; } +int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { return 0; } +int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } +int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { return 0; } +int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } +int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { return 0; } +int32_t taosAnalBufClose(SAnalBuf *pBuf) { return 0; } +void taosAnalBufDestroy(SAnalBuf *pBuf) {} + +const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } +EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } +const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } + +#endif \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index df104508da..3598262d5d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -345,6 +345,21 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily do TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_ALREADY_EXIST, "Anode already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_NOT_EXIST, "Anode not there") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_URL, "Anode too long url") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_PROTOCOL, "Anode invalid protocol") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_VERSION, "Anode invalid version") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algorithm") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type") + +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type") + // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist") @@ -708,6 +723,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE, "ANOMALY_WINDOW only support mathable column") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL, "ANOMALY_WINDOW not support on tag column") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW option should include algo field") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, "Invalid forecast clause") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 0f2504ff5e..314f205057 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -194,6 +194,10 @@ int32_t tjsonGetObjectValueString(const SJson* pJson, char** pValueString) { return TSDB_CODE_SUCCESS; } +void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal) { *pVal = (int64_t)((cJSON*)pJson)->valuedouble; } + +void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal) { *pVal = ((cJSON*)pJson)->valuedouble; } + int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) {