Merge pull request #28265 from taosdata/feat/anode1

feat: anode
This commit is contained in:
Shengliang Guan 2024-10-09 18:48:55 +08:00 committed by GitHub
commit ef39c9c41e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 2100 additions and 47 deletions

View File

@ -144,6 +144,12 @@ option(
OFF
)
option(
BUILD_WITH_ANALYSIS
"If build with analysis"
ON
)
ENDIF ()
IF(NOT TD_ENTERPRISE)
@ -151,6 +157,13 @@ MESSAGE("switch s3 off with community version")
set(BUILD_S3 OFF)
set(BUILD_WITH_S3 OFF)
set(BUILD_WITH_COS OFF)
set(BUILD_WITH_ANALYSIS OFF)
ENDIF ()
IF(${BUILD_WITH_ANALYSIS})
message("build with analysis")
set(BUILD_S3 ON)
set(BUILD_WITH_S3 ON)
ENDIF()
IF(${BUILD_S3})

View File

@ -29,6 +29,8 @@ extern "C" {
#define TSDB_INS_TABLE_QNODES "ins_qnodes"
#define TSDB_INS_TABLE_BNODES "ins_bnodes" // no longer used
#define TSDB_INS_TABLE_SNODES "ins_snodes"
#define TSDB_INS_TABLE_ANODES "ins_anodes"
#define TSDB_INS_TABLE_ANODES_FULL "ins_anodes_full"
#define TSDB_INS_TABLE_ARBGROUPS "ins_arbgroups"
#define TSDB_INS_TABLE_CLUSTER "ins_cluster"
#define TSDB_INS_TABLE_DATABASES "ins_databases"

96
include/common/tanal.h Normal file
View File

@ -0,0 +1,96 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_ANAL_H_
#define _TD_UTIL_ANAL_H_
#include "os.h"
#include "tdef.h"
#include "thash.h"
#include "tjson.h"
#ifdef __cplusplus
extern "C" {
#endif
#define ANAL_FORECAST_DEFAULT_PERIOD 10
#define ANAL_FORECAST_DEFAULT_ROWS 10
#define ANAL_FORECAST_DEFAULT_CONF 95
#define ANAL_FORECAST_DEFAULT_ALPHA 0.05
#define ANAL_FORECAST_DEFAULT_PARAM "diff"
typedef struct {
EAnalAlgoType type;
int32_t anode;
int32_t urlLen;
char *url;
} SAnalUrl;
typedef enum {
ANAL_BUF_TYPE_JSON = 0,
ANAL_BUF_TYPE_JSON_COL = 1,
ANAL_BUF_TYPE_OTHERS,
} EAnalBufType;
typedef enum {
ANAL_HTTP_TYPE_GET = 0,
ANAL_HTTP_TYPE_POST,
} EAnalHttpType;
typedef struct {
TdFilePtr filePtr;
char fileName[TSDB_FILENAME_LEN + 10];
int64_t numOfRows;
} SAnalColBuf;
typedef struct {
EAnalBufType bufType;
TdFilePtr filePtr;
char fileName[TSDB_FILENAME_LEN];
int32_t numOfCols;
SAnalColBuf *pCols;
} SAnalBuf;
int32_t taosAnalInit();
void taosAnalCleanup();
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf);
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);
bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue);
int64_t taosAnalGetVersion();
void taosAnalUpdate(int64_t newVer, SHashObj *pHash);
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols);
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal);
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal);
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal);
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName);
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf);
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue);
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex);
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf);
int32_t taosAnalBufClose(SAnalBuf *pBuf);
void taosAnalBufDestroy(SAnalBuf *pBuf);
const char *taosAnalAlgoStr(EAnalAlgoType algoType);
EAnalAlgoType taosAnalAlgoInt(const char *algoName);
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_ANAL_H_*/

View File

@ -159,6 +159,8 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_ARBGROUP,
TSDB_MGMT_TABLE_ENCRYPTIONS,
TSDB_MGMT_TABLE_USER_FULL,
TSDB_MGMT_TABLE_ANODE,
TSDB_MGMT_TABLE_ANODE_FULL,
TSDB_MGMT_TABLE_MAX,
} EShowType;
@ -260,6 +262,7 @@ typedef enum ENodeType {
QUERY_NODE_COUNT_WINDOW,
QUERY_NODE_COLUMN_OPTIONS,
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
@ -345,6 +348,9 @@ typedef enum ENodeType {
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE,
QUERY_NODE_CREATE_ANODE_STMT,
QUERY_NODE_DROP_ANODE_STMT,
QUERY_NODE_UPDATE_ANODE_STMT,
// show statement nodes
// see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET'
@ -386,6 +392,8 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT,
QUERY_NODE_SHOW_ENCRYPTIONS_STMT,
QUERY_NODE_SHOW_TSMAS_STMT,
QUERY_NODE_SHOW_ANODES_STMT,
QUERY_NODE_SHOW_ANODES_FULL_STMT,
QUERY_NODE_CREATE_TSMA_STMT,
QUERY_NODE_SHOW_CREATE_TSMA_STMT,
QUERY_NODE_DROP_TSMA_STMT,
@ -408,6 +416,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
@ -458,6 +467,9 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY,
QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY,
QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC,
} ENodeType;
typedef struct {
@ -1092,6 +1104,22 @@ typedef struct {
int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
typedef struct {
int32_t dnodeId;
int64_t analVer;
} SRetrieveAnalAlgoReq;
typedef struct {
int64_t ver;
SHashObj* hash; // algoname:algotype -> SAnalUrl
} SRetrieveAnalAlgoRsp;
int32_t tSerializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq);
int32_t tDeserializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq);
int32_t tSerializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp);
int32_t tDeserializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp);
void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp* pRsp);
typedef struct {
int8_t alterType;
int8_t superUser;
@ -1766,6 +1794,7 @@ typedef struct {
SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq;
int64_t ipWhiteVer;
int64_t analVer;
} SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
@ -1831,6 +1860,7 @@ typedef struct {
SArray* pDnodeEps; // Array of SDnodeEp
int32_t statusSeq;
int64_t ipWhiteVer;
int64_t analVer;
} SStatusRsp;
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
@ -2377,6 +2407,30 @@ typedef struct {
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
typedef struct {
int32_t urlLen;
int32_t sqlLen;
char* url;
char* sql;
} SMCreateAnodeReq;
int32_t tSerializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq);
int32_t tDeserializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq);
void tFreeSMCreateAnodeReq(SMCreateAnodeReq* pReq);
typedef struct {
int32_t anodeId;
int32_t sqlLen;
char* sql;
} SMDropAnodeReq, SMUpdateAnodeReq;
int32_t tSerializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq);
int32_t tDeserializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq);
void tFreeSMDropAnodeReq(SMDropAnodeReq* pReq);
int32_t tSerializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq);
int32_t tDeserializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq);
void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq* pReq);
typedef struct {
int32_t vgId;
int32_t hbSeq;

View File

@ -125,6 +125,11 @@
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL)
// mnode msg overload
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ANODE, "create-anode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_ANODE, "update-anode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ANODE, "drop-anode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_ANAL_ALGO, "retrieve-anal-algo", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_DND_MSG)
TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8

View File

@ -62,6 +62,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_UNIQUE,
FUNCTION_TYPE_STATE_COUNT,
FUNCTION_TYPE_STATE_DURATION,
FUNCTION_TYPE_FORECAST,
// math function
FUNCTION_TYPE_ABS = 1000,
@ -149,6 +150,9 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TBUID,
FUNCTION_TYPE_VGID,
FUNCTION_TYPE_VGVER,
FUNCTION_TYPE_FORECAST_LOW,
FUNCTION_TYPE_FORECAST_HIGH,
FUNCTION_TYPE_FORECAST_ROWTS,
// internal function
FUNCTION_TYPE_SELECT_VALUE = 3750,
@ -263,6 +267,7 @@ bool fmIsForbidSysTableFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
bool fmIsInterpFunc(int32_t funcId);
bool fmIsLastRowFunc(int32_t funcId);
bool fmIsForecastFunc(int32_t funcId);
bool fmIsNotNullOutputFunc(int32_t funcId);
bool fmIsSelectValueFunc(int32_t funcId);
bool fmIsSystemInfoFunc(int32_t funcId);
@ -272,6 +277,7 @@ bool fmIsMultiRowsFunc(int32_t funcId);
bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId);
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
bool fmIsForecastPseudoColumnFunc(int32_t funcId);
bool fmIsGroupKeyFunc(int32_t funcId);
bool fmIsBlockDistFunc(int32_t funcId);
bool fmIsIgnoreNullFunc(int32_t funcId);

View File

@ -318,6 +318,21 @@ typedef struct SAlterDnodeStmt {
char value[TSDB_DNODE_VALUE_LEN];
} SAlterDnodeStmt;
typedef struct {
ENodeType type;
char url[TSDB_ANAL_ANODE_URL_LEN];
} SCreateAnodeStmt;
typedef struct {
ENodeType type;
int32_t anodeId;
} SDropAnodeStmt;
typedef struct {
ENodeType type;
int32_t anodeId;
} SUpdateAnodeStmt;
typedef struct SShowStmt {
ENodeType type;
SNode* pDbName; // SValueNode

View File

@ -204,6 +204,11 @@ typedef struct SInterpFuncLogicNode {
SNode* pTimeSeries; // SColumnNode
} SInterpFuncLogicNode;
typedef struct SForecastFuncLogicNode {
SLogicNode node;
SNodeList* pFuncs;
} SForecastFuncLogicNode;
typedef struct SGroupCacheLogicNode {
SLogicNode node;
bool grpColsMayBeNull;
@ -274,7 +279,8 @@ typedef enum EWindowType {
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE,
WINDOW_TYPE_EVENT,
WINDOW_TYPE_COUNT
WINDOW_TYPE_COUNT,
WINDOW_TYPE_ANOMALY
} EWindowType;
typedef enum EWindowAlgorithm {
@ -315,6 +321,8 @@ typedef struct SWindowLogicNode {
int64_t windowCount;
int64_t windowSliding;
SNodeList* pTsmaSubplans;
SNode* pAnomalyExpr;
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
} SWindowLogicNode;
typedef struct SFillLogicNode {
@ -507,6 +515,12 @@ typedef struct SInterpFuncPhysiNode {
SNode* pTimeSeries; // SColumnNode
} SInterpFuncPhysiNode;
typedef struct SForecastFuncPhysiNode {
SPhysiNode node;
SNodeList* pExprs;
SNodeList* pFuncs;
} SForecastFuncPhysiNode;
typedef struct SSortMergeJoinPhysiNode {
SPhysiNode node;
EJoinType joinType;
@ -704,6 +718,12 @@ typedef struct SCountWinodwPhysiNode {
typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode;
typedef struct SAnomalyWindowPhysiNode {
SWindowPhysiNode window;
SNode* pAnomalyKey;
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
} SAnomalyWindowPhysiNode;
typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function

View File

@ -347,6 +347,13 @@ typedef struct SCountWindowNode {
int64_t windowSliding;
} SCountWindowNode;
typedef struct SAnomalyWindowNode {
ENodeType type; // QUERY_NODE_ANOMALY_WINDOW
SNode* pCol; // timestamp primary key
SNode* pExpr;
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
} SAnomalyWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
@ -442,6 +449,8 @@ typedef struct SSelectStmt {
bool hasTailFunc;
bool hasInterpFunc;
bool hasInterpPseudoColFunc;
bool hasForecastFunc;
bool hasForecastPseudoColFunc;
bool hasLastRowFunc;
bool hasLastFunc;
bool hasTimeLineFunc;

View File

@ -410,25 +410,25 @@ void* getTaskPoolWorkerCb();
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...) \

View File

@ -139,6 +139,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -476,6 +476,22 @@ int32_t taosGetErrSize();
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A)
// anode
#define TSDB_CODE_MND_ANODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0430)
#define TSDB_CODE_MND_ANODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0431)
#define TSDB_CODE_MND_ANODE_TOO_LONG_URL TAOS_DEF_ERROR_CODE(0, 0x0432)
#define TSDB_CODE_MND_ANODE_INVALID_PROTOCOL TAOS_DEF_ERROR_CODE(0, 0x0433)
#define TSDB_CODE_MND_ANODE_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0434)
#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO TAOS_DEF_ERROR_CODE(0, 0x0435)
#define TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME TAOS_DEF_ERROR_CODE(0, 0x0436)
#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE TAOS_DEF_ERROR_CODE(0, 0x0437)
// analysis
#define TSDB_CODE_ANAL_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440)
#define TSDB_CODE_ANAL_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441)
#define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442)
#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443)
#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
@ -868,6 +884,10 @@ int32_t taosGetErrSize();
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681)
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2682)
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL TAOS_DEF_ERROR_CODE(0, 0x2683)
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT TAOS_DEF_ERROR_CODE(0, 0x2684)
#define TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x2685)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -293,6 +293,12 @@ typedef enum ELogicConditionType {
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_LOG_VAR_LEN 32
#define TSDB_ANAL_ANODE_URL_LEN 128
#define TSDB_ANAL_ALGO_NAME_LEN 64
#define TSDB_ANAL_ALGO_TYPE_LEN 24
#define TSDB_ANAL_ALGO_KEY_LEN (TSDB_ANAL_ALGO_NAME_LEN + 9)
#define TSDB_ANAL_ALGO_URL_LEN (TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1)
#define TSDB_ANAL_ALGO_OPTION_LEN 256
#define TSDB_MAX_EP_NUM 10
@ -603,6 +609,13 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
#define MONITOR_TAG_NAME_LEN 100
#define MONITOR_TAG_VALUE_LEN 300
#define MONITOR_METRIC_NAME_LEN 100
typedef enum {
ANAL_ALGO_TYPE_ANOMALY_DETECT = 0,
ANAL_ALGO_TYPE_FORECAST = 1,
ANAL_ALGO_TYPE_END,
} EAnalAlgoType;
#ifdef __cplusplus
}
#endif

View File

@ -68,6 +68,8 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem);
SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName);
int32_t tjsonGetObjectName(const SJson* pJson, char** pName);
int32_t tjsonGetObjectValueString(const SJson* pJson, char** pStringValue);
void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal);
void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal);
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal);
int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal);
int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal);

View File

@ -47,6 +47,10 @@ target_link_libraries(
INTERFACE api
)
if(${BUILD_WITH_ANALYSIS})
add_definitions(-DUSE_ANAL)
endif()
if(${BUILD_S3})
if(${BUILD_WITH_S3})

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "libs/function/tudf.h"
#include "tanal.h"
static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
int32_t code = 0;
@ -77,7 +78,11 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
if ((code = udfStartUdfd(pMgmt->pData->dnodeId)) != 0) {
dError("failed to start udfd");
dError("failed to start udfd since %s", tstrerror(code));
}
if ((code = taosAnalInit()) != 0) {
dError("failed to init analysis env since %s", tstrerror(code));
}
pOutput->pMgmt = pMgmt;

View File

@ -141,6 +141,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ANODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_UPDATE_ANODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ANODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
@ -180,6 +183,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -20,6 +20,7 @@
#include "libs/function/tudf.h"
#include "tgrant.h"
#include "tcompare.h"
#include "tanal.h"
// clang-format on
#define DM_INIT_AUDIT() \
@ -214,6 +215,7 @@ void dmCleanup() {
dError("failed to close udfc");
}
udfStopUdfd();
taosAnalCleanup();
taosStopCacheRefreshWorker();
(void)dmDiskClose();
DestroyRegexCache();

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_ANODE_H_
#define _TD_MND_ANODE_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitAnode(SMnode *pMnode);
void mndCleanupAnode(SMnode *pMnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_ANODE_H_*/

View File

@ -78,6 +78,9 @@ typedef enum {
MND_OPER_DROP_VIEW,
MND_OPER_CONFIG_CLUSTER,
MND_OPER_BALANCE_VGROUP_LEADER,
MND_OPER_CREATE_ANODE,
MND_OPER_UPDATE_ANODE,
MND_OPER_DROP_ANODE
} EOperType;
typedef enum {
@ -232,6 +235,24 @@ typedef struct {
char machineId[TSDB_MACHINE_ID_LEN + 1];
} SDnodeObj;
typedef struct {
int32_t nameLen;
char* name;
} SAnodeAlgo;
typedef struct {
int32_t id;
int64_t createdTime;
int64_t updateTime;
int32_t version;
int32_t urlLen;
int32_t numOfAlgos;
int32_t status;
SRWLatch lock;
char* url;
SArray** algos;
} SAnodeObj;
typedef struct {
int32_t id;
int64_t createdTime;

View File

@ -0,0 +1,902 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndAnode.h"
#include "audit.h"
#include "mndDnode.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "tanal.h"
#include "tjson.h"
#ifdef USE_ANAL
#define TSDB_ANODE_VER_NUMBER 1
#define TSDB_ANODE_RESERVE_SIZE 64
static SSdbRaw *mndAnodeActionEncode(SAnodeObj *pObj);
static SSdbRow *mndAnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndAnodeActionInsert(SSdb *pSdb, SAnodeObj *pObj);
static int32_t mndAnodeActionUpdate(SSdb *pSdb, SAnodeObj *pOld, SAnodeObj *pNew);
static int32_t mndAnodeActionDelete(SSdb *pSdb, SAnodeObj *pObj);
static int32_t mndProcessCreateAnodeReq(SRpcMsg *pReq);
static int32_t mndProcessUpdateAnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq);
static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq);
static int32_t mndRetrieveAnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextAnodeFull(SMnode *pMnode, void *pIter);
static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj);
static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status);
int32_t mndInitAnode(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_ANODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndAnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndAnodeActionDecode,
.insertFp = (SdbInsertFp)mndAnodeActionInsert,
.updateFp = (SdbUpdateFp)mndAnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndAnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessCreateAnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUpdateAnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessDropAnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessAnalAlgoReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveAnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndCancelGetNextAnode);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveAnodesFull);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndCancelGetNextAnodeFull);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupAnode(SMnode *pMnode) {}
SAnodeObj *mndAcquireAnode(SMnode *pMnode, int32_t anodeId) {
SAnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_ANODE, &anodeId);
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_ANODE_NOT_EXIST;
}
return pObj;
}
void mndReleaseAnode(SMnode *pMnode, SAnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pObj);
}
static SSdbRaw *mndAnodeActionEncode(SAnodeObj *pObj) {
int32_t code = 0;
int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t rawDataLen = sizeof(SAnodeObj) + TSDB_ANODE_RESERVE_SIZE + pObj->urlLen;
for (int32_t t = 0; t < pObj->numOfAlgos; ++t) {
SArray *algos = pObj->algos[t];
for (int32_t a = 0; a < (int32_t)taosArrayGetSize(algos); ++a) {
SAnodeAlgo *algo = taosArrayGet(algos, a);
rawDataLen += (2 * sizeof(int32_t) + algo->nameLen);
}
rawDataLen += sizeof(int32_t);
}
SSdbRaw *pRaw = sdbAllocRaw(SDB_ANODE, TSDB_ANODE_VER_NUMBER, rawDataLen);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
SDB_SET_INT32(pRaw, dataPos, pObj->version, _OVER)
SDB_SET_INT32(pRaw, dataPos, pObj->urlLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER)
SDB_SET_INT32(pRaw, dataPos, pObj->numOfAlgos, _OVER)
for (int32_t i = 0; i < pObj->numOfAlgos; ++i) {
SArray *algos = pObj->algos[i];
SDB_SET_INT32(pRaw, dataPos, (int32_t)taosArrayGetSize(algos), _OVER)
for (int32_t j = 0; j < (int32_t)taosArrayGetSize(algos); ++j) {
SAnodeAlgo *algo = taosArrayGet(algos, j);
SDB_SET_INT32(pRaw, dataPos, algo->nameLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, algo->name, algo->nameLen, _OVER)
SDB_SET_INT32(pRaw, dataPos, 0, _OVER) // reserved
}
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_ANODE_RESERVE_SIZE, _OVER)
terrno = 0;
_OVER:
if (terrno != 0) {
mError("anode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("anode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRaw;
}
static SSdbRow *mndAnodeActionDecode(SSdbRaw *pRaw) {
int32_t code = 0;
int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SAnodeObj *pObj = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_ANODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
pRow = sdbAllocRow(sizeof(SAnodeObj));
if (pRow == NULL) goto _OVER;
pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pObj->version, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pObj->urlLen, _OVER)
if (pObj->urlLen > 0) {
pObj->url = taosMemoryCalloc(pObj->urlLen, 1);
if (pObj->url == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER)
}
SDB_GET_INT32(pRaw, dataPos, &pObj->numOfAlgos, _OVER)
if (pObj->numOfAlgos > 0) {
pObj->algos = taosMemoryCalloc(pObj->numOfAlgos, sizeof(SArray *));
if (pObj->algos == NULL) {
goto _OVER;
}
}
for (int32_t i = 0; i < pObj->numOfAlgos; ++i) {
int32_t numOfAlgos = 0;
SDB_GET_INT32(pRaw, dataPos, &numOfAlgos, _OVER)
pObj->algos[i] = taosArrayInit(2, sizeof(SAnodeAlgo));
if (pObj->algos[i] == NULL) goto _OVER;
for (int32_t j = 0; j < numOfAlgos; ++j) {
SAnodeAlgo algoObj = {0};
int32_t reserved = 0;
SDB_GET_INT32(pRaw, dataPos, &algoObj.nameLen, _OVER)
if (algoObj.nameLen > 0) {
algoObj.name = taosMemoryCalloc(algoObj.nameLen, 1);
if (algoObj.name == NULL) goto _OVER;
}
SDB_GET_BINARY(pRaw, dataPos, algoObj.name, algoObj.nameLen, _OVER)
SDB_GET_INT32(pRaw, dataPos, &reserved, _OVER);
if (taosArrayPush(pObj->algos[i], &algoObj) == NULL) goto _OVER;
}
}
SDB_GET_RESERVE(pRaw, dataPos, TSDB_ANODE_RESERVE_SIZE, _OVER)
terrno = 0;
_OVER:
if (terrno != 0) {
mError("anode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
if (pObj != NULL) {
taosMemoryFreeClear(pObj->url);
}
taosMemoryFreeClear(pRow);
return NULL;
}
mTrace("anode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRow;
}
static void mndFreeAnode(SAnodeObj *pObj) {
taosMemoryFreeClear(pObj->url);
for (int32_t i = 0; i < pObj->numOfAlgos; ++i) {
SArray *algos = pObj->algos[i];
for (int32_t j = 0; j < (int32_t)taosArrayGetSize(algos); ++j) {
SAnodeAlgo *algo = taosArrayGet(algos, j);
taosMemoryFreeClear(algo->name);
}
taosArrayDestroy(algos);
}
taosMemoryFreeClear(pObj->algos);
}
static int32_t mndAnodeActionInsert(SSdb *pSdb, SAnodeObj *pObj) {
mTrace("anode:%d, perform insert action, row:%p", pObj->id, pObj);
return 0;
}
static int32_t mndAnodeActionDelete(SSdb *pSdb, SAnodeObj *pObj) {
mTrace("anode:%d, perform delete action, row:%p", pObj->id, pObj);
mndFreeAnode(pObj);
return 0;
}
static int32_t mndAnodeActionUpdate(SSdb *pSdb, SAnodeObj *pOld, SAnodeObj *pNew) {
mTrace("anode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
taosWLockLatch(&pOld->lock);
int32_t numOfAlgos = pNew->numOfAlgos;
void *algos = pNew->algos;
pNew->numOfAlgos = pOld->numOfAlgos;
pNew->algos = pOld->algos;
pOld->numOfAlgos = numOfAlgos;
pOld->algos = algos;
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
taosWUnLockLatch(&pOld->lock);
return 0;
}
static int32_t mndSetCreateAnodeRedoLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pRedoRaw = mndAnodeActionEncode(pObj);
if (pRedoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
TAOS_RETURN(code);
}
static int32_t mndSetCreateAnodeUndoLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pUndoRaw = mndAnodeActionEncode(pObj);
if (pUndoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
TAOS_RETURN(code);
}
static int32_t mndSetCreateAnodeCommitLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mndAnodeActionEncode(pObj);
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
TAOS_RETURN(code);
}
static int32_t mndCreateAnode(SMnode *pMnode, SRpcMsg *pReq, SMCreateAnodeReq *pCreate) {
int32_t code = -1;
STrans *pTrans = NULL;
SAnodeObj anodeObj = {0};
anodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_ANODE);
anodeObj.createdTime = taosGetTimestampMs();
anodeObj.updateTime = anodeObj.createdTime;
anodeObj.version = 0;
anodeObj.urlLen = pCreate->urlLen;
if (anodeObj.urlLen > TSDB_ANAL_ANODE_URL_LEN) {
code = TSDB_CODE_MND_ANODE_TOO_LONG_URL;
goto _OVER;
}
anodeObj.url = taosMemoryCalloc(1, pCreate->urlLen);
if (anodeObj.url == NULL) goto _OVER;
(void)memcpy(anodeObj.url, pCreate->url, pCreate->urlLen);
code = mndGetAnodeAlgoList(anodeObj.url, &anodeObj);
if (code != 0) goto _OVER;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-anode");
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create anode:%s as anode:%d", pTrans->id, pCreate->url, anodeObj.id);
TAOS_CHECK_GOTO(mndSetCreateAnodeRedoLogs(pTrans, &anodeObj), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetCreateAnodeUndoLogs(pTrans, &anodeObj), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetCreateAnodeCommitLogs(pTrans, &anodeObj), NULL, _OVER);
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
_OVER:
mndFreeAnode(&anodeObj);
mndTransDrop(pTrans);
TAOS_RETURN(code);
}
static SAnodeObj *mndAcquireAnodeByURL(SMnode *pMnode, char *url) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SAnodeObj *pAnode = NULL;
pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pAnode);
if (pIter == NULL) break;
if (strcasecmp(url, pAnode->url) == 0) {
sdbCancelFetch(pSdb, pIter);
return pAnode;
}
sdbRelease(pSdb, pAnode);
}
terrno = TSDB_CODE_MND_ANODE_NOT_EXIST;
return NULL;
}
static int32_t mndProcessCreateAnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SAnodeObj *pObj = NULL;
SMCreateAnodeReq createReq = {0};
TAOS_CHECK_GOTO(tDeserializeSMCreateAnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
mInfo("anode:%s, start to create", createReq.url);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_ANODE), NULL, _OVER);
pObj = mndAcquireAnodeByURL(pMnode, createReq.url);
if (pObj != NULL) {
code = TSDB_CODE_MND_ANODE_ALREADY_EXIST;
goto _OVER;
}
code = mndCreateAnode(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("anode:%s, failed to create since %s", createReq.url, tstrerror(code));
}
mndReleaseAnode(pMnode, pObj);
tFreeSMCreateAnodeReq(&createReq);
TAOS_RETURN(code);
}
static int32_t mndUpdateAnode(SMnode *pMnode, SAnodeObj *pAnode, SRpcMsg *pReq) {
mInfo("anode:%d, start to update", pAnode->id);
int32_t code = -1;
STrans *pTrans = NULL;
SAnodeObj anodeObj = {0};
anodeObj.id = pAnode->id;
anodeObj.updateTime = taosGetTimestampMs();
code = mndGetAnodeAlgoList(pAnode->url, &anodeObj);
if (code != 0) goto _OVER;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-anode");
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mInfo("trans:%d, used to update anode:%d", pTrans->id, anodeObj.id);
TAOS_CHECK_GOTO(mndSetCreateAnodeCommitLogs(pTrans, &anodeObj), NULL, _OVER);
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
_OVER:
mndFreeAnode(&anodeObj);
mndTransDrop(pTrans);
TAOS_RETURN(code);
}
static int32_t mndUpdateAllAnodes(SMnode *pMnode, SRpcMsg *pReq) {
mInfo("update all anodes");
SSdb *pSdb = pMnode->pSdb;
int32_t code = 0;
int32_t rows = 0;
int32_t numOfRows = sdbGetSize(pSdb, SDB_ANODE);
void *pIter = NULL;
while (1) {
SAnodeObj *pObj = NULL;
ESdbStatus objStatus = 0;
pIter = sdbFetchAll(pSdb, SDB_ANODE, pIter, (void **)&pObj, &objStatus, true);
if (pIter == NULL) break;
rows++;
void *transReq = NULL;
if (rows == numOfRows) transReq = pReq;
code = mndUpdateAnode(pMnode, pObj, transReq);
sdbRelease(pSdb, pObj);
if (code != 0) break;
}
if (code == 0 && rows == numOfRows) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
return code;
}
static int32_t mndProcessUpdateAnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SAnodeObj *pObj = NULL;
SMUpdateAnodeReq updateReq = {0};
TAOS_CHECK_GOTO(tDeserializeSMUpdateAnodeReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_UPDATE_ANODE), NULL, _OVER);
if (updateReq.anodeId == -1) {
code = mndUpdateAllAnodes(pMnode, pReq);
} else {
pObj = mndAcquireAnode(pMnode, updateReq.anodeId);
if (pObj == NULL) {
code = TSDB_CODE_MND_ANODE_NOT_EXIST;
goto _OVER;
}
code = mndUpdateAnode(pMnode, pObj, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (updateReq.anodeId != -1) {
mError("anode:%d, failed to update since %s", updateReq.anodeId, tstrerror(code));
}
}
mndReleaseAnode(pMnode, pObj);
tFreeSMUpdateAnodeReq(&updateReq);
TAOS_RETURN(code);
}
static int32_t mndSetDropAnodeRedoLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pRedoRaw = mndAnodeActionEncode(pObj);
if (pRedoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
TAOS_RETURN(code);
}
static int32_t mndSetDropAnodeCommitLogs(STrans *pTrans, SAnodeObj *pObj) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mndAnodeActionEncode(pObj);
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
TAOS_RETURN(code);
}
static int32_t mndSetDropAnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SAnodeObj *pObj, bool force) {
if (pObj == NULL) return 0;
TAOS_CHECK_RETURN(mndSetDropAnodeRedoLogs(pTrans, pObj));
TAOS_CHECK_RETURN(mndSetDropAnodeCommitLogs(pTrans, pObj));
return 0;
}
static int32_t mndDropAnode(SMnode *pMnode, SRpcMsg *pReq, SAnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-anode");
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop anode:%d", pTrans->id, pObj->id);
TAOS_CHECK_GOTO(mndSetDropAnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
_OVER:
mndTransDrop(pTrans);
TAOS_RETURN(code);
}
static int32_t mndProcessDropAnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SAnodeObj *pObj = NULL;
SMDropAnodeReq dropReq = {0};
TAOS_CHECK_GOTO(tDeserializeSMDropAnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
mInfo("anode:%d, start to drop", dropReq.anodeId);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_ANODE), NULL, _OVER);
if (dropReq.anodeId <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
pObj = mndAcquireAnode(pMnode, dropReq.anodeId);
if (pObj == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
code = mndDropAnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("anode:%d, failed to drop since %s", dropReq.anodeId, tstrerror(code));
}
mndReleaseAnode(pMnode, pObj);
tFreeSMDropAnodeReq(&dropReq);
TAOS_RETURN(code);
}
static int32_t mndRetrieveAnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SAnodeObj *pObj = NULL;
char buf[TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE];
char status[64];
int32_t code = 0;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_ANODE, pShow->pIter, (void **)&pObj);
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
if (code != 0) goto _end;
STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->url, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
if (code != 0) goto _end;
status[0] = 0;
if (mndGetAnodeStatus(pObj, status) == 0) {
STR_TO_VARSTR(buf, status);
} else {
STR_TO_VARSTR(buf, "offline");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, buf, false);
if (code != 0) goto _end;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
if (code != 0) goto _end;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false);
if (code != 0) goto _end;
numOfRows++;
sdbRelease(pSdb, pObj);
}
_end:
if (code != 0) sdbRelease(pSdb, pObj);
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetchByType(pSdb, pIter, SDB_ANODE);
}
static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SAnodeObj *pObj = NULL;
char buf[TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE];
int32_t code = 0;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_ANODE, pShow->pIter, (void **)&pObj);
if (pShow->pIter == NULL) break;
for (int32_t t = 0; t < pObj->numOfAlgos; ++t) {
SArray *algos = pObj->algos[t];
for (int32_t a = 0; a < taosArrayGetSize(algos); ++a) {
SAnodeAlgo *algo = taosArrayGet(algos, a);
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
if (code != 0) goto _end;
STR_TO_VARSTR(buf, taosAnalAlgoStr(t));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, buf, false);
if (code != 0) goto _end;
STR_TO_VARSTR(buf, algo->name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, buf, false);
if (code != 0) goto _end;
numOfRows++;
}
}
sdbRelease(pSdb, pObj);
}
_end:
if (code != 0) sdbRelease(pSdb, pObj);
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextAnodeFull(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetchByType(pSdb, pIter, SDB_ANODE);
}
static int32_t mndDecodeAlgoList(SJson *pJson, SAnodeObj *pObj) {
int32_t code = 0;
int32_t protocol = 0;
double tmp = 0;
char buf[TSDB_ANAL_ALGO_NAME_LEN + 1] = {0};
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
protocol = (int32_t)(tmp * 1000);
if (protocol != 100) return TSDB_CODE_MND_ANODE_INVALID_PROTOCOL;
code = tjsonGetDoubleValue(pJson, "version", &tmp);
pObj->version = (int32_t)(tmp * 1000);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
if (pObj->version <= 0) return TSDB_CODE_MND_ANODE_INVALID_VERSION;
SJson *details = tjsonGetObjectItem(pJson, "details");
if (details == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
int32_t numOfDetails = tjsonGetArraySize(details);
pObj->algos = taosMemoryCalloc(ANAL_ALGO_TYPE_END, sizeof(SArray *));
if (pObj->algos == NULL) return TSDB_CODE_OUT_OF_MEMORY;
pObj->numOfAlgos = ANAL_ALGO_TYPE_END;
for (int32_t i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
pObj->algos[i] = taosArrayInit(4, sizeof(SAnodeAlgo));
if (pObj->algos[i] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t d = 0; d < numOfDetails; ++d) {
SJson *detail = tjsonGetArrayItem(details, d);
if (detail == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
code = tjsonGetStringValue(detail, "type", buf);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
EAnalAlgoType type = taosAnalAlgoInt(buf);
if (type < 0 || type >= ANAL_ALGO_TYPE_END) continue;
SJson *algos = tjsonGetObjectItem(detail, "algo");
if (algos == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
int32_t numOfAlgos = tjsonGetArraySize(algos);
for (int32_t a = 0; a < numOfAlgos; ++a) {
SJson *algo = tjsonGetArrayItem(algos, a);
if (algo == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
code = tjsonGetStringValue(algo, "name", buf);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
SAnodeAlgo algoObj = {0};
algoObj.nameLen = strlen(buf) + 1;
if (algoObj.nameLen > TSDB_ANAL_ALGO_NAME_LEN) return TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME;
if (algoObj.nameLen <= 1) return TSDB_CODE_OUT_OF_MEMORY;
algoObj.name = taosMemoryCalloc(algoObj.nameLen, 1);
tstrncpy(algoObj.name, buf, algoObj.nameLen);
if (taosArrayPush(pObj->algos[type], &algoObj) == NULL) return TSDB_CODE_OUT_OF_MEMORY;
}
}
return 0;
}
static int32_t mndGetAnodeAlgoList(const char *url, SAnodeObj *pObj) {
char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", url, "list");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL);
if (pJson == NULL) return terrno;
int32_t code = mndDecodeAlgoList(pJson, pObj);
if (pJson != NULL) tjsonDelete(pJson);
TAOS_RETURN(code);
}
static int32_t mndGetAnodeStatus(SAnodeObj *pObj, char *status) {
int32_t code = 0;
int32_t protocol = 0;
double tmp = 0;
char anodeUrl[TSDB_ANAL_ANODE_URL_LEN + 1] = {0};
snprintf(anodeUrl, TSDB_ANAL_ANODE_URL_LEN, "%s/%s", pObj->url, "status");
SJson *pJson = taosAnalSendReqRetJson(anodeUrl, ANAL_HTTP_TYPE_GET, NULL);
if (pJson == NULL) return terrno;
code = tjsonGetDoubleValue(pJson, "protocol", &tmp);
if (code < 0) {
code = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
protocol = (int32_t)(tmp * 1000);
if (protocol != 100) {
code = TSDB_CODE_MND_ANODE_INVALID_PROTOCOL;
goto _OVER;
}
code = tjsonGetStringValue(pJson, "status", status);
if (code < 0) {
code = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
if (strlen(status) == 0) {
code = TSDB_CODE_MND_ANODE_INVALID_PROTOCOL;
goto _OVER;
}
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
TAOS_RETURN(code);
}
static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t code = -1;
SAnodeObj *pObj = NULL;
SAnalUrl url;
int32_t nameLen;
char name[TSDB_ANAL_ALGO_KEY_LEN];
SRetrieveAnalAlgoReq req = {0};
SRetrieveAnalAlgoRsp rsp = {0};
TAOS_CHECK_GOTO(tDeserializeRetrieveAnalAlgoReq(pReq->pCont, pReq->contLen, &req), NULL, _OVER);
rsp.ver = sdbGetTableVer(pSdb, SDB_ANODE);
if (req.analVer != rsp.ver) {
mInfo("dnode:%d, update analysis old ver:%" PRId64 " to new ver:%" PRId64, req.dnodeId, req.analVer, rsp.ver);
rsp.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (rsp.hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
void *pIter = NULL;
while (1) {
SAnodeObj *pAnode = NULL;
pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pAnode);
if (pIter == NULL) break;
url.anode = pAnode->id;
for (int32_t t = 0; t < pAnode->numOfAlgos; ++t) {
SArray *algos = pAnode->algos[t];
url.type = t;
for (int32_t a = 0; a < taosArrayGetSize(algos); ++a) {
SAnodeAlgo *algo = taosArrayGet(algos, a);
nameLen = 1 + snprintf(name, sizeof(name) - 1, "%d:%s", url.type, algo->name);
SAnalUrl *pOldUrl = taosHashAcquire(rsp.hash, name, nameLen);
if (pOldUrl == NULL || (pOldUrl != NULL && pOldUrl->anode < url.anode)) {
if (pOldUrl != NULL) {
taosMemoryFreeClear(pOldUrl->url);
if (taosHashRemove(rsp.hash, name, nameLen) != 0) {
sdbRelease(pSdb, pAnode);
goto _OVER;
}
}
url.url = taosMemoryMalloc(TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1);
if (url.url == NULL) {
sdbRelease(pSdb, pAnode);
goto _OVER;
}
url.urlLen = 1 + snprintf(url.url, TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN, "%s/%s", pAnode->url,
taosAnalAlgoUrlStr(url.type));
if (taosHashPut(rsp.hash, name, nameLen, &url, sizeof(SAnalUrl)) != 0) {
taosMemoryFree(url.url);
sdbRelease(pSdb, pAnode);
goto _OVER;
}
}
}
sdbRelease(pSdb, pAnode);
}
}
}
int32_t contLen = tSerializeRetrieveAnalAlgoRsp(NULL, 0, &rsp);
void *pHead = rpcMallocCont(contLen);
(void)tSerializeRetrieveAnalAlgoRsp(pHead, contLen, &rsp);
pReq->info.rspLen = contLen;
pReq->info.rsp = pHead;
_OVER:
tFreeRetrieveAnalAlgoRsp(&rsp);
TAOS_RETURN(code);
}
#else
static int32_t mndProcessUnsupportReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; }
static int32_t mndRetrieveUnsupport(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t mndInitAnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessUnsupportReq);
mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUnsupportReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessUnsupportReq);
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessUnsupportReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveUnsupport);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveUnsupport);
return 0;
}
void mndCleanupAnode(SMnode *pMnode) {}
#endif

View File

@ -693,7 +693,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
int64_t clusterid = mndGetClusterId(pMnode);
if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 ", code:0x%x",
mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x",
statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
goto _OVER;
}
@ -730,6 +730,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
int64_t analVer = sdbGetTableVer(pMnode->pSdb, SDB_ANODE);
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
@ -738,9 +739,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
bool encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum;
bool enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0);
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged ||
bool analVerChanged = (analVer != statusReq.analVer);
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
@ -863,6 +864,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SStatusRsp statusRsp = {0};
statusRsp.statusSeq++;
statusRsp.analVer = analVer;
statusRsp.dnodeVer = dnodeVer;
statusRsp.dnodeCfg.dnodeId = pDnode->id;
statusRsp.dnodeCfg.clusterId = pMnode->clusterId;

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndAcct.h"
#include "mndArbGroup.h"
#include "mndAnode.h"
#include "mndCluster.h"
#include "mndCompact.h"
#include "mndCompactDetail.h"
@ -605,6 +606,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-anode", mndInitAnode, mndCleanupAnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser));

View File

@ -68,6 +68,10 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_QNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) {
type = TSDB_MGMT_TABLE_SNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_ANODES, len) == 0) {
type = TSDB_MGMT_TABLE_ANODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_ANODES_FULL, len) == 0) {
type = TSDB_MGMT_TABLE_ANODE_FULL;
} else if (strncasecmp(name, TSDB_INS_TABLE_ARBGROUPS, len) == 0) {
type = TSDB_MGMT_TABLE_ARBGROUP;
} else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) {

View File

@ -161,7 +161,8 @@ typedef enum {
SDB_COMPACT_DETAIL = 25,
SDB_GRANT = 26, // grant log
SDB_ARBGROUP = 27,
SDB_MAX = 28
SDB_ANODE = 28,
SDB_MAX = 29
} ESdbType;
typedef struct SSdbRaw {

View File

@ -25,6 +25,9 @@
#define SDB_RESERVE_SIZE 512
#define SDB_FILE_VER 1
#define SDB_TABLE_SIZE_EXTRA SDB_MAX
#define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t))
static int32_t sdbDeployData(SSdb *pSdb) {
int32_t code = 0;
mInfo("start to deploy sdb");
@ -154,7 +157,38 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
}
}
char reserve[SDB_RESERVE_SIZE] = {0};
// for sdb compatibility
for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) {
int64_t maxId = 0;
ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TAOS_RETURN(code);
}
if (ret != sizeof(int64_t)) {
code = TSDB_CODE_FILE_CORRUPTED;
TAOS_RETURN(code);
}
if (i < SDB_MAX) {
pSdb->maxId[i] = maxId;
}
int64_t ver = 0;
ret = taosReadFile(pFile, &ver, sizeof(int64_t));
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TAOS_RETURN(code);
}
if (ret != sizeof(int64_t)) {
code = TSDB_CODE_FILE_CORRUPTED;
TAOS_RETURN(code);
}
if (i < SDB_MAX) {
pSdb->tableVer[i] = ver;
}
}
char reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
ret = taosReadFile(pFile, reserve, sizeof(reserve));
if (ret < 0) {
return terrno;
@ -205,7 +239,26 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
}
}
char reserve[SDB_RESERVE_SIZE] = {0};
// for sdb compatibility
for (int32_t i = SDB_TABLE_SIZE; i < SDB_TABLE_SIZE_EXTRA; ++i) {
int64_t maxId = 0;
if (i < SDB_MAX) {
maxId = pSdb->maxId[i];
}
if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
return terrno;
}
int64_t ver = 0;
if (i < SDB_MAX) {
ver = pSdb->tableVer[i];
}
if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
return terrno;
}
}
char reserve[SDB_RESERVE_SIZE_EXTRA] = {0};
if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
return terrno;
}

View File

@ -74,6 +74,8 @@ const char *sdbTableName(ESdbType type) {
return "grant";
case SDB_ARBGROUP:
return "arb_group";
case SDB_ANODE:
return "anode";
default:
return "undefine";
}

View File

@ -133,6 +133,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
@ -159,6 +161,8 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
// clang-format on
@ -190,6 +194,9 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdSt
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx);
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId);
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId);
#ifdef __cplusplus
}
#endif

View File

@ -44,22 +44,6 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
static void destroyEWindowOperatorInfo(void* param);
static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
// todo : move to util
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
uint64_t groupId) {
pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0;
pRowSup->win.skey = tsList[rowIndex];
pRowSup->groupId = groupId;
}
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->win.ekey = ts;
pRowSup->prevTs = ts;
pRowSup->numOfRows += 1;
pRowSup->groupId = groupId;
}
int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);

View File

@ -89,14 +89,14 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->win.ekey = ts;
pRowSup->prevTs = ts;
pRowSup->numOfRows += 1;
pRowSup->groupId = groupId;
}
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
uint64_t groupId) {
pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0;

View File

@ -138,6 +138,8 @@ int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t diffFunction(SqlFunctionCtx* pCtx);
int32_t diffFunctionByRow(SArray* pCtx);
bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t derivativeFunction(SqlFunctionCtx* pCtx);

View File

@ -58,6 +58,7 @@ extern "C" {
#define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29)
#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found
#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31)
#define FUNC_MGT_FORECAST_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(32)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -154,6 +154,7 @@ SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode*
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken);
SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill);
SNode* createWindowOffsetNode(SAstCreateContext* pCxt, SNode* pStartOffset, SNode* pEndOffset);
@ -251,6 +252,9 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force, bool unsafe);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl);
SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode);
SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll);
SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue);
SNode* createRealTableNodeForIndexName(SAstCreateContext* pCxt, SToken* pDbName, SToken* pIndexName);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,

View File

@ -3972,6 +3972,10 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return nonCalcScalarFunction(pInput, inputNum, pOutput);
}
int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return nonCalcScalarFunction(pInput, inputNum, pOutput);
}
int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return avgScalarFunction(pInput, inputNum, pOutput);
}

737
source/util/src/tanal.c Normal file
View File

@ -0,0 +1,737 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tanal.h"
#include "tmsg.h"
#include "ttypes.h"
#include "tutil.h"
#ifdef USE_ANAL
#include <curl/curl.h>
#define ANAL_ALGO_SPLIT ","
typedef struct {
int64_t ver;
SHashObj *hash; // algoname:algotype -> SAnalUrl
TdThreadMutex lock;
} SAlgoMgmt;
typedef struct {
char *data;
int64_t dataLen;
} SCurlResp;
static SAlgoMgmt tsAlgos = {0};
static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen);
const char *taosAnalAlgoStr(EAnalAlgoType type) {
switch (type) {
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
return "anomaly-detection";
case ANAL_ALGO_TYPE_FORECAST:
return "forecast";
default:
return "unknown";
}
}
const char *taosAnalAlgoUrlStr(EAnalAlgoType type) {
switch (type) {
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
return "anomaly-detect";
case ANAL_ALGO_TYPE_FORECAST:
return "forecast";
default:
return "unknown";
}
}
EAnalAlgoType taosAnalAlgoInt(const char *name) {
for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) {
return i;
}
}
return ANAL_ALGO_TYPE_END;
}
int32_t taosAnalInit() {
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
uError("failed to init curl");
return -1;
}
tsAlgos.ver = 0;
taosThreadMutexInit(&tsAlgos.lock, NULL);
tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (tsAlgos.hash == NULL) {
uError("failed to init algo hash");
return -1;
}
uInfo("analysis env is initialized");
return 0;
}
static void taosAnalFreeHash(SHashObj *hash) {
void *pIter = taosHashIterate(hash, NULL);
while (pIter != NULL) {
SAnalUrl *pUrl = (SAnalUrl *)pIter;
taosMemoryFree(pUrl->url);
pIter = taosHashIterate(hash, pIter);
}
taosHashCleanup(hash);
}
void taosAnalCleanup() {
curl_global_cleanup();
taosThreadMutexDestroy(&tsAlgos.lock);
taosAnalFreeHash(tsAlgos.hash);
tsAlgos.hash = NULL;
uInfo("analysis env is cleaned up");
}
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {
if (newVer > tsAlgos.ver) {
taosThreadMutexLock(&tsAlgos.lock);
SHashObj *hash = tsAlgos.hash;
tsAlgos.ver = newVer;
tsAlgos.hash = pHash;
taosThreadMutexUnlock(&tsAlgos.lock);
taosAnalFreeHash(hash);
} else {
taosAnalFreeHash(pHash);
}
}
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0};
int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName);
char *pos1 = strstr(option, buf);
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
if (pos1 != NULL) {
if (optMaxLen > 0) {
int32_t copyLen = optMaxLen;
if (pos2 != NULL) {
copyLen = (int32_t)(pos2 - pos1 - strlen(optName) + 1);
copyLen = MIN(copyLen, optMaxLen);
}
tstrncpy(optValue, pos1 + bufLen, copyLen);
}
return true;
} else {
return false;
}
}
bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) {
char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0};
int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName);
char *pos1 = strstr(option, buf);
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
if (pos1 != NULL) {
*optValue = taosStr2Int32(pos1 + bufLen + 1, NULL, 10);
return true;
} else {
return false;
}
}
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
int32_t code = 0;
char name[TSDB_ANAL_ALGO_KEY_LEN] = {0};
int32_t nameLen = 1 + snprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);
taosThreadMutexLock(&tsAlgos.lock);
SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
if (pUrl != NULL) {
tstrncpy(url, pUrl->url, urlLen);
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
} else {
url[0] = 0;
terrno = TSDB_CODE_ANAL_ALGO_NOT_FOUND;
code = terrno;
uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
}
taosThreadMutexUnlock(&tsAlgos.lock);
return code;
}
int64_t taosAnalGetVersion() { return tsAlgos.ver; }
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
SCurlResp *pRsp = userdata;
if (contLen == 0 || nmemb == 0 || pCont == NULL) {
pRsp->dataLen = 0;
pRsp->data = NULL;
uError("curl response is received, len:%" PRId64, pRsp->dataLen);
return 0;
}
pRsp->dataLen = (int64_t)contLen * (int64_t)nmemb;
pRsp->data = taosMemoryMalloc(pRsp->dataLen + 1);
if (pRsp->data != NULL) {
(void)memcpy(pRsp->data, pCont, pRsp->dataLen);
pRsp->data[pRsp->dataLen] = 0;
uDebug("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data);
return pRsp->dataLen;
} else {
pRsp->dataLen = 0;
uError("failed to malloc curl response");
return 0;
}
}
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
CURL *curl = NULL;
CURLcode code = 0;
curl = curl_easy_init();
if (curl == NULL) {
uError("failed to create curl handle");
return -1;
}
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100);
uDebug("curl get request will sent, url:%s", url);
code = curl_easy_perform(curl);
if (code != CURLE_OK) {
uError("failed to perform curl action, code:%d", code);
}
_OVER:
if (curl != NULL) curl_easy_cleanup(curl);
return code;
}
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) {
struct curl_slist *headers = NULL;
CURL *curl = NULL;
CURLcode code = 0;
curl = curl_easy_init();
if (curl == NULL) {
uError("failed to create curl handle");
return -1;
}
headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000);
curl_easy_setopt(curl, CURLOPT_POST, 1);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
uDebug("curl post request will sent, url:%s len:%d", url, bufLen);
code = curl_easy_perform(curl);
if (code != CURLE_OK) {
uError("failed to perform curl action, code:%d", code);
}
_OVER:
if (curl != NULL) {
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
}
return code;
}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) {
int32_t code = -1;
char *pCont = NULL;
int64_t contentLen;
SJson *pJson = NULL;
SCurlResp curlRsp = {0};
if (type == ANAL_HTTP_TYPE_GET) {
if (taosCurlGetRequest(url, &curlRsp) != 0) {
terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS;
goto _OVER;
}
} else {
code = taosAnalBufGetCont(pBuf, &pCont, &contentLen);
if (code != 0) {
terrno = code;
goto _OVER;
}
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) {
terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS;
goto _OVER;
}
}
if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
terrno = TSDB_CODE_ANAL_URL_RSP_IS_NULL;
goto _OVER;
}
pJson = tjsonParse(curlRsp.data);
if (pJson == NULL) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
_OVER:
if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data);
if (pCont != NULL) taosMemoryFree(pCont);
return pJson;
}
static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
int32_t code = 0;
int64_t contLen;
char *pCont = NULL;
TdFilePtr pFile = NULL;
pFile = taosOpenFile(fileName, TD_FILE_READ);
if (pFile == NULL) {
code = terrno;
goto _OVER;
}
code = taosFStatFile(pFile, &contLen, NULL);
if (code != 0) goto _OVER;
pCont = taosMemoryMalloc(contLen + 1);
if (pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (taosReadFile(pFile, pCont, contLen) != contLen) {
code = terrno;
goto _OVER;
}
pCont[contLen] = '\0';
_OVER:
if (code == 0) {
*ppCont = pCont;
*pContLen = contLen;
} else {
if (pCont != NULL) taosMemoryFree(pCont);
}
if (pFile != NULL) taosCloseFile(&pFile);
return code;
}
static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) {
char buf[64] = {0};
int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
return 0;
}
static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) {
char buf[128] = {0};
int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
return 0;
}
static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) {
char buf[128] = {0};
int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal);
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
return 0;
}
static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t bufLen) {
if (bufLen <= 0) {
bufLen = strlen(buf);
}
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
return 0;
}
static int32_t taosAnalJsonBufWriteStart(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); }
static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pBuf->filePtr == NULL) {
return terrno;
}
pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalColBuf));
if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY;
pBuf->numOfCols = numOfCols;
if (pBuf->bufType == ANAL_BUF_TYPE_JSON) {
return taosAnalJsonBufWriteStart(pBuf);
}
for (int32_t i = 0; i < numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i];
snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i);
pCol->filePtr =
taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pCol->filePtr == NULL) {
return terrno;
}
}
return taosAnalJsonBufWriteStart(pBuf);
}
static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
char buf[128] = {0};
bool first = (colIndex == 0);
bool last = (colIndex == pBuf->numOfCols - 1);
if (first) {
if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
return terrno;
}
}
int32_t bufLen = snprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name,
tDataTypes[colType].bytes, last ? "" : ",");
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
if (last) {
if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
return terrno;
}
}
return 0;
}
static int32_t taosAnalJsonBufWriteDataBegin(SAnalBuf *pBuf) {
return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
}
static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
if (bufLen <= 0) {
bufLen = strlen(buf);
}
if (pBuf->bufType == ANAL_BUF_TYPE_JSON) {
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
return terrno;
}
} else {
if (taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen) != bufLen) {
return terrno;
}
}
return 0;
}
static int32_t taosAnalJsonBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) {
return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
}
static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
if (colIndex == pBuf->numOfCols - 1) {
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex);
} else {
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex);
}
}
static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
char buf[64];
int32_t bufLen = 0;
if (pBuf->pCols[colIndex].numOfRows != 0) {
buf[bufLen] = ',';
buf[bufLen + 1] = '\n';
buf[bufLen + 2] = 0;
bufLen += 2;
}
switch (colType) {
case TSDB_DATA_TYPE_BOOL:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
break;
case TSDB_DATA_TYPE_TINYINT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue);
break;
case TSDB_DATA_TYPE_UTINYINT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue);
break;
case TSDB_DATA_TYPE_SMALLINT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue);
break;
case TSDB_DATA_TYPE_USMALLINT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue);
break;
case TSDB_DATA_TYPE_INT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue);
break;
case TSDB_DATA_TYPE_UINT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue);
break;
case TSDB_DATA_TYPE_UBIGINT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue);
break;
case TSDB_DATA_TYPE_FLOAT:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue));
break;
case TSDB_DATA_TYPE_DOUBLE:
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue));
break;
default:
buf[bufLen] = '\0';
}
pBuf->pCols[colIndex].numOfRows++;
return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
}
static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) {
int32_t code = 0;
char *pCont = NULL;
int64_t contLen = 0;
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i];
code = taosFsyncFile(pCol->filePtr);
if (code != 0) return code;
code = taosCloseFile(&pCol->filePtr);
if (code != 0) return code;
code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
if (code != 0) return code;
code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen);
if (code != 0) return code;
taosMemoryFreeClear(pCont);
contLen = 0;
}
}
return taosAnalJsonBufWriteStr(pBuf, "],\n", 0);
}
static int32_t taosAnalJsonBufWriteEnd(SAnalBuf *pBuf) {
int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
if (code != 0) return code;
return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
}
int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) {
int32_t code = taosAnalJsonBufWriteEnd(pBuf);
if (code != 0) return code;
if (pBuf->filePtr != NULL) {
code = taosFsyncFile(pBuf->filePtr);
if (code != 0) return code;
code = taosCloseFile(&pBuf->filePtr);
if (code != 0) return code;
}
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i];
if (pCol->filePtr != NULL) {
code = taosFsyncFile(pCol->filePtr);
if (code != 0) return code;
code = taosCloseFile(&pCol->filePtr);
if (code != 0) return code;
}
}
}
return 0;
}
void taosAnalBufDestroy(SAnalBuf *pBuf) {
if (pBuf->fileName[0] != 0) {
if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
// taosRemoveFile(pBuf->fileName);
pBuf->fileName[0] = 0;
}
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
SAnalColBuf *pCol = &pBuf->pCols[i];
if (pCol->fileName[0] != 0) {
if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr);
taosRemoveFile(pCol->fileName);
pCol->fileName[0] = 0;
}
}
}
taosMemoryFreeClear(pBuf->pCols);
pBuf->numOfCols = 0;
}
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return tsosAnalJsonBufOpen(pBuf, numOfCols);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteDataBegin(pBuf);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColBegin(pBuf, colIndex);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteColEnd(pBuf, colIndex);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufWriteDataEnd(pBuf);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
int32_t taosAnalBufClose(SAnalBuf *pBuf) {
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufClose(pBuf);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen) {
*ppCont = NULL;
*pContLen = 0;
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
} else {
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
}
}
#else
int32_t taosAnalInit() { return 0; }
void taosAnalCleanup() {}
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; }
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { return true; }
int64_t taosAnalGetVersion() { return 0; }
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {}
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return 0; }
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { return 0; }
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { return 0; }
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; }
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { return 0; }
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { return 0; }
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { return 0; }
int32_t taosAnalBufClose(SAnalBuf *pBuf) { return 0; }
void taosAnalBufDestroy(SAnalBuf *pBuf) {}
const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; }
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
#endif

View File

@ -345,6 +345,21 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily do
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_ALREADY_EXIST, "Anode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_NOT_EXIST, "Anode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_URL, "Anode too long url")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_PROTOCOL, "Anode invalid protocol")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_VERSION, "Anode invalid version")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algorithm")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded")
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist")
@ -709,6 +724,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicat
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE, "ANOMALY_WINDOW only support mathable column")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL, "ANOMALY_WINDOW not support on tag column")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW option should include algo field")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, "Invalid forecast clause")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner

View File

@ -194,6 +194,10 @@ int32_t tjsonGetObjectValueString(const SJson* pJson, char** pValueString) {
return TSDB_CODE_SUCCESS;
}
void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal) { *pVal = (int64_t)((cJSON*)pJson)->valuedouble; }
void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal) { *pVal = ((cJSON*)pJson)->valuedouble; }
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) {
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
if (NULL == p) {