feat: data analysis
This commit is contained in:
parent
0fe3565c90
commit
4dc0c2f47e
|
@ -144,6 +144,12 @@ option(
|
||||||
OFF
|
OFF
|
||||||
)
|
)
|
||||||
|
|
||||||
|
option(
|
||||||
|
BUILD_WITH_ANALYSIS
|
||||||
|
"If build with analysis"
|
||||||
|
ON
|
||||||
|
)
|
||||||
|
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
IF(NOT TD_ENTERPRISE)
|
IF(NOT TD_ENTERPRISE)
|
||||||
|
@ -151,8 +157,15 @@ MESSAGE("switch s3 off with community version")
|
||||||
set(BUILD_S3 OFF)
|
set(BUILD_S3 OFF)
|
||||||
set(BUILD_WITH_S3 OFF)
|
set(BUILD_WITH_S3 OFF)
|
||||||
set(BUILD_WITH_COS OFF)
|
set(BUILD_WITH_COS OFF)
|
||||||
|
set(BUILD_WITH_ANALYSIS OFF)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
|
IF(${BUILD_WITH_ANALYSIS})
|
||||||
|
message("build with analysis")
|
||||||
|
set(BUILD_S3 ON)
|
||||||
|
set(BUILD_WITH_S3 ON)
|
||||||
|
ENDIF()
|
||||||
|
|
||||||
IF(${BUILD_S3})
|
IF(${BUILD_S3})
|
||||||
|
|
||||||
IF(${BUILD_WITH_S3})
|
IF(${BUILD_WITH_S3})
|
||||||
|
|
|
@ -29,6 +29,8 @@ extern "C" {
|
||||||
#define TSDB_INS_TABLE_QNODES "ins_qnodes"
|
#define TSDB_INS_TABLE_QNODES "ins_qnodes"
|
||||||
#define TSDB_INS_TABLE_BNODES "ins_bnodes" // no longer used
|
#define TSDB_INS_TABLE_BNODES "ins_bnodes" // no longer used
|
||||||
#define TSDB_INS_TABLE_SNODES "ins_snodes"
|
#define TSDB_INS_TABLE_SNODES "ins_snodes"
|
||||||
|
#define TSDB_INS_TABLE_ANODES "ins_anodes"
|
||||||
|
#define TSDB_INS_TABLE_ANODES_FULL "ins_anodes_full"
|
||||||
#define TSDB_INS_TABLE_ARBGROUPS "ins_arbgroups"
|
#define TSDB_INS_TABLE_ARBGROUPS "ins_arbgroups"
|
||||||
#define TSDB_INS_TABLE_CLUSTER "ins_cluster"
|
#define TSDB_INS_TABLE_CLUSTER "ins_cluster"
|
||||||
#define TSDB_INS_TABLE_DATABASES "ins_databases"
|
#define TSDB_INS_TABLE_DATABASES "ins_databases"
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_UTIL_ANAL_H_
|
||||||
|
#define _TD_UTIL_ANAL_H_
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "tdef.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tjson.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define ANAL_FORECAST_DEFAULT_PERIOD 10
|
||||||
|
#define ANAL_FORECAST_DEFAULT_ROWS 10
|
||||||
|
#define ANAL_FORECAST_DEFAULT_CONF 95
|
||||||
|
#define ANAL_FORECAST_DEFAULT_ALPHA 0.05
|
||||||
|
#define ANAL_FORECAST_DEFAULT_PARAM "diff"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
EAnalAlgoType type;
|
||||||
|
int32_t anode;
|
||||||
|
int32_t urlLen;
|
||||||
|
char *url;
|
||||||
|
} SAnalUrl;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
ANAL_BUF_TYPE_JSON = 0,
|
||||||
|
ANAL_BUF_TYPE_JSON_COL = 1,
|
||||||
|
ANAL_BUF_TYPE_OTHERS,
|
||||||
|
} EAnalBufType;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
ANAL_HTTP_TYPE_GET = 0,
|
||||||
|
ANAL_HTTP_TYPE_POST,
|
||||||
|
} EAnalHttpType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
TdFilePtr filePtr;
|
||||||
|
char fileName[TSDB_FILENAME_LEN + 10];
|
||||||
|
int64_t numOfRows;
|
||||||
|
} SAnalColBuf;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
EAnalBufType bufType;
|
||||||
|
TdFilePtr filePtr;
|
||||||
|
char fileName[TSDB_FILENAME_LEN];
|
||||||
|
int32_t numOfCols;
|
||||||
|
SAnalColBuf *pCols;
|
||||||
|
} SAnalBuf;
|
||||||
|
|
||||||
|
int32_t taosAnalInit();
|
||||||
|
void taosAnalCleanup();
|
||||||
|
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf);
|
||||||
|
|
||||||
|
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen);
|
||||||
|
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen);
|
||||||
|
bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue);
|
||||||
|
int64_t taosAnalGetVersion();
|
||||||
|
void taosAnalUpdate(int64_t newVer, SHashObj *pHash);
|
||||||
|
|
||||||
|
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols);
|
||||||
|
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal);
|
||||||
|
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal);
|
||||||
|
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal);
|
||||||
|
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName);
|
||||||
|
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf);
|
||||||
|
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex);
|
||||||
|
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue);
|
||||||
|
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex);
|
||||||
|
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf);
|
||||||
|
int32_t taosAnalBufClose(SAnalBuf *pBuf);
|
||||||
|
void taosAnalBufDestroy(SAnalBuf *pBuf);
|
||||||
|
|
||||||
|
const char *taosAnalAlgoStr(EAnalAlgoType algoType);
|
||||||
|
EAnalAlgoType taosAnalAlgoInt(const char *algoName);
|
||||||
|
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
#endif /*_TD_UTIL_ANAL_H_*/
|
|
@ -159,6 +159,8 @@ typedef enum _mgmt_table {
|
||||||
TSDB_MGMT_TABLE_ARBGROUP,
|
TSDB_MGMT_TABLE_ARBGROUP,
|
||||||
TSDB_MGMT_TABLE_ENCRYPTIONS,
|
TSDB_MGMT_TABLE_ENCRYPTIONS,
|
||||||
TSDB_MGMT_TABLE_USER_FULL,
|
TSDB_MGMT_TABLE_USER_FULL,
|
||||||
|
TSDB_MGMT_TABLE_ANODE,
|
||||||
|
TSDB_MGMT_TABLE_ANODE_FULL,
|
||||||
TSDB_MGMT_TABLE_MAX,
|
TSDB_MGMT_TABLE_MAX,
|
||||||
} EShowType;
|
} EShowType;
|
||||||
|
|
||||||
|
@ -260,6 +262,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_COUNT_WINDOW,
|
QUERY_NODE_COUNT_WINDOW,
|
||||||
QUERY_NODE_COLUMN_OPTIONS,
|
QUERY_NODE_COLUMN_OPTIONS,
|
||||||
QUERY_NODE_TSMA_OPTIONS,
|
QUERY_NODE_TSMA_OPTIONS,
|
||||||
|
QUERY_NODE_ANOMALY_WINDOW,
|
||||||
|
|
||||||
// Statement nodes are used in parser and planner module.
|
// Statement nodes are used in parser and planner module.
|
||||||
QUERY_NODE_SET_OPERATOR = 100,
|
QUERY_NODE_SET_OPERATOR = 100,
|
||||||
|
@ -345,6 +348,9 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_CREATE_VIEW_STMT,
|
QUERY_NODE_CREATE_VIEW_STMT,
|
||||||
QUERY_NODE_DROP_VIEW_STMT,
|
QUERY_NODE_DROP_VIEW_STMT,
|
||||||
QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE,
|
QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE,
|
||||||
|
QUERY_NODE_CREATE_ANODE_STMT,
|
||||||
|
QUERY_NODE_DROP_ANODE_STMT,
|
||||||
|
QUERY_NODE_UPDATE_ANODE_STMT,
|
||||||
|
|
||||||
// show statement nodes
|
// show statement nodes
|
||||||
// see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET'
|
// see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET'
|
||||||
|
@ -386,6 +392,8 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT,
|
QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT,
|
||||||
QUERY_NODE_SHOW_ENCRYPTIONS_STMT,
|
QUERY_NODE_SHOW_ENCRYPTIONS_STMT,
|
||||||
QUERY_NODE_SHOW_TSMAS_STMT,
|
QUERY_NODE_SHOW_TSMAS_STMT,
|
||||||
|
QUERY_NODE_SHOW_ANODES_STMT,
|
||||||
|
QUERY_NODE_SHOW_ANODES_FULL_STMT,
|
||||||
QUERY_NODE_CREATE_TSMA_STMT,
|
QUERY_NODE_CREATE_TSMA_STMT,
|
||||||
QUERY_NODE_SHOW_CREATE_TSMA_STMT,
|
QUERY_NODE_SHOW_CREATE_TSMA_STMT,
|
||||||
QUERY_NODE_DROP_TSMA_STMT,
|
QUERY_NODE_DROP_TSMA_STMT,
|
||||||
|
@ -408,6 +416,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_LOGIC_PLAN,
|
QUERY_NODE_LOGIC_PLAN,
|
||||||
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
|
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
|
||||||
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
|
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
|
||||||
|
QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC,
|
||||||
|
|
||||||
// physical plan node
|
// physical plan node
|
||||||
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
|
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
|
||||||
|
@ -458,6 +467,9 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT,
|
QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT,
|
||||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT,
|
QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT,
|
||||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
|
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
|
||||||
|
QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY,
|
||||||
|
QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY,
|
||||||
|
QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC,
|
||||||
} ENodeType;
|
} ENodeType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1092,6 +1104,22 @@ typedef struct {
|
||||||
int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
|
int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
|
||||||
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
|
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
|
int64_t analVer;
|
||||||
|
} SRetrieveAnalAlgoReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t ver;
|
||||||
|
SHashObj* hash; // algoname:algotype -> SAnalUrl
|
||||||
|
} SRetrieveAnalAlgoRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq);
|
||||||
|
int32_t tDeserializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq);
|
||||||
|
int32_t tSerializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp);
|
||||||
|
int32_t tDeserializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp);
|
||||||
|
void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t alterType;
|
int8_t alterType;
|
||||||
int8_t superUser;
|
int8_t superUser;
|
||||||
|
@ -1766,6 +1794,7 @@ typedef struct {
|
||||||
SArray* pVloads; // array of SVnodeLoad
|
SArray* pVloads; // array of SVnodeLoad
|
||||||
int32_t statusSeq;
|
int32_t statusSeq;
|
||||||
int64_t ipWhiteVer;
|
int64_t ipWhiteVer;
|
||||||
|
int64_t analVer;
|
||||||
} SStatusReq;
|
} SStatusReq;
|
||||||
|
|
||||||
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||||||
|
@ -1831,6 +1860,7 @@ typedef struct {
|
||||||
SArray* pDnodeEps; // Array of SDnodeEp
|
SArray* pDnodeEps; // Array of SDnodeEp
|
||||||
int32_t statusSeq;
|
int32_t statusSeq;
|
||||||
int64_t ipWhiteVer;
|
int64_t ipWhiteVer;
|
||||||
|
int64_t analVer;
|
||||||
} SStatusRsp;
|
} SStatusRsp;
|
||||||
|
|
||||||
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
|
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
|
||||||
|
@ -2377,6 +2407,30 @@ typedef struct {
|
||||||
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||||||
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t urlLen;
|
||||||
|
int32_t sqlLen;
|
||||||
|
char* url;
|
||||||
|
char* sql;
|
||||||
|
} SMCreateAnodeReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq);
|
||||||
|
int32_t tDeserializeSMCreateAnodeReq(void* buf, int32_t bufLen, SMCreateAnodeReq* pReq);
|
||||||
|
void tFreeSMCreateAnodeReq(SMCreateAnodeReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t anodeId;
|
||||||
|
int32_t sqlLen;
|
||||||
|
char* sql;
|
||||||
|
} SMDropAnodeReq, SMUpdateAnodeReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq);
|
||||||
|
int32_t tDeserializeSMDropAnodeReq(void* buf, int32_t bufLen, SMDropAnodeReq* pReq);
|
||||||
|
void tFreeSMDropAnodeReq(SMDropAnodeReq* pReq);
|
||||||
|
int32_t tSerializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq);
|
||||||
|
int32_t tDeserializeSMUpdateAnodeReq(void* buf, int32_t bufLen, SMUpdateAnodeReq* pReq);
|
||||||
|
void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t hbSeq;
|
int32_t hbSeq;
|
||||||
|
|
|
@ -125,6 +125,11 @@
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL)
|
||||||
|
// mnode msg overload
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ANODE, "create-anode", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_ANODE, "update-anode", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ANODE, "drop-anode", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_ANAL_ALGO, "retrieve-anal-algo", NULL, NULL)
|
||||||
TD_CLOSE_MSG_SEG(TDMT_DND_MSG)
|
TD_CLOSE_MSG_SEG(TDMT_DND_MSG)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8
|
TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8
|
||||||
|
|
|
@ -62,6 +62,7 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_UNIQUE,
|
FUNCTION_TYPE_UNIQUE,
|
||||||
FUNCTION_TYPE_STATE_COUNT,
|
FUNCTION_TYPE_STATE_COUNT,
|
||||||
FUNCTION_TYPE_STATE_DURATION,
|
FUNCTION_TYPE_STATE_DURATION,
|
||||||
|
FUNCTION_TYPE_FORECAST,
|
||||||
|
|
||||||
// math function
|
// math function
|
||||||
FUNCTION_TYPE_ABS = 1000,
|
FUNCTION_TYPE_ABS = 1000,
|
||||||
|
@ -149,6 +150,9 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_TBUID,
|
FUNCTION_TYPE_TBUID,
|
||||||
FUNCTION_TYPE_VGID,
|
FUNCTION_TYPE_VGID,
|
||||||
FUNCTION_TYPE_VGVER,
|
FUNCTION_TYPE_VGVER,
|
||||||
|
FUNCTION_TYPE_FORECAST_LOW,
|
||||||
|
FUNCTION_TYPE_FORECAST_HIGH,
|
||||||
|
FUNCTION_TYPE_FORECAST_ROWTS,
|
||||||
|
|
||||||
// internal function
|
// internal function
|
||||||
FUNCTION_TYPE_SELECT_VALUE = 3750,
|
FUNCTION_TYPE_SELECT_VALUE = 3750,
|
||||||
|
@ -263,6 +267,7 @@ bool fmIsForbidSysTableFunc(int32_t funcId);
|
||||||
bool fmIsIntervalInterpoFunc(int32_t funcId);
|
bool fmIsIntervalInterpoFunc(int32_t funcId);
|
||||||
bool fmIsInterpFunc(int32_t funcId);
|
bool fmIsInterpFunc(int32_t funcId);
|
||||||
bool fmIsLastRowFunc(int32_t funcId);
|
bool fmIsLastRowFunc(int32_t funcId);
|
||||||
|
bool fmIsForecastFunc(int32_t funcId);
|
||||||
bool fmIsNotNullOutputFunc(int32_t funcId);
|
bool fmIsNotNullOutputFunc(int32_t funcId);
|
||||||
bool fmIsSelectValueFunc(int32_t funcId);
|
bool fmIsSelectValueFunc(int32_t funcId);
|
||||||
bool fmIsSystemInfoFunc(int32_t funcId);
|
bool fmIsSystemInfoFunc(int32_t funcId);
|
||||||
|
@ -272,6 +277,7 @@ bool fmIsMultiRowsFunc(int32_t funcId);
|
||||||
bool fmIsKeepOrderFunc(int32_t funcId);
|
bool fmIsKeepOrderFunc(int32_t funcId);
|
||||||
bool fmIsCumulativeFunc(int32_t funcId);
|
bool fmIsCumulativeFunc(int32_t funcId);
|
||||||
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
|
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
|
||||||
|
bool fmIsForecastPseudoColumnFunc(int32_t funcId);
|
||||||
bool fmIsGroupKeyFunc(int32_t funcId);
|
bool fmIsGroupKeyFunc(int32_t funcId);
|
||||||
bool fmIsBlockDistFunc(int32_t funcId);
|
bool fmIsBlockDistFunc(int32_t funcId);
|
||||||
bool fmIsIgnoreNullFunc(int32_t funcId);
|
bool fmIsIgnoreNullFunc(int32_t funcId);
|
||||||
|
|
|
@ -318,6 +318,21 @@ typedef struct SAlterDnodeStmt {
|
||||||
char value[TSDB_DNODE_VALUE_LEN];
|
char value[TSDB_DNODE_VALUE_LEN];
|
||||||
} SAlterDnodeStmt;
|
} SAlterDnodeStmt;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ENodeType type;
|
||||||
|
char url[TSDB_ANAL_ANODE_URL_LEN];
|
||||||
|
} SCreateAnodeStmt;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ENodeType type;
|
||||||
|
int32_t anodeId;
|
||||||
|
} SDropAnodeStmt;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
ENodeType type;
|
||||||
|
int32_t anodeId;
|
||||||
|
} SUpdateAnodeStmt;
|
||||||
|
|
||||||
typedef struct SShowStmt {
|
typedef struct SShowStmt {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
SNode* pDbName; // SValueNode
|
SNode* pDbName; // SValueNode
|
||||||
|
|
|
@ -204,6 +204,11 @@ typedef struct SInterpFuncLogicNode {
|
||||||
SNode* pTimeSeries; // SColumnNode
|
SNode* pTimeSeries; // SColumnNode
|
||||||
} SInterpFuncLogicNode;
|
} SInterpFuncLogicNode;
|
||||||
|
|
||||||
|
typedef struct SForecastFuncLogicNode {
|
||||||
|
SLogicNode node;
|
||||||
|
SNodeList* pFuncs;
|
||||||
|
} SForecastFuncLogicNode;
|
||||||
|
|
||||||
typedef struct SGroupCacheLogicNode {
|
typedef struct SGroupCacheLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
bool grpColsMayBeNull;
|
bool grpColsMayBeNull;
|
||||||
|
@ -274,7 +279,8 @@ typedef enum EWindowType {
|
||||||
WINDOW_TYPE_SESSION,
|
WINDOW_TYPE_SESSION,
|
||||||
WINDOW_TYPE_STATE,
|
WINDOW_TYPE_STATE,
|
||||||
WINDOW_TYPE_EVENT,
|
WINDOW_TYPE_EVENT,
|
||||||
WINDOW_TYPE_COUNT
|
WINDOW_TYPE_COUNT,
|
||||||
|
WINDOW_TYPE_ANOMALY
|
||||||
} EWindowType;
|
} EWindowType;
|
||||||
|
|
||||||
typedef enum EWindowAlgorithm {
|
typedef enum EWindowAlgorithm {
|
||||||
|
@ -315,6 +321,8 @@ typedef struct SWindowLogicNode {
|
||||||
int64_t windowCount;
|
int64_t windowCount;
|
||||||
int64_t windowSliding;
|
int64_t windowSliding;
|
||||||
SNodeList* pTsmaSubplans;
|
SNodeList* pTsmaSubplans;
|
||||||
|
SNode* pAnomalyExpr;
|
||||||
|
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
|
||||||
} SWindowLogicNode;
|
} SWindowLogicNode;
|
||||||
|
|
||||||
typedef struct SFillLogicNode {
|
typedef struct SFillLogicNode {
|
||||||
|
@ -507,6 +515,12 @@ typedef struct SInterpFuncPhysiNode {
|
||||||
SNode* pTimeSeries; // SColumnNode
|
SNode* pTimeSeries; // SColumnNode
|
||||||
} SInterpFuncPhysiNode;
|
} SInterpFuncPhysiNode;
|
||||||
|
|
||||||
|
typedef struct SForecastFuncPhysiNode {
|
||||||
|
SPhysiNode node;
|
||||||
|
SNodeList* pExprs;
|
||||||
|
SNodeList* pFuncs;
|
||||||
|
} SForecastFuncPhysiNode;
|
||||||
|
|
||||||
typedef struct SSortMergeJoinPhysiNode {
|
typedef struct SSortMergeJoinPhysiNode {
|
||||||
SPhysiNode node;
|
SPhysiNode node;
|
||||||
EJoinType joinType;
|
EJoinType joinType;
|
||||||
|
@ -704,6 +718,12 @@ typedef struct SCountWinodwPhysiNode {
|
||||||
|
|
||||||
typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode;
|
typedef SCountWinodwPhysiNode SStreamCountWinodwPhysiNode;
|
||||||
|
|
||||||
|
typedef struct SAnomalyWindowPhysiNode {
|
||||||
|
SWindowPhysiNode window;
|
||||||
|
SNode* pAnomalyKey;
|
||||||
|
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
|
||||||
|
} SAnomalyWindowPhysiNode;
|
||||||
|
|
||||||
typedef struct SSortPhysiNode {
|
typedef struct SSortPhysiNode {
|
||||||
SPhysiNode node;
|
SPhysiNode node;
|
||||||
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
|
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
|
||||||
|
|
|
@ -347,6 +347,13 @@ typedef struct SCountWindowNode {
|
||||||
int64_t windowSliding;
|
int64_t windowSliding;
|
||||||
} SCountWindowNode;
|
} SCountWindowNode;
|
||||||
|
|
||||||
|
typedef struct SAnomalyWindowNode {
|
||||||
|
ENodeType type; // QUERY_NODE_ANOMALY_WINDOW
|
||||||
|
SNode* pCol; // timestamp primary key
|
||||||
|
SNode* pExpr;
|
||||||
|
char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
|
||||||
|
} SAnomalyWindowNode;
|
||||||
|
|
||||||
typedef enum EFillMode {
|
typedef enum EFillMode {
|
||||||
FILL_MODE_NONE = 1,
|
FILL_MODE_NONE = 1,
|
||||||
FILL_MODE_VALUE,
|
FILL_MODE_VALUE,
|
||||||
|
@ -442,6 +449,8 @@ typedef struct SSelectStmt {
|
||||||
bool hasTailFunc;
|
bool hasTailFunc;
|
||||||
bool hasInterpFunc;
|
bool hasInterpFunc;
|
||||||
bool hasInterpPseudoColFunc;
|
bool hasInterpPseudoColFunc;
|
||||||
|
bool hasForecastFunc;
|
||||||
|
bool hasForecastPseudoColFunc;
|
||||||
bool hasLastRowFunc;
|
bool hasLastRowFunc;
|
||||||
bool hasLastFunc;
|
bool hasLastFunc;
|
||||||
bool hasTimeLineFunc;
|
bool hasTimeLineFunc;
|
||||||
|
|
|
@ -410,25 +410,25 @@ void* getTaskPoolWorkerCb();
|
||||||
#define qFatal(...) \
|
#define qFatal(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_FATAL) { \
|
if (qDebugFlag & DEBUG_FATAL) { \
|
||||||
taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
|
taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define qError(...) \
|
#define qError(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_ERROR) { \
|
if (qDebugFlag & DEBUG_ERROR) { \
|
||||||
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
|
taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define qWarn(...) \
|
#define qWarn(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_WARN) { \
|
if (qDebugFlag & DEBUG_WARN) { \
|
||||||
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
|
taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define qInfo(...) \
|
#define qInfo(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_INFO) { \
|
if (qDebugFlag & DEBUG_INFO) { \
|
||||||
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
|
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define qDebug(...) \
|
#define qDebug(...) \
|
||||||
|
|
|
@ -139,6 +139,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
|
@ -476,6 +476,22 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
|
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
|
||||||
#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A)
|
#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A)
|
||||||
|
|
||||||
|
// anode
|
||||||
|
#define TSDB_CODE_MND_ANODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0430)
|
||||||
|
#define TSDB_CODE_MND_ANODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0431)
|
||||||
|
#define TSDB_CODE_MND_ANODE_TOO_LONG_URL TAOS_DEF_ERROR_CODE(0, 0x0432)
|
||||||
|
#define TSDB_CODE_MND_ANODE_INVALID_PROTOCOL TAOS_DEF_ERROR_CODE(0, 0x0433)
|
||||||
|
#define TSDB_CODE_MND_ANODE_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0434)
|
||||||
|
#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO TAOS_DEF_ERROR_CODE(0, 0x0435)
|
||||||
|
#define TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME TAOS_DEF_ERROR_CODE(0, 0x0436)
|
||||||
|
#define TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE TAOS_DEF_ERROR_CODE(0, 0x0437)
|
||||||
|
|
||||||
|
// analysis
|
||||||
|
#define TSDB_CODE_ANAL_URL_RSP_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x0440)
|
||||||
|
#define TSDB_CODE_ANAL_URL_CANT_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0441)
|
||||||
|
#define TSDB_CODE_ANAL_ALGO_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0442)
|
||||||
|
#define TSDB_CODE_ANAL_ALGO_NOT_LOAD TAOS_DEF_ERROR_CODE(0, 0x0443)
|
||||||
|
#define TSDB_CODE_ANAL_BUF_INVALID_TYPE TAOS_DEF_ERROR_CODE(0, 0x0444)
|
||||||
|
|
||||||
// mnode-sma
|
// mnode-sma
|
||||||
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
|
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
|
||||||
|
@ -868,6 +884,10 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
|
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
|
||||||
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
|
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
|
||||||
#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681)
|
#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681)
|
||||||
|
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2682)
|
||||||
|
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL TAOS_DEF_ERROR_CODE(0, 0x2683)
|
||||||
|
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT TAOS_DEF_ERROR_CODE(0, 0x2684)
|
||||||
|
#define TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x2685)
|
||||||
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
||||||
|
|
||||||
//planner
|
//planner
|
||||||
|
|
|
@ -293,6 +293,12 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||||
#define TSDB_LOG_VAR_LEN 32
|
#define TSDB_LOG_VAR_LEN 32
|
||||||
|
#define TSDB_ANAL_ANODE_URL_LEN 128
|
||||||
|
#define TSDB_ANAL_ALGO_NAME_LEN 64
|
||||||
|
#define TSDB_ANAL_ALGO_TYPE_LEN 24
|
||||||
|
#define TSDB_ANAL_ALGO_KEY_LEN (TSDB_ANAL_ALGO_NAME_LEN + 9)
|
||||||
|
#define TSDB_ANAL_ALGO_URL_LEN (TSDB_ANAL_ANODE_URL_LEN + TSDB_ANAL_ALGO_TYPE_LEN + 1)
|
||||||
|
#define TSDB_ANAL_ALGO_OPTION_LEN 256
|
||||||
|
|
||||||
#define TSDB_MAX_EP_NUM 10
|
#define TSDB_MAX_EP_NUM 10
|
||||||
|
|
||||||
|
@ -603,6 +609,13 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
|
||||||
#define MONITOR_TAG_NAME_LEN 100
|
#define MONITOR_TAG_NAME_LEN 100
|
||||||
#define MONITOR_TAG_VALUE_LEN 300
|
#define MONITOR_TAG_VALUE_LEN 300
|
||||||
#define MONITOR_METRIC_NAME_LEN 100
|
#define MONITOR_METRIC_NAME_LEN 100
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
ANAL_ALGO_TYPE_ANOMALY_DETECT = 0,
|
||||||
|
ANAL_ALGO_TYPE_FORECAST = 1,
|
||||||
|
ANAL_ALGO_TYPE_END,
|
||||||
|
} EAnalAlgoType;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -68,6 +68,8 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem);
|
||||||
SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName);
|
SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName);
|
||||||
int32_t tjsonGetObjectName(const SJson* pJson, char** pName);
|
int32_t tjsonGetObjectName(const SJson* pJson, char** pName);
|
||||||
int32_t tjsonGetObjectValueString(const SJson* pJson, char** pStringValue);
|
int32_t tjsonGetObjectValueString(const SJson* pJson, char** pStringValue);
|
||||||
|
void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal);
|
||||||
|
void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal);
|
||||||
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal);
|
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal);
|
||||||
int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal);
|
int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal);
|
||||||
int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal);
|
int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal);
|
||||||
|
|
|
@ -47,6 +47,10 @@ target_link_libraries(
|
||||||
INTERFACE api
|
INTERFACE api
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if(${BUILD_WITH_ANALYSIS})
|
||||||
|
add_definitions(-DUSE_ANAL)
|
||||||
|
endif()
|
||||||
|
|
||||||
if(${BUILD_S3})
|
if(${BUILD_S3})
|
||||||
|
|
||||||
if(${BUILD_WITH_S3})
|
if(${BUILD_WITH_S3})
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_MND_ANODE_H_
|
||||||
|
#define _TD_MND_ANODE_H_
|
||||||
|
|
||||||
|
#include "mndInt.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t mndInitAnode(SMnode *pMnode);
|
||||||
|
void mndCleanupAnode(SMnode *pMnode);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_MND_ANODE_H_*/
|
|
@ -78,6 +78,9 @@ typedef enum {
|
||||||
MND_OPER_DROP_VIEW,
|
MND_OPER_DROP_VIEW,
|
||||||
MND_OPER_CONFIG_CLUSTER,
|
MND_OPER_CONFIG_CLUSTER,
|
||||||
MND_OPER_BALANCE_VGROUP_LEADER,
|
MND_OPER_BALANCE_VGROUP_LEADER,
|
||||||
|
MND_OPER_CREATE_ANODE,
|
||||||
|
MND_OPER_UPDATE_ANODE,
|
||||||
|
MND_OPER_DROP_ANODE
|
||||||
} EOperType;
|
} EOperType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -232,6 +235,24 @@ typedef struct {
|
||||||
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
||||||
} SDnodeObj;
|
} SDnodeObj;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t nameLen;
|
||||||
|
char* name;
|
||||||
|
} SAnodeAlgo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t id;
|
||||||
|
int64_t createdTime;
|
||||||
|
int64_t updateTime;
|
||||||
|
int32_t version;
|
||||||
|
int32_t urlLen;
|
||||||
|
int32_t numOfAlgos;
|
||||||
|
int32_t status;
|
||||||
|
SRWLatch lock;
|
||||||
|
char* url;
|
||||||
|
SArray** algos;
|
||||||
|
} SAnodeObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
|
|
|
@ -133,6 +133,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
|
||||||
|
|
||||||
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
|
||||||
|
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
|
||||||
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
|
||||||
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
@ -159,6 +161,8 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
||||||
|
|
||||||
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
|
||||||
|
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
|
||||||
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
@ -190,6 +194,9 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdSt
|
||||||
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
||||||
void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx);
|
void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx);
|
||||||
|
|
||||||
|
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId);
|
||||||
|
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -44,22 +44,6 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
|
||||||
static void destroyEWindowOperatorInfo(void* param);
|
static void destroyEWindowOperatorInfo(void* param);
|
||||||
static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
|
static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
|
||||||
|
|
||||||
// todo : move to util
|
|
||||||
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
|
|
||||||
uint64_t groupId) {
|
|
||||||
pRowSup->startRowIndex = rowIndex;
|
|
||||||
pRowSup->numOfRows = 0;
|
|
||||||
pRowSup->win.skey = tsList[rowIndex];
|
|
||||||
pRowSup->groupId = groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
|
|
||||||
pRowSup->win.ekey = ts;
|
|
||||||
pRowSup->prevTs = ts;
|
|
||||||
pRowSup->numOfRows += 1;
|
|
||||||
pRowSup->groupId = groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
|
int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_PARAM_CHECK(pOptrInfo);
|
QRY_PARAM_CHECK(pOptrInfo);
|
||||||
|
|
|
@ -89,14 +89,14 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
|
||||||
return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
|
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
|
||||||
pRowSup->win.ekey = ts;
|
pRowSup->win.ekey = ts;
|
||||||
pRowSup->prevTs = ts;
|
pRowSup->prevTs = ts;
|
||||||
pRowSup->numOfRows += 1;
|
pRowSup->numOfRows += 1;
|
||||||
pRowSup->groupId = groupId;
|
pRowSup->groupId = groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
|
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
|
||||||
uint64_t groupId) {
|
uint64_t groupId) {
|
||||||
pRowSup->startRowIndex = rowIndex;
|
pRowSup->startRowIndex = rowIndex;
|
||||||
pRowSup->numOfRows = 0;
|
pRowSup->numOfRows = 0;
|
||||||
|
|
|
@ -138,6 +138,8 @@ int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
int32_t diffFunction(SqlFunctionCtx* pCtx);
|
int32_t diffFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t diffFunctionByRow(SArray* pCtx);
|
int32_t diffFunctionByRow(SArray* pCtx);
|
||||||
|
|
||||||
|
bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
|
int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
int32_t derivativeFunction(SqlFunctionCtx* pCtx);
|
int32_t derivativeFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
|
@ -58,6 +58,7 @@ extern "C" {
|
||||||
#define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29)
|
#define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29)
|
||||||
#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found
|
#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found
|
||||||
#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31)
|
#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31)
|
||||||
|
#define FUNC_MGT_FORECAST_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(32)
|
||||||
|
|
||||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||||
|
|
||||||
|
|
|
@ -154,6 +154,7 @@ SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode*
|
||||||
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
|
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
|
||||||
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
|
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
|
||||||
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken);
|
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken);
|
||||||
|
SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt);
|
||||||
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
|
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
|
||||||
SNode* pFill);
|
SNode* pFill);
|
||||||
SNode* createWindowOffsetNode(SAstCreateContext* pCxt, SNode* pStartOffset, SNode* pEndOffset);
|
SNode* createWindowOffsetNode(SAstCreateContext* pCxt, SNode* pStartOffset, SNode* pEndOffset);
|
||||||
|
@ -251,6 +252,9 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
|
||||||
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
|
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
|
||||||
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force, bool unsafe);
|
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force, bool unsafe);
|
||||||
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
|
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
|
||||||
|
SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl);
|
||||||
|
SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode);
|
||||||
|
SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll);
|
||||||
SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue);
|
SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue);
|
||||||
SNode* createRealTableNodeForIndexName(SAstCreateContext* pCxt, SToken* pDbName, SToken* pIndexName);
|
SNode* createRealTableNodeForIndexName(SAstCreateContext* pCxt, SToken* pDbName, SToken* pIndexName);
|
||||||
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
|
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
|
||||||
|
|
|
@ -3972,6 +3972,10 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
return nonCalcScalarFunction(pInput, inputNum, pOutput);
|
return nonCalcScalarFunction(pInput, inputNum, pOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t forecastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return nonCalcScalarFunction(pInput, inputNum, pOutput);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t twaScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
return avgScalarFunction(pInput, inputNum, pOutput);
|
return avgScalarFunction(pInput, inputNum, pOutput);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,737 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "tanal.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "ttypes.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
|
#ifdef USE_ANAL
|
||||||
|
#include <curl/curl.h>
|
||||||
|
#define ANAL_ALGO_SPLIT ","
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t ver;
|
||||||
|
SHashObj *hash; // algoname:algotype -> SAnalUrl
|
||||||
|
TdThreadMutex lock;
|
||||||
|
} SAlgoMgmt;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char *data;
|
||||||
|
int64_t dataLen;
|
||||||
|
} SCurlResp;
|
||||||
|
|
||||||
|
static SAlgoMgmt tsAlgos = {0};
|
||||||
|
static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen);
|
||||||
|
|
||||||
|
const char *taosAnalAlgoStr(EAnalAlgoType type) {
|
||||||
|
switch (type) {
|
||||||
|
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
|
||||||
|
return "anomaly-detection";
|
||||||
|
case ANAL_ALGO_TYPE_FORECAST:
|
||||||
|
return "forecast";
|
||||||
|
default:
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *taosAnalAlgoUrlStr(EAnalAlgoType type) {
|
||||||
|
switch (type) {
|
||||||
|
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
|
||||||
|
return "anomaly-detect";
|
||||||
|
case ANAL_ALGO_TYPE_FORECAST:
|
||||||
|
return "forecast";
|
||||||
|
default:
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EAnalAlgoType taosAnalAlgoInt(const char *name) {
|
||||||
|
for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
|
||||||
|
if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ANAL_ALGO_TYPE_END;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalInit() {
|
||||||
|
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
|
||||||
|
uError("failed to init curl");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsAlgos.ver = 0;
|
||||||
|
taosThreadMutexInit(&tsAlgos.lock, NULL);
|
||||||
|
tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
if (tsAlgos.hash == NULL) {
|
||||||
|
uError("failed to init algo hash");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uInfo("analysis env is initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void taosAnalFreeHash(SHashObj *hash) {
|
||||||
|
void *pIter = taosHashIterate(hash, NULL);
|
||||||
|
while (pIter != NULL) {
|
||||||
|
SAnalUrl *pUrl = (SAnalUrl *)pIter;
|
||||||
|
taosMemoryFree(pUrl->url);
|
||||||
|
pIter = taosHashIterate(hash, pIter);
|
||||||
|
}
|
||||||
|
taosHashCleanup(hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosAnalCleanup() {
|
||||||
|
curl_global_cleanup();
|
||||||
|
taosThreadMutexDestroy(&tsAlgos.lock);
|
||||||
|
taosAnalFreeHash(tsAlgos.hash);
|
||||||
|
tsAlgos.hash = NULL;
|
||||||
|
uInfo("analysis env is cleaned up");
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {
|
||||||
|
if (newVer > tsAlgos.ver) {
|
||||||
|
taosThreadMutexLock(&tsAlgos.lock);
|
||||||
|
SHashObj *hash = tsAlgos.hash;
|
||||||
|
tsAlgos.ver = newVer;
|
||||||
|
tsAlgos.hash = pHash;
|
||||||
|
taosThreadMutexUnlock(&tsAlgos.lock);
|
||||||
|
taosAnalFreeHash(hash);
|
||||||
|
} else {
|
||||||
|
taosAnalFreeHash(pHash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
|
||||||
|
char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0};
|
||||||
|
int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName);
|
||||||
|
|
||||||
|
char *pos1 = strstr(option, buf);
|
||||||
|
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
|
||||||
|
if (pos1 != NULL) {
|
||||||
|
if (optMaxLen > 0) {
|
||||||
|
int32_t copyLen = optMaxLen;
|
||||||
|
if (pos2 != NULL) {
|
||||||
|
copyLen = (int32_t)(pos2 - pos1 - strlen(optName) + 1);
|
||||||
|
copyLen = MIN(copyLen, optMaxLen);
|
||||||
|
}
|
||||||
|
tstrncpy(optValue, pos1 + bufLen, copyLen);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) {
|
||||||
|
char buf[TSDB_ANAL_ALGO_OPTION_LEN] = {0};
|
||||||
|
int32_t bufLen = snprintf(buf, sizeof(buf), "%s=", optName);
|
||||||
|
|
||||||
|
char *pos1 = strstr(option, buf);
|
||||||
|
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
|
||||||
|
if (pos1 != NULL) {
|
||||||
|
*optValue = taosStr2Int32(pos1 + bufLen + 1, NULL, 10);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
|
||||||
|
int32_t code = 0;
|
||||||
|
char name[TSDB_ANAL_ALGO_KEY_LEN] = {0};
|
||||||
|
int32_t nameLen = 1 + snprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);
|
||||||
|
|
||||||
|
taosThreadMutexLock(&tsAlgos.lock);
|
||||||
|
SAnalUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
|
||||||
|
if (pUrl != NULL) {
|
||||||
|
tstrncpy(url, pUrl->url, urlLen);
|
||||||
|
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
|
||||||
|
} else {
|
||||||
|
url[0] = 0;
|
||||||
|
terrno = TSDB_CODE_ANAL_ALGO_NOT_FOUND;
|
||||||
|
code = terrno;
|
||||||
|
uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&tsAlgos.lock);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t taosAnalGetVersion() { return tsAlgos.ver; }
|
||||||
|
|
||||||
|
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
|
||||||
|
SCurlResp *pRsp = userdata;
|
||||||
|
if (contLen == 0 || nmemb == 0 || pCont == NULL) {
|
||||||
|
pRsp->dataLen = 0;
|
||||||
|
pRsp->data = NULL;
|
||||||
|
uError("curl response is received, len:%" PRId64, pRsp->dataLen);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRsp->dataLen = (int64_t)contLen * (int64_t)nmemb;
|
||||||
|
pRsp->data = taosMemoryMalloc(pRsp->dataLen + 1);
|
||||||
|
|
||||||
|
if (pRsp->data != NULL) {
|
||||||
|
(void)memcpy(pRsp->data, pCont, pRsp->dataLen);
|
||||||
|
pRsp->data[pRsp->dataLen] = 0;
|
||||||
|
uDebug("curl response is received, len:%" PRId64 ", content:%s", pRsp->dataLen, pRsp->data);
|
||||||
|
return pRsp->dataLen;
|
||||||
|
} else {
|
||||||
|
pRsp->dataLen = 0;
|
||||||
|
uError("failed to malloc curl response");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
|
||||||
|
CURL *curl = NULL;
|
||||||
|
CURLcode code = 0;
|
||||||
|
|
||||||
|
curl = curl_easy_init();
|
||||||
|
if (curl == NULL) {
|
||||||
|
uError("failed to create curl handle");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
curl_easy_setopt(curl, CURLOPT_URL, url);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100);
|
||||||
|
|
||||||
|
uDebug("curl get request will sent, url:%s", url);
|
||||||
|
code = curl_easy_perform(curl);
|
||||||
|
if (code != CURLE_OK) {
|
||||||
|
uError("failed to perform curl action, code:%d", code);
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (curl != NULL) curl_easy_cleanup(curl);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) {
|
||||||
|
struct curl_slist *headers = NULL;
|
||||||
|
CURL *curl = NULL;
|
||||||
|
CURLcode code = 0;
|
||||||
|
|
||||||
|
curl = curl_easy_init();
|
||||||
|
if (curl == NULL) {
|
||||||
|
uError("failed to create curl handle");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
|
||||||
|
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_URL, url);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_POST, 1);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
|
||||||
|
|
||||||
|
uDebug("curl post request will sent, url:%s len:%d", url, bufLen);
|
||||||
|
code = curl_easy_perform(curl);
|
||||||
|
if (code != CURLE_OK) {
|
||||||
|
uError("failed to perform curl action, code:%d", code);
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (curl != NULL) {
|
||||||
|
curl_slist_free_all(headers);
|
||||||
|
curl_easy_cleanup(curl);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) {
|
||||||
|
int32_t code = -1;
|
||||||
|
char *pCont = NULL;
|
||||||
|
int64_t contentLen;
|
||||||
|
SJson *pJson = NULL;
|
||||||
|
SCurlResp curlRsp = {0};
|
||||||
|
|
||||||
|
if (type == ANAL_HTTP_TYPE_GET) {
|
||||||
|
if (taosCurlGetRequest(url, &curlRsp) != 0) {
|
||||||
|
terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = taosAnalBufGetCont(pBuf, &pCont, &contentLen);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = code;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) {
|
||||||
|
terrno = TSDB_CODE_ANAL_URL_CANT_ACCESS;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
|
||||||
|
terrno = TSDB_CODE_ANAL_URL_RSP_IS_NULL;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pJson = tjsonParse(curlRsp.data);
|
||||||
|
if (pJson == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data);
|
||||||
|
if (pCont != NULL) taosMemoryFree(pCont);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t contLen;
|
||||||
|
char *pCont = NULL;
|
||||||
|
TdFilePtr pFile = NULL;
|
||||||
|
|
||||||
|
pFile = taosOpenFile(fileName, TD_FILE_READ);
|
||||||
|
if (pFile == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taosFStatFile(pFile, &contLen, NULL);
|
||||||
|
if (code != 0) goto _OVER;
|
||||||
|
|
||||||
|
pCont = taosMemoryMalloc(contLen + 1);
|
||||||
|
if (pCont == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosReadFile(pFile, pCont, contLen) != contLen) {
|
||||||
|
code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCont[contLen] = '\0';
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (code == 0) {
|
||||||
|
*ppCont = pCont;
|
||||||
|
*pContLen = contLen;
|
||||||
|
} else {
|
||||||
|
if (pCont != NULL) taosMemoryFree(pCont);
|
||||||
|
}
|
||||||
|
if (pFile != NULL) taosCloseFile(&pFile);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) {
|
||||||
|
char buf[64] = {0};
|
||||||
|
int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal);
|
||||||
|
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) {
|
||||||
|
char buf[128] = {0};
|
||||||
|
int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal);
|
||||||
|
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) {
|
||||||
|
char buf[128] = {0};
|
||||||
|
int32_t bufLen = snprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal);
|
||||||
|
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteStr(SAnalBuf *pBuf, const char *buf, int32_t bufLen) {
|
||||||
|
if (bufLen <= 0) {
|
||||||
|
bufLen = strlen(buf);
|
||||||
|
}
|
||||||
|
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteStart(SAnalBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); }
|
||||||
|
|
||||||
|
static int32_t tsosAnalJsonBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
|
||||||
|
pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
|
if (pBuf->filePtr == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalColBuf));
|
||||||
|
if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
pBuf->numOfCols = numOfCols;
|
||||||
|
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON) {
|
||||||
|
return taosAnalJsonBufWriteStart(pBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SAnalColBuf *pCol = &pBuf->pCols[i];
|
||||||
|
snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i);
|
||||||
|
pCol->filePtr =
|
||||||
|
taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
|
if (pCol->filePtr == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return taosAnalJsonBufWriteStart(pBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
|
||||||
|
char buf[128] = {0};
|
||||||
|
bool first = (colIndex == 0);
|
||||||
|
bool last = (colIndex == pBuf->numOfCols - 1);
|
||||||
|
|
||||||
|
if (first) {
|
||||||
|
if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t bufLen = snprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name,
|
||||||
|
tDataTypes[colType].bytes, last ? "" : ",");
|
||||||
|
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (last) {
|
||||||
|
if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteDataBegin(SAnalBuf *pBuf) {
|
||||||
|
return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteStrUseCol(SAnalBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
|
||||||
|
if (bufLen <= 0) {
|
||||||
|
bufLen = strlen(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON) {
|
||||||
|
if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen) != bufLen) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) {
|
||||||
|
return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
|
||||||
|
if (colIndex == pBuf->numOfCols - 1) {
|
||||||
|
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
|
||||||
|
char buf[64];
|
||||||
|
int32_t bufLen = 0;
|
||||||
|
|
||||||
|
if (pBuf->pCols[colIndex].numOfRows != 0) {
|
||||||
|
buf[bufLen] = ',';
|
||||||
|
buf[bufLen + 1] = '\n';
|
||||||
|
buf[bufLen + 2] = 0;
|
||||||
|
bufLen += 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (colType) {
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
bufLen += snprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
buf[bufLen] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuf->pCols[colIndex].numOfRows++;
|
||||||
|
return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteDataEnd(SAnalBuf *pBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
char *pCont = NULL;
|
||||||
|
int64_t contLen = 0;
|
||||||
|
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
|
||||||
|
SAnalColBuf *pCol = &pBuf->pCols[i];
|
||||||
|
|
||||||
|
code = taosFsyncFile(pCol->filePtr);
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
|
code = taosCloseFile(&pCol->filePtr);
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
|
code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
|
code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen);
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pCont);
|
||||||
|
contLen = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return taosAnalJsonBufWriteStr(pBuf, "],\n", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalJsonBufWriteEnd(SAnalBuf *pBuf) {
|
||||||
|
int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
|
return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalJsonBufClose(SAnalBuf *pBuf) {
|
||||||
|
int32_t code = taosAnalJsonBufWriteEnd(pBuf);
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
|
if (pBuf->filePtr != NULL) {
|
||||||
|
code = taosFsyncFile(pBuf->filePtr);
|
||||||
|
if (code != 0) return code;
|
||||||
|
code = taosCloseFile(&pBuf->filePtr);
|
||||||
|
if (code != 0) return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
|
||||||
|
SAnalColBuf *pCol = &pBuf->pCols[i];
|
||||||
|
if (pCol->filePtr != NULL) {
|
||||||
|
code = taosFsyncFile(pCol->filePtr);
|
||||||
|
if (code != 0) return code;
|
||||||
|
code = taosCloseFile(&pCol->filePtr);
|
||||||
|
if (code != 0) return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosAnalBufDestroy(SAnalBuf *pBuf) {
|
||||||
|
if (pBuf->fileName[0] != 0) {
|
||||||
|
if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
|
||||||
|
// taosRemoveFile(pBuf->fileName);
|
||||||
|
pBuf->fileName[0] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
|
||||||
|
SAnalColBuf *pCol = &pBuf->pCols[i];
|
||||||
|
if (pCol->fileName[0] != 0) {
|
||||||
|
if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr);
|
||||||
|
taosRemoveFile(pCol->fileName);
|
||||||
|
pCol->fileName[0] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pBuf->pCols);
|
||||||
|
pBuf->numOfCols = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return tsosAnalJsonBufOpen(pBuf, numOfCols);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteDataBegin(pBuf);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteColBegin(pBuf, colIndex);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteColEnd(pBuf, colIndex);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufWriteDataEnd(pBuf);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosAnalBufClose(SAnalBuf *pBuf) {
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufClose(pBuf);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t taosAnalBufGetCont(SAnalBuf *pBuf, char **ppCont, int64_t *pContLen) {
|
||||||
|
*ppCont = NULL;
|
||||||
|
*pContLen = 0;
|
||||||
|
|
||||||
|
if (pBuf->bufType == ANAL_BUF_TYPE_JSON || pBuf->bufType == ANAL_BUF_TYPE_JSON_COL) {
|
||||||
|
return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_ANAL_BUF_INVALID_TYPE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
int32_t taosAnalInit() { return 0; }
|
||||||
|
void taosAnalCleanup() {}
|
||||||
|
SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalBuf *pBuf) { return NULL; }
|
||||||
|
|
||||||
|
int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
|
||||||
|
bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
|
||||||
|
bool taosAnalGetOptInt(const char *option, const char *optName, int32_t *optValue) { return true; }
|
||||||
|
int64_t taosAnalGetVersion() { return 0; }
|
||||||
|
void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {}
|
||||||
|
|
||||||
|
int32_t tsosAnalBufOpen(SAnalBuf *pBuf, int32_t numOfCols) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteOptStr(SAnalBuf *pBuf, const char *optName, const char *optVal) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteOptInt(SAnalBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteOptFloat(SAnalBuf *pBuf, const char *optName, float optVal) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteColMeta(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteDataBegin(SAnalBuf *pBuf) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteColBegin(SAnalBuf *pBuf, int32_t colIndex) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteColData(SAnalBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteColEnd(SAnalBuf *pBuf, int32_t colIndex) { return 0; }
|
||||||
|
int32_t taosAnalBufWriteDataEnd(SAnalBuf *pBuf) { return 0; }
|
||||||
|
int32_t taosAnalBufClose(SAnalBuf *pBuf) { return 0; }
|
||||||
|
void taosAnalBufDestroy(SAnalBuf *pBuf) {}
|
||||||
|
|
||||||
|
const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; }
|
||||||
|
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
|
||||||
|
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
|
||||||
|
|
||||||
|
#endif
|
|
@ -345,6 +345,21 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily do
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
|
||||||
|
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_ALREADY_EXIST, "Anode already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_NOT_EXIST, "Anode not there")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_URL, "Anode too long url")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_PROTOCOL, "Anode invalid protocol")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_INVALID_VERSION, "Anode invalid version")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO, "Anode too many algorithm")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_LONG_ALGO_NAME, "Anode too long algorithm name")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ANODE_TOO_MANY_ALGO_TYPE, "Anode too many algorithm type")
|
||||||
|
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_RSP_IS_NULL, "Analysis url response is NULL")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_URL_CANT_ACCESS, "Analysis url can't access")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_FOUND, "Analysis algorithm not found")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_ALGO_NOT_LOAD, "Analysis algorithm not loaded")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_ANAL_BUF_INVALID_TYPE, "Analysis invalid buffer type")
|
||||||
|
|
||||||
// mnode-sma
|
// mnode-sma
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist")
|
||||||
|
@ -709,6 +724,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicat
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE, "ANOMALY_WINDOW only support mathable column")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL, "ANOMALY_WINDOW not support on tag column")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW option should include algo field")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, "Invalid forecast clause")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
||||||
|
|
||||||
//planner
|
//planner
|
||||||
|
|
|
@ -194,6 +194,10 @@ int32_t tjsonGetObjectValueString(const SJson* pJson, char** pValueString) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal) { *pVal = (int64_t)((cJSON*)pJson)->valuedouble; }
|
||||||
|
|
||||||
|
void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal) { *pVal = ((cJSON*)pJson)->valuedouble; }
|
||||||
|
|
||||||
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) {
|
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) {
|
||||||
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
|
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
|
|
Loading…
Reference in New Issue