From 4dc0c2f47e9dd473869772a67c0d17af59d4c166 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 9 Oct 2024 17:09:43 +0800 Subject: [PATCH] feat: data analysis --- cmake/cmake.options | 13 + include/common/systable.h | 2 + include/common/tanal.h | 96 +++ include/common/tmsg.h | 54 ++ include/common/tmsgdef.h | 5 + include/libs/function/functionMgt.h | 6 + include/libs/nodes/cmdnodes.h | 15 + include/libs/nodes/plannodes.h | 22 +- include/libs/nodes/querynodes.h | 9 + include/libs/qcom/query.h | 40 +- include/libs/scalar/scalar.h | 1 + include/util/taoserror.h | 22 +- include/util/tdef.h | 13 + include/util/tjson.h | 2 + source/common/CMakeLists.txt | 4 + source/dnode/mnode/impl/inc/mndAnode.h | 32 + source/dnode/mnode/impl/inc/mndDef.h | 21 + source/libs/executor/inc/operator.h | 7 + .../libs/executor/src/eventwindowoperator.c | 16 - source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/function/inc/builtinsimpl.h | 2 + source/libs/function/inc/functionMgtInt.h | 1 + source/libs/parser/inc/parAst.h | 4 + source/libs/scalar/src/sclfunc.c | 4 + source/util/src/tanal.c | 737 ++++++++++++++++++ source/util/src/terror.c | 20 + source/util/src/tjson.c | 4 + 27 files changed, 1116 insertions(+), 40 deletions(-) create mode 100644 include/common/tanal.h create mode 100644 source/dnode/mnode/impl/inc/mndAnode.h create mode 100644 source/util/src/tanal.c diff --git a/cmake/cmake.options b/cmake/cmake.options index 2158157780..e3b5782d85 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -144,6 +144,12 @@ option( OFF ) +option( + BUILD_WITH_ANALYSIS + "If build with analysis" + ON +) + ENDIF () IF(NOT TD_ENTERPRISE) @@ -151,8 +157,15 @@ MESSAGE("switch s3 off with community version") set(BUILD_S3 OFF) set(BUILD_WITH_S3 OFF) set(BUILD_WITH_COS OFF) +set(BUILD_WITH_ANALYSIS OFF) ENDIF () +IF(${BUILD_WITH_ANALYSIS}) + message("build with analysis") + set(BUILD_S3 ON) + set(BUILD_WITH_S3 ON) +ENDIF() + IF(${BUILD_S3}) IF(${BUILD_WITH_S3}) diff --git a/include/common/systable.h b/include/common/systable.h index 65b3b36af8..0acafbfc30 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -29,6 +29,8 @@ extern "C" { #define TSDB_INS_TABLE_QNODES "ins_qnodes" #define TSDB_INS_TABLE_BNODES "ins_bnodes" // no longer used #define TSDB_INS_TABLE_SNODES "ins_snodes" +#define TSDB_INS_TABLE_ANODES "ins_anodes" +#define TSDB_INS_TABLE_ANODES_FULL "ins_anodes_full" #define TSDB_INS_TABLE_ARBGROUPS "ins_arbgroups" #define TSDB_INS_TABLE_CLUSTER "ins_cluster" #define TSDB_INS_TABLE_DATABASES "ins_databases" diff --git a/include/common/tanal.h b/include/common/tanal.h new file mode 100644 index 0000000000..59a28ddbe3 --- /dev/null +++ b/include/common/tanal.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_ANAL_H_ +#define _TD_UTIL_ANAL_H_ + +#include "os.h" +#include "tdef.h" +#include "thash.h" +#include "tjson.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define ANAL_FORECAST_DEFAULT_PERIOD 10 +#define ANAL_FORECAST_DEFAULT_ROWS 10 +#define ANAL_FORECAST_DEFAULT_CONF 95 +#define ANAL_FORECAST_DEFAULT_ALPHA 0.05 +#define ANAL_FORECAST_DEFAULT_PARAM "diff" + +typedef struct { + EAnalAlgoType type; + int32_t anode; + int32_t urlLen; + char *url; +} SAnalUrl; + +typedef enum { + ANAL_BUF_TYPE_JSON = 0, + ANAL_BUF_TYPE_JSON_COL = 1, + ANAL_BUF_TYPE_OTHERS, +} EAnalBufType; + +typedef enum { + ANAL_HTTP_TYPE_GET = 0, + ANAL_HTTP_TYPE_POST, +} EAnalHttpType; + +typedef struct { + TdFilePtr filePtr; + char fileName[TSDB_FILENAME_LEN + 10]; + int64_t numOfRows; +} SAnalColBuf; + +typedef struct { + EAnalBufType bufType; + TdFilePtr filePtr; + char fileName[TSDB_FILENAME_LEN]; + int32_t numOfCols; + SAnalColBuf *pCols; +} SAnalBuf; + +int32_t taosAnalInit(); +void taosAnalCleanup(); +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf); + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen); +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen); +bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue); +int64_t taosAnalGetVersion(); +void taosAnalUpdate(int64_t newVer, SHashObj *pHash); + +int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols); +int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal); +int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal); +int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal); +int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName); +int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf); +int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex); +int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue); +int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex); +int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf); +int32_t taosAnalBufClose(SAnalBuf *pBuf); +void taosAnalBufDestroy(SAnalBuf *pBuf); + +const char *taosAnalAlgoStr(EAnalAlgoType algoType); +EAnalAlgoType taosAnalAlgoInt(const char *algoName); +const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType); + +#ifdef __cplusplus +} +#endif +#endif /*_TD_UTIL_ANAL_H_*/ \ No newline at end of file diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a10f02c96..ae9df866c5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -159,6 +159,8 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_ARBGROUP, TSDB_MGMT_TABLE_ENCRYPTIONS, TSDB_MGMT_TABLE_USER_FULL, + TSDB_MGMT_TABLE_ANODE, + TSDB_MGMT_TABLE_ANODE_FULL, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -260,6 +262,7 @@ typedef enum ENodeType { QUERY_NODE_COUNT_WINDOW, QUERY_NODE_COLUMN_OPTIONS, QUERY_NODE_TSMA_OPTIONS, + QUERY_NODE_ANOMALY_WINDOW, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, @@ -345,6 +348,9 @@ typedef enum ENodeType { QUERY_NODE_CREATE_VIEW_STMT, QUERY_NODE_DROP_VIEW_STMT, QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE, + QUERY_NODE_CREATE_ANODE_STMT, + QUERY_NODE_DROP_ANODE_STMT, + QUERY_NODE_UPDATE_ANODE_STMT, // show statement nodes // see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET' @@ -386,6 +392,8 @@ typedef enum ENodeType { QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT, QUERY_NODE_SHOW_ENCRYPTIONS_STMT, QUERY_NODE_SHOW_TSMAS_STMT, + QUERY_NODE_SHOW_ANODES_STMT, + QUERY_NODE_SHOW_ANODES_FULL_STMT, QUERY_NODE_CREATE_TSMA_STMT, QUERY_NODE_SHOW_CREATE_TSMA_STMT, QUERY_NODE_DROP_TSMA_STMT, @@ -408,6 +416,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN, QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, + QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, // physical plan node QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, @@ -458,6 +467,9 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY, + QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY, + QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, } ENodeType; typedef struct { @@ -1092,6 +1104,22 @@ typedef struct { int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); +typedef struct { + int32_t dnodeId; + int64_t analVer; +} SRetrieveAnalAlgoReq; + +typedef struct { + int64_t ver; + SHashObj* hash; // algoname:algotype -> SAnalUrl +} SRetrieveAnalAlgoRsp; + +int32_t tSerializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq); +int32_t tDeserializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq); +int32_t tSerializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp); +int32_t tDeserializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp); +void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp* pRsp); + typedef struct { int8_t alterType; int8_t superUser; @@ -1766,6 +1794,7 @@ typedef struct { SArray* pVloads; // array of SVnodeLoad int32_t statusSeq; int64_t ipWhiteVer; + int64_t analVer; } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); @@ -1831,6 +1860,7 @@ typedef struct { SArray* pDnodeEps; // Array of SDnodeEp int32_t statusSeq; int64_t ipWhiteVer; + int64_t analVer; } SStatusRsp; int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); @@ -2377,6 +2407,30 @@ typedef struct { int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); +typedef struct { + int32_t urlLen; + int32_t sqlLen; + char* url; + char* sql; +} SMCreateAnodeReq; + +int32_t tSerializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq); +int32_t tDeserializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq); +void tFreeSMCreateAnodeReq(SMCreateAnodeReq* pReq); + +typedef struct { + int32_t anodeId; + int32_t sqlLen; + char* sql; +} SMDropAnodeReq, SMUpdateAnodeReq; + +int32_t tSerializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq); +int32_t tDeserializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq); +void tFreeSMDropAnodeReq(SMDropAnodeReq* pReq); +int32_t tSerializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq); +int32_t tDeserializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq); +void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq* pReq); + typedef struct { int32_t vgId; int32_t hbSeq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 40464dc29a..6540e7b135 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -125,6 +125,11 @@ TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL) + // mnode msg overload + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ANODE, "create-anode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_ANODE, "update-anode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_ANODE, "drop-anode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_ANAL_ALGO, "retrieve-anal-algo", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_DND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8 diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 519207377b..e5bacf85b2 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -62,6 +62,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_UNIQUE, FUNCTION_TYPE_STATE_COUNT, FUNCTION_TYPE_STATE_DURATION, + FUNCTION_TYPE_FORECAST, // math function FUNCTION_TYPE_ABS = 1000, @@ -149,6 +150,9 @@ typedef enum EFunctionType { FUNCTION_TYPE_TBUID, FUNCTION_TYPE_VGID, FUNCTION_TYPE_VGVER, + FUNCTION_TYPE_FORECAST_LOW, + FUNCTION_TYPE_FORECAST_HIGH, + FUNCTION_TYPE_FORECAST_ROWTS, // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, @@ -263,6 +267,7 @@ bool fmIsForbidSysTableFunc(int32_t funcId); bool fmIsIntervalInterpoFunc(int32_t funcId); bool fmIsInterpFunc(int32_t funcId); bool fmIsLastRowFunc(int32_t funcId); +bool fmIsForecastFunc(int32_t funcId); bool fmIsNotNullOutputFunc(int32_t funcId); bool fmIsSelectValueFunc(int32_t funcId); bool fmIsSystemInfoFunc(int32_t funcId); @@ -272,6 +277,7 @@ bool fmIsMultiRowsFunc(int32_t funcId); bool fmIsKeepOrderFunc(int32_t funcId); bool fmIsCumulativeFunc(int32_t funcId); bool fmIsInterpPseudoColumnFunc(int32_t funcId); +bool fmIsForecastPseudoColumnFunc(int32_t funcId); bool fmIsGroupKeyFunc(int32_t funcId); bool fmIsBlockDistFunc(int32_t funcId); bool fmIsIgnoreNullFunc(int32_t funcId); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 2a18e800b8..ba1e21b897 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -318,6 +318,21 @@ typedef struct SAlterDnodeStmt { char value[TSDB_DNODE_VALUE_LEN]; } SAlterDnodeStmt; +typedef struct { + ENodeType type; + char url[TSDB_ANAL_ANODE_URL_LEN]; +} SCreateAnodeStmt; + +typedef struct { + ENodeType type; + int32_t anodeId; +} SDropAnodeStmt; + +typedef struct { + ENodeType type; + int32_t anodeId; +} SUpdateAnodeStmt; + typedef struct SShowStmt { ENodeType type; SNode* pDbName; // SValueNode diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index a691433ee6..8e4a3ea32b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -204,6 +204,11 @@ typedef struct SInterpFuncLogicNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; +typedef struct SForecastFuncLogicNode { + SLogicNode node; + SNodeList* pFuncs; +} SForecastFuncLogicNode; + typedef struct SGroupCacheLogicNode { SLogicNode node; bool grpColsMayBeNull; @@ -274,7 +279,8 @@ typedef enum EWindowType { WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE, WINDOW_TYPE_EVENT, - WINDOW_TYPE_COUNT + WINDOW_TYPE_COUNT, + WINDOW_TYPE_ANOMALY } EWindowType; typedef enum EWindowAlgorithm { @@ -315,6 +321,8 @@ typedef struct SWindowLogicNode { int64_t windowCount; int64_t windowSliding; SNodeList* pTsmaSubplans; + SNode* pAnomalyExpr; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -507,6 +515,12 @@ typedef struct SInterpFuncPhysiNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; +typedef struct SForecastFuncPhysiNode { + SPhysiNode node; + SNodeList* pExprs; + SNodeList* pFuncs; +} SForecastFuncPhysiNode; + typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; @@ -704,6 +718,12 @@ typedef struct SCountWinodwPhysiNode { typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode; +typedef struct SAnomalyWindowPhysiNode { + SWindowPhysiNode window; + SNode* pAnomalyKey; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; +} SAnomalyWindowPhysiNode; + typedef struct SSortPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index f5567c735e..4763077ed9 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -347,6 +347,13 @@ typedef struct SCountWindowNode { int64_t windowSliding; } SCountWindowNode; +typedef struct SAnomalyWindowNode { + ENodeType type; // QUERY_NODE_ANOMALY_WINDOW + SNode* pCol; // timestamp primary key + SNode* pExpr; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; +} SAnomalyWindowNode; + typedef enum EFillMode { FILL_MODE_NONE = 1, FILL_MODE_VALUE, @@ -442,6 +449,8 @@ typedef struct SSelectStmt { bool hasTailFunc; bool hasInterpFunc; bool hasInterpPseudoColFunc; + bool hasForecastFunc; + bool hasForecastPseudoColFunc; bool hasLastRowFunc; bool hasLastFunc; bool hasTimeLineFunc; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index e0c7ff9a20..81a3952463 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -407,29 +407,29 @@ void* getTaskPoolWorkerCb(); #define IS_AUDIT_CTB_NAME(_ctbname) \ ((*(_ctbname) == 't') && (0 == strncmp(_ctbname, TSDB_AUDIT_CTB_OPERATION, TSDB_AUDIT_CTB_OPERATION_LEN))) -#define qFatal(...) \ - do { \ - if (qDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ - } \ +#define qFatal(...) \ + do { \ + if (qDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qError(...) \ - do { \ - if (qDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ - } \ +#define qError(...) \ + do { \ + if (qDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qWarn(...) \ - do { \ - if (qDebugFlag & DEBUG_WARN) { \ - taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ - } \ +#define qWarn(...) \ + do { \ + if (qDebugFlag & DEBUG_WARN) { \ + taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define qInfo(...) \ - do { \ - if (qDebugFlag & DEBUG_INFO) { \ - taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ - } \ +#define qInfo(...) \ + do { \ + if (qDebugFlag & DEBUG_INFO) { \ + taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + } \ } while (0) #define qDebug(...) \ do { \ diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index b06b7c74c7..fd936dd087 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -139,6 +139,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 558902075b..603207d8c0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -474,8 +474,24 @@ int32_t taosGetErrSize(); #define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427) #define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428) #define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429) -#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A) +#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A) +// anode +#define TSDB_CODE_MND_ANODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0430) +#define TSDB_CODE_MND_ANODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0431) +#define TSDB_CODE_MND_ANODE_TOO_LONG_URL TAOS_DEF_ERROR_CODE(0, 0x0432) +#define TSDB_CODE_MND_ANODE_INVALID_PROTOCOL TAOS_DEF_ERROR_CODE(0, 0x0433) +#define TSDB_CODE_MND_ANODE_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0434) +#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO TAOS_DEF_ERROR_CODE(0, 0x0435) +#define TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME TAOS_DEF_ERROR_CODE(0, 0x0436) +#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE TAOS_DEF_ERROR_CODE(0, 0x0437) + +// analysis +#define TSDB_CODE_ANAL_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440) +#define TSDB_CODE_ANAL_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441) +#define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442) +#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443) +#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) @@ -868,6 +884,10 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) #define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) #define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681) +#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2682) +#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL TAOS_DEF_ERROR_CODE(0, 0x2683) +#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT TAOS_DEF_ERROR_CODE(0, 0x2684) +#define TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x2685) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tdef.h b/include/util/tdef.h index 46c84ab26a..768ff82ade 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -293,6 +293,12 @@ typedef enum ELogicConditionType { #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 +#define TSDB_ANAL_ANODE_URL_LEN 128 +#define TSDB_ANAL_ALGO_NAME_LEN 64 +#define TSDB_ANAL_ALGO_TYPE_LEN 24 +#define TSDB_ANAL_ALGO_KEY_LEN (TSDB_ANAL_ALGO_NAME_LEN + 9) +#define TSDB_ANAL_ALGO_URL_LEN (TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1) +#define TSDB_ANAL_ALGO_OPTION_LEN 256 #define TSDB_MAX_EP_NUM 10 @@ -603,6 +609,13 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define MONITOR_TAG_NAME_LEN 100 #define MONITOR_TAG_VALUE_LEN 300 #define MONITOR_METRIC_NAME_LEN 100 + +typedef enum { + ANAL_ALGO_TYPE_ANOMALY_DETECT = 0, + ANAL_ALGO_TYPE_FORECAST = 1, + ANAL_ALGO_TYPE_END, +} EAnalAlgoType; + #ifdef __cplusplus } #endif diff --git a/include/util/tjson.h b/include/util/tjson.h index b9ea72b4bb..50d1a4d438 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -68,6 +68,8 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem); SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName); int32_t tjsonGetObjectName(const SJson* pJson, char** pName); int32_t tjsonGetObjectValueString(const SJson* pJson, char** pStringValue); +void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal); +void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal); int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal); int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal); int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal); diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index eb3dd95e95..f01c8dcbb9 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -47,6 +47,10 @@ target_link_libraries( INTERFACE api ) +if(${BUILD_WITH_ANALYSIS}) + add_definitions(-DUSE_ANAL) +endif() + if(${BUILD_S3}) if(${BUILD_WITH_S3}) diff --git a/source/dnode/mnode/impl/inc/mndAnode.h b/source/dnode/mnode/impl/inc/mndAnode.h new file mode 100644 index 0000000000..63e8f9090e --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndAnode.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_ANODE_H_ +#define _TD_MND_ANODE_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitAnode(SMnode *pMnode); +void mndCleanupAnode(SMnode *pMnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_ANODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 60b732f817..742db8f450 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -78,6 +78,9 @@ typedef enum { MND_OPER_DROP_VIEW, MND_OPER_CONFIG_CLUSTER, MND_OPER_BALANCE_VGROUP_LEADER, + MND_OPER_CREATE_ANODE, + MND_OPER_UPDATE_ANODE, + MND_OPER_DROP_ANODE } EOperType; typedef enum { @@ -232,6 +235,24 @@ typedef struct { char machineId[TSDB_MACHINE_ID_LEN + 1]; } SDnodeObj; +typedef struct { + int32_t nameLen; + char* name; +} SAnodeAlgo; + +typedef struct { + int32_t id; + int64_t createdTime; + int64_t updateTime; + int32_t version; + int32_t urlLen; + int32_t numOfAlgos; + int32_t status; + SRWLatch lock; + char* url; + SArray** algos; +} SAnodeObj; + typedef struct { int32_t id; int64_t createdTime; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 0df676c6e2..7dfc7080d6 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -133,6 +133,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); + int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); @@ -159,6 +161,8 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); + int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); // clang-format on @@ -190,6 +194,9 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdSt int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx); +void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId); +void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index f473626953..e68a91d97d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -44,22 +44,6 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** static void destroyEWindowOperatorInfo(void* param); static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); -// todo : move to util -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, - uint64_t groupId) { - pRowSup->startRowIndex = rowIndex; - pRowSup->numOfRows = 0; - pRowSup->win.skey = tsList[rowIndex]; - pRowSup->groupId = groupId; -} - -static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { - pRowSup->win.ekey = ts; - pRowSup->prevTs = ts; - pRowSup->numOfRows += 1; - pRowSup->groupId = groupId; -} - int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3817ef5b69..34ecda6ce7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -89,14 +89,14 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } -static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { +void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { pRowSup->win.ekey = ts; pRowSup->prevTs = ts; pRowSup->numOfRows += 1; pRowSup->groupId = groupId; } -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, +void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) { pRowSup->startRowIndex = rowIndex; pRowSup->numOfRows = 0; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0b2fb70eba..77905792b8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -138,6 +138,8 @@ int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx* pCtx); int32_t diffFunctionByRow(SArray* pCtx); +bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); + bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t derivativeFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index a50562d78d..3112245de9 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -58,6 +58,7 @@ extern "C" { #define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29) #define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found #define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31) +#define FUNC_MGT_FORECAST_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(32) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index a2aec77c2e..28e867965f 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -154,6 +154,7 @@ SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr); SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond); SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken); +SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt); SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill); SNode* createWindowOffsetNode(SAstCreateContext* pCxt, SNode* pStartOffset, SNode* pEndOffset); @@ -251,6 +252,9 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName); SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort); SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force, bool unsafe); SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue); +SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl); +SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode); +SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll); SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue); SNode* createRealTableNodeForIndexName(SAstCreateContext* pCxt, SToken* pDbName, SToken* pIndexName); SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 377009a07f..ff47c091b7 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -3972,6 +3972,10 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam return nonCalcScalarFunction(pInput, inputNum, pOutput); } +int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return nonCalcScalarFunction(pInput, inputNum, pOutput); +} + int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return avgScalarFunction(pInput, inputNum, pOutput); } diff --git a/source/util/src/tanal.c b/source/util/src/tanal.c new file mode 100644 index 0000000000..19d26e8a0a --- /dev/null +++ b/source/util/src/tanal.c @@ -0,0 +1,737 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tanal.h" +#include "tmsg.h" +#include "ttypes.h" +#include "tutil.h" + +#ifdef USE_ANAL +#include +#define ANAL_ALGO_SPLIT "," + +typedef struct { + int64_t ver; + SHashObj *hash; // algoname:algotype -> SAnalUrl + TdThreadMutex lock; +} SAlgoMgmt; + +typedef struct { + char *data; + int64_t dataLen; +} SCurlResp; + +static SAlgoMgmt tsAlgos = {0}; +static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen); + +const char *taosAnalAlgoStr(EAnalAlgoType type) { + switch (type) { + case ANAL_ALGO_TYPE_ANOMALY_DETECT: + return "anomaly-detection"; + case ANAL_ALGO_TYPE_FORECAST: + return "forecast"; + default: + return "unknown"; + } +} + +const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { + switch (type) { + case ANAL_ALGO_TYPE_ANOMALY_DETECT: + return "anomaly-detect"; + case ANAL_ALGO_TYPE_FORECAST: + return "forecast"; + default: + return "unknown"; + } +} + +EAnalAlgoType taosAnalAlgoInt(const char *name) { + for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { + if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { + return i; + } + } + + return ANAL_ALGO_TYPE_END; +} + +int32_t taosAnalInit() { + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + uError("failed to init curl"); + return -1; + } + + tsAlgos.ver = 0; + taosThreadMutexInit(&tsAlgos.lock, NULL); + tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + if (tsAlgos.hash == NULL) { + uError("failed to init algo hash"); + return -1; + } + + uInfo("analysis env is initialized"); + return 0; +} + +static void taosAnalFreeHash(SHashObj *hash) { + void *pIter = taosHashIterate(hash, NULL); + while (pIter != NULL) { + SAnalUrl *pUrl = (SAnalUrl *)pIter; + taosMemoryFree(pUrl->url); + pIter = taosHashIterate(hash, pIter); + } + taosHashCleanup(hash); +} + +void taosAnalCleanup() { + curl_global_cleanup(); + taosThreadMutexDestroy(&tsAlgos.lock); + taosAnalFreeHash(tsAlgos.hash); + tsAlgos.hash = NULL; + uInfo("analysis env is cleaned up"); +} + +void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { + if (newVer > tsAlgos.ver) { + taosThreadMutexLock(&tsAlgos.lock); + SHashObj *hash = tsAlgos.hash; + tsAlgos.ver = newVer; + tsAlgos.hash = pHash; + taosThreadMutexUnlock(&tsAlgos.lock); + taosAnalFreeHash(hash); + } else { + taosAnalFreeHash(pHash); + } +} + +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { + char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName); + + char *pos1 = strstr(option, buf); + char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + if (pos1 != NULL) { + if (optMaxLen > 0) { + int32_t copyLen = optMaxLen; + if (pos2 != NULL) { + copyLen = (int32_t)(pos2 - pos1 - strlen(optName) + 1); + copyLen = MIN(copyLen, optMaxLen); + } + tstrncpy(optValue, pos1 + bufLen, copyLen); + } + return true; + } else { + return false; + } +} + +bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { + char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName); + + char *pos1 = strstr(option, buf); + char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + if (pos1 != NULL) { + *optValue = taosStr2Int32(pos1 + bufLen + 1, NULL, 10); + return true; + } else { + return false; + } +} + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { + int32_t code = 0; + char name[TSDB_ANAL_ALGO_KEY_LEN] = {0}; + int32_t nameLen = 1 + snprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); + + taosThreadMutexLock(&tsAlgos.lock); + SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); + if (pUrl != NULL) { + tstrncpy(url, pUrl->url, urlLen); + uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); + } else { + url[0] = 0; + terrno = TSDB_CODE_ANAL_ALGO_NOT_FOUND; + code = terrno; + uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); + } + taosThreadMutexUnlock(&tsAlgos.lock); + + return code; +} + +int64_t taosAnalGetVersion() { return tsAlgos.ver; } + +static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) { + SCurlResp *pRsp = userdata; + if (contLen == 0 || nmemb == 0 || pCont == NULL) { + pRsp->dataLen = 0; + pRsp->data = NULL; + uError("curl response is received, len:%" PRId64, pRsp->dataLen); + return 0; + } + + pRsp->dataLen = (int64_t)contLen * (int64_t)nmemb; + pRsp->data = taosMemoryMalloc(pRsp->dataLen + 1); + + if (pRsp->data != NULL) { + (void)memcpy(pRsp->data, pCont, pRsp->dataLen); + pRsp->data[pRsp->dataLen] = 0; + uDebug("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data); + return pRsp->dataLen; + } else { + pRsp->dataLen = 0; + uError("failed to malloc curl response"); + return 0; + } +} + +static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) { + CURL *curl = NULL; + CURLcode code = 0; + + curl = curl_easy_init(); + if (curl == NULL) { + uError("failed to create curl handle"); + return -1; + } + + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100); + + uDebug("curl get request will sent, url:%s", url); + code = curl_easy_perform(curl); + if (code != CURLE_OK) { + uError("failed to perform curl action, code:%d", code); + } + +_OVER: + if (curl != NULL) curl_easy_cleanup(curl); + return code; +} + +static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { + struct curl_slist *headers = NULL; + CURL *curl = NULL; + CURLcode code = 0; + + curl = curl_easy_init(); + if (curl == NULL) { + uError("failed to create curl handle"); + return -1; + } + + headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000); + curl_easy_setopt(curl, CURLOPT_POST, 1); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf); + + uDebug("curl post request will sent, url:%s len:%d", url, bufLen); + code = curl_easy_perform(curl); + if (code != CURLE_OK) { + uError("failed to perform curl action, code:%d", code); + } + +_OVER: + if (curl != NULL) { + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + } + return code; +} + +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { + int32_t code = -1; + char *pCont = NULL; + int64_t contentLen; + SJson *pJson = NULL; + SCurlResp curlRsp = {0}; + + if (type == ANAL_HTTP_TYPE_GET) { + if (taosCurlGetRequest(url, &curlRsp) != 0) { + terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS; + goto _OVER; + } + } else { + code = taosAnalBufGetCont(pBuf, &pCont, &contentLen); + if (code != 0) { + terrno = code; + goto _OVER; + } + if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { + terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS; + goto _OVER; + } + } + + if (curlRsp.data == NULL || curlRsp.dataLen == 0) { + terrno = TSDB_CODE_ANAL_URL_RSP_IS_NULL; + goto _OVER; + } + + pJson = tjsonParse(curlRsp.data); + if (pJson == NULL) { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } + +_OVER: + if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data); + if (pCont != NULL) taosMemoryFree(pCont); + return pJson; +} + +static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { + int32_t code = 0; + int64_t contLen; + char *pCont = NULL; + TdFilePtr pFile = NULL; + + pFile = taosOpenFile(fileName, TD_FILE_READ); + if (pFile == NULL) { + code = terrno; + goto _OVER; + } + + code = taosFStatFile(pFile, &contLen, NULL); + if (code != 0) goto _OVER; + + pCont = taosMemoryMalloc(contLen + 1); + if (pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + if (taosReadFile(pFile, pCont, contLen) != contLen) { + code = terrno; + goto _OVER; + } + + pCont[contLen] = '\0'; + +_OVER: + if (code == 0) { + *ppCont = pCont; + *pContLen = contLen; + } else { + if (pCont != NULL) taosMemoryFree(pCont); + } + if (pFile != NULL) taosCloseFile(&pFile); + return code; +} + +static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { + char buf[64] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { + char buf[128] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { + char buf[128] = {0}; + int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t bufLen) { + if (bufLen <= 0) { + bufLen = strlen(buf); + } + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteStart(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } + +static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { + pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + if (pBuf->filePtr == NULL) { + return terrno; + } + + pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalColBuf)); + if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; + pBuf->numOfCols = numOfCols; + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON) { + return taosAnalJsonBufWriteStart(pBuf); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); + pCol->filePtr = + taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + if (pCol->filePtr == NULL) { + return terrno; + } + } + + return taosAnalJsonBufWriteStart(pBuf); +} + +static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + char buf[128] = {0}; + bool first = (colIndex == 0); + bool last = (colIndex == pBuf->numOfCols - 1); + + if (first) { + if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { + return terrno; + } + } + + int32_t bufLen = snprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, + tDataTypes[colType].bytes, last ? "" : ","); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + + if (last) { + if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) { + return terrno; + } + } + + return 0; +} + +static int32_t taosAnalJsonBufWriteDataBegin(SAnalBuf *pBuf) { + return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); +} + +static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { + if (bufLen <= 0) { + bufLen = strlen(buf); + } + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON) { + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + } else { + if (taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen) != bufLen) { + return terrno; + } + } + + return 0; +} + +static int32_t taosAnalJsonBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { + return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); +} + +static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { + if (colIndex == pBuf->numOfCols - 1) { + return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); + + } else { + return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); + } +} + +static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + char buf[64]; + int32_t bufLen = 0; + + if (pBuf->pCols[colIndex].numOfRows != 0) { + buf[bufLen] = ','; + buf[bufLen + 1] = '\n'; + buf[bufLen + 2] = 0; + bufLen += 2; + } + + switch (colType) { + case TSDB_DATA_TYPE_BOOL: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0); + break; + case TSDB_DATA_TYPE_TINYINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue); + break; + case TSDB_DATA_TYPE_UTINYINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue); + break; + case TSDB_DATA_TYPE_SMALLINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue); + break; + case TSDB_DATA_TYPE_USMALLINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue); + break; + case TSDB_DATA_TYPE_INT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue); + break; + case TSDB_DATA_TYPE_UINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue); + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue); + break; + case TSDB_DATA_TYPE_UBIGINT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue); + break; + case TSDB_DATA_TYPE_FLOAT: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue)); + break; + case TSDB_DATA_TYPE_DOUBLE: + bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue)); + break; + default: + buf[bufLen] = '\0'; + } + + pBuf->pCols[colIndex].numOfRows++; + return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); +} + +static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) { + int32_t code = 0; + char *pCont = NULL; + int64_t contLen = 0; + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + + code = taosFsyncFile(pCol->filePtr); + if (code != 0) return code; + + code = taosCloseFile(&pCol->filePtr); + if (code != 0) return code; + + code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); + if (code != 0) return code; + + code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen); + if (code != 0) return code; + + taosMemoryFreeClear(pCont); + contLen = 0; + } + } + + return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); +} + +static int32_t taosAnalJsonBufWriteEnd(SAnalBuf *pBuf) { + int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); + if (code != 0) return code; + + return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); +} + +int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) { + int32_t code = taosAnalJsonBufWriteEnd(pBuf); + if (code != 0) return code; + + if (pBuf->filePtr != NULL) { + code = taosFsyncFile(pBuf->filePtr); + if (code != 0) return code; + code = taosCloseFile(&pBuf->filePtr); + if (code != 0) return code; + } + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + if (pCol->filePtr != NULL) { + code = taosFsyncFile(pCol->filePtr); + if (code != 0) return code; + code = taosCloseFile(&pCol->filePtr); + if (code != 0) return code; + } + } + } + + return 0; +} + +void taosAnalBufDestroy(SAnalBuf *pBuf) { + if (pBuf->fileName[0] != 0) { + if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); + // taosRemoveFile(pBuf->fileName); + pBuf->fileName[0] = 0; + } + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalColBuf *pCol = &pBuf->pCols[i]; + if (pCol->fileName[0] != 0) { + if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); + taosRemoveFile(pCol->fileName); + pCol->fileName[0] = 0; + } + } + } + + taosMemoryFreeClear(pBuf->pCols); + pBuf->numOfCols = 0; +} + +int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return tsosAnalJsonBufOpen(pBuf, numOfCols); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteDataBegin(pBuf); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColBegin(pBuf, colIndex); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColEnd(pBuf, colIndex); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteDataEnd(pBuf); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufClose(SAnalBuf *pBuf) { + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufClose(pBuf); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen) { + *ppCont = NULL; + *pContLen = 0; + + if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); + } else { + return TSDB_CODE_ANAL_BUF_INVALID_TYPE; + } +} + +#else + +int32_t taosAnalInit() { return 0; } +void taosAnalCleanup() {} +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; } + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } +bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { return true; } +int64_t taosAnalGetVersion() { return 0; } +void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} + +int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return 0; } +int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { return 0; } +int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { return 0; } +int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { return 0; } +int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } +int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { return 0; } +int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } +int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { return 0; } +int32_t taosAnalBufClose(SAnalBuf *pBuf) { return 0; } +void taosAnalBufDestroy(SAnalBuf *pBuf) {} + +const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } +EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } +const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } + +#endif \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index df104508da..3598262d5d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -345,6 +345,21 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily do TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_ALREADY_EXIST, "Anode already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_NOT_EXIST, "Anode not there") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_URL, "Anode too long url") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_PROTOCOL, "Anode invalid protocol") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_VERSION, "Anode invalid version") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algorithm") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type") + +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded") +TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type") + // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist") @@ -708,6 +723,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE, "ANOMALY_WINDOW only support mathable column") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL, "ANOMALY_WINDOW not support on tag column") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW option should include algo field") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, "Invalid forecast clause") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 0f2504ff5e..314f205057 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -194,6 +194,10 @@ int32_t tjsonGetObjectValueString(const SJson* pJson, char** pValueString) { return TSDB_CODE_SUCCESS; } +void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal) { *pVal = (int64_t)((cJSON*)pJson)->valuedouble; } + +void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal) { *pVal = ((cJSON*)pJson)->valuedouble; } + int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) {