Merge remote-tracking branch 'origin/3.0' into feature/tfs
This commit is contained in:
commit
1f01574792
|
@ -1527,7 +1527,6 @@ typedef struct SMqSetCVgReq {
|
||||||
SArray* tasks; // SArray<SSubQueryMsg>
|
SArray* tasks; // SArray<SSubQueryMsg>
|
||||||
} SMqSetCVgReq;
|
} SMqSetCVgReq;
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||||
|
@ -1552,6 +1551,39 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SMqSetCVgRsp {
|
||||||
|
int32_t vgId;
|
||||||
|
int64_t consumerId;
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
|
} SMqSetCVgRsp;
|
||||||
|
|
||||||
|
typedef struct SMqCVConsumeReq {
|
||||||
|
int64_t reqId;
|
||||||
|
int64_t offset;
|
||||||
|
int64_t consumerId;
|
||||||
|
int64_t blockingTime;
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
|
} SMqCVConsumeReq;
|
||||||
|
|
||||||
|
typedef struct SMqConsumeRspBlock {
|
||||||
|
int32_t bodyLen;
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char body[];
|
||||||
|
} SMqConsumeRspBlock;
|
||||||
|
|
||||||
|
typedef struct SMqCVConsumeRsp {
|
||||||
|
int64_t reqId;
|
||||||
|
int64_t clientId;
|
||||||
|
int64_t committedOffset;
|
||||||
|
int64_t receiveOffset;
|
||||||
|
int64_t rspOffset;
|
||||||
|
int32_t skipLogNum;
|
||||||
|
int32_t bodyLen;
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
SMqConsumeRspBlock blocks[];
|
||||||
|
} SMqCvConsumeRsp;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -160,7 +160,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
||||||
|
@ -175,6 +175,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
|
||||||
|
|
||||||
|
|
||||||
// Requests handled by QNODE
|
// Requests handled by QNODE
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_FUNCTION_MGT_H_
|
||||||
|
#define _TD_FUNCTION_MGT_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
struct SQLFunctionCtx;
|
||||||
|
struct SResultRowEntryInfo;
|
||||||
|
struct STimeWindow;
|
||||||
|
|
||||||
|
typedef struct SFuncExecEnv {
|
||||||
|
int32_t calcMemSize;
|
||||||
|
} SFuncExecEnv;
|
||||||
|
|
||||||
|
typedef void* FuncMgtHandle;
|
||||||
|
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
typedef bool (*FExecInit)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||||
|
typedef void (*FExecProcess)(struct SQLFunctionCtx *pCtx);
|
||||||
|
typedef void (*FExecFinalize)(struct SQLFunctionCtx *pCtx);
|
||||||
|
|
||||||
|
typedef struct SFuncExecFuncs {
|
||||||
|
FExecGetEnv getEnv;
|
||||||
|
FExecInit init;
|
||||||
|
FExecProcess process;
|
||||||
|
FExecFinalize finalize;
|
||||||
|
} SFuncExecFuncs;
|
||||||
|
|
||||||
|
int32_t fmFuncMgtInit();
|
||||||
|
|
||||||
|
int32_t fmGetHandle(FuncMgtHandle* pHandle);
|
||||||
|
|
||||||
|
int32_t fmGetFuncId(FuncMgtHandle handle, const char* name);
|
||||||
|
int32_t fmGetFuncResultType(FuncMgtHandle handle, SFunctionNode* pFunc);
|
||||||
|
bool fmIsAggFunc(int32_t funcId);
|
||||||
|
bool fmIsStringFunc(int32_t funcId);
|
||||||
|
bool fmIsTimestampFunc(int32_t funcId);
|
||||||
|
bool fmIsTimelineFunc(int32_t funcId);
|
||||||
|
bool fmIsTimeorderFunc(int32_t funcId);
|
||||||
|
bool fmIsNonstandardSQLFunc(int32_t funcId);
|
||||||
|
int32_t fmFuncScanType(int32_t funcId);
|
||||||
|
|
||||||
|
int32_t fmGetFuncExecFuncs(FuncMgtHandle handle, int32_t funcId, SFuncExecFuncs* pFpSet);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // _TD_FUNCTION_MGT_H_
|
|
@ -80,8 +80,8 @@ typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWal
|
||||||
|
|
||||||
typedef struct SWalReadHead {
|
typedef struct SWalReadHead {
|
||||||
int8_t headVer;
|
int8_t headVer;
|
||||||
uint8_t msgType;
|
int16_t msgType;
|
||||||
int8_t reserved[2];
|
int8_t reserved;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int64_t ingestTs; // not implemented
|
int64_t ingestTs; // not implemented
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
|
|
@ -0,0 +1,271 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_NODES_H_
|
||||||
|
#define _TD_NODES_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "tarray.h"
|
||||||
|
#include "tdef.h"
|
||||||
|
|
||||||
|
typedef enum ENodeType {
|
||||||
|
QUERY_NODE_COLUMN = 1,
|
||||||
|
QUERY_NODE_VALUE,
|
||||||
|
QUERY_NODE_OPERATOR,
|
||||||
|
QUERY_NODE_LOGIC_CONDITION,
|
||||||
|
QUERY_NODE_IS_NULL_CONDITION,
|
||||||
|
QUERY_NODE_FUNCTION,
|
||||||
|
QUERY_NODE_REAL_TABLE,
|
||||||
|
QUERY_NODE_TEMP_TABLE,
|
||||||
|
QUERY_NODE_JOIN_TABLE,
|
||||||
|
QUERY_NODE_GROUPING_SET,
|
||||||
|
QUERY_NODE_ORDER_BY_EXPR,
|
||||||
|
QUERY_NODE_STATE_WINDOW,
|
||||||
|
QUERY_NODE_SESSION_WINDOW,
|
||||||
|
QUERY_NODE_INTERVAL_WINDOW,
|
||||||
|
|
||||||
|
QUERY_NODE_SET_OPERATOR,
|
||||||
|
QUERY_NODE_SELECT_STMT
|
||||||
|
} ENodeType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The first field of a node of any type is guaranteed to be the ENodeType.
|
||||||
|
* Hence the type of any node can be gotten by casting it to SNode.
|
||||||
|
*/
|
||||||
|
typedef struct SNode {
|
||||||
|
ENodeType type;
|
||||||
|
} SNode;
|
||||||
|
|
||||||
|
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
|
||||||
|
|
||||||
|
typedef struct SDataType {
|
||||||
|
uint8_t type;
|
||||||
|
uint8_t precision;
|
||||||
|
uint8_t scale;
|
||||||
|
int32_t bytes;
|
||||||
|
} SDataType;
|
||||||
|
|
||||||
|
typedef struct SExprNode {
|
||||||
|
ENodeType nodeType;
|
||||||
|
SDataType resType;
|
||||||
|
char aliasName[TSDB_COL_NAME_LEN];
|
||||||
|
} SExprNode;
|
||||||
|
|
||||||
|
typedef enum EColumnType {
|
||||||
|
COLUMN_TYPE_COLUMN = 1,
|
||||||
|
COLUMN_TYPE_TAG
|
||||||
|
} EColumnType;
|
||||||
|
|
||||||
|
typedef struct SColumnNode {
|
||||||
|
SExprNode node; // QUERY_NODE_COLUMN
|
||||||
|
int16_t colId;
|
||||||
|
EColumnType colType; // column or tag
|
||||||
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
|
char tableName[TSDB_TABLE_NAME_LEN];
|
||||||
|
char colName[TSDB_COL_NAME_LEN];
|
||||||
|
} SColumnNode;
|
||||||
|
|
||||||
|
typedef struct SValueNode {
|
||||||
|
SExprNode type; // QUERY_NODE_VALUE
|
||||||
|
char* literal;
|
||||||
|
} SValueNode;
|
||||||
|
|
||||||
|
typedef enum EOperatorType {
|
||||||
|
// arithmetic operator
|
||||||
|
OP_TYPE_ADD = 1,
|
||||||
|
OP_TYPE_SUB,
|
||||||
|
OP_TYPE_MULTI,
|
||||||
|
OP_TYPE_DIV,
|
||||||
|
OP_TYPE_MOD,
|
||||||
|
|
||||||
|
// comparison operator
|
||||||
|
OP_TYPE_GREATER_THAN,
|
||||||
|
OP_TYPE_GREATER_EQUAL,
|
||||||
|
OP_TYPE_LOWER_THAN,
|
||||||
|
OP_TYPE_LOWER_EQUAL,
|
||||||
|
OP_TYPE_EQUAL,
|
||||||
|
OP_TYPE_NOT_EQUAL,
|
||||||
|
OP_TYPE_IN,
|
||||||
|
OP_TYPE_NOT_IN,
|
||||||
|
OP_TYPE_LIKE,
|
||||||
|
OP_TYPE_NOT_LIKE,
|
||||||
|
OP_TYPE_MATCH,
|
||||||
|
OP_TYPE_NMATCH,
|
||||||
|
|
||||||
|
// json operator
|
||||||
|
OP_TYPE_JSON_GET_VALUE,
|
||||||
|
OP_TYPE_JSON_CONTAINS
|
||||||
|
} EOperatorType;
|
||||||
|
|
||||||
|
typedef struct SOperatorNode {
|
||||||
|
SExprNode type; // QUERY_NODE_OPERATOR
|
||||||
|
EOperatorType opType;
|
||||||
|
SNode* pLeft;
|
||||||
|
SNode* pRight;
|
||||||
|
} SOperatorNode;
|
||||||
|
|
||||||
|
typedef enum ELogicConditionType {
|
||||||
|
LOGIC_COND_TYPE_AND,
|
||||||
|
LOGIC_COND_TYPE_OR,
|
||||||
|
LOGIC_COND_TYPE_NOT,
|
||||||
|
} ELogicConditionType;
|
||||||
|
|
||||||
|
typedef struct SLogicConditionNode {
|
||||||
|
ENodeType type; // QUERY_NODE_LOGIC_CONDITION
|
||||||
|
ELogicConditionType condType;
|
||||||
|
SArray* pParameterList;
|
||||||
|
} SLogicConditionNode;
|
||||||
|
|
||||||
|
typedef struct SIsNullCondNode {
|
||||||
|
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
|
||||||
|
SNode* pExpr;
|
||||||
|
bool isNot;
|
||||||
|
} SIsNullCondNode;
|
||||||
|
|
||||||
|
typedef struct SFunctionNode {
|
||||||
|
SExprNode type; // QUERY_NODE_FUNCTION
|
||||||
|
char functionName[TSDB_FUNC_NAME_LEN];
|
||||||
|
int32_t funcId;
|
||||||
|
SArray* pParameterList; // SNode
|
||||||
|
} SFunctionNode;
|
||||||
|
|
||||||
|
typedef struct STableNode {
|
||||||
|
ENodeType type;
|
||||||
|
char tableName[TSDB_TABLE_NAME_LEN];
|
||||||
|
char tableAliasName[TSDB_COL_NAME_LEN];
|
||||||
|
} STableNode;
|
||||||
|
|
||||||
|
typedef struct SRealTableNode {
|
||||||
|
STableNode type; // QUERY_NODE_REAL_TABLE
|
||||||
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
|
} SRealTableNode;
|
||||||
|
|
||||||
|
typedef struct STempTableNode {
|
||||||
|
STableNode type; // QUERY_NODE_TEMP_TABLE
|
||||||
|
SNode* pSubquery;
|
||||||
|
} STempTableNode;
|
||||||
|
|
||||||
|
typedef enum EJoinType {
|
||||||
|
JOIN_TYPE_INNER = 1
|
||||||
|
} EJoinType;
|
||||||
|
|
||||||
|
typedef struct SJoinTableNode {
|
||||||
|
STableNode type; // QUERY_NODE_JOIN_TABLE
|
||||||
|
EJoinType joinType;
|
||||||
|
SNode* pLeft;
|
||||||
|
SNode* pRight;
|
||||||
|
SNode* pOnCond;
|
||||||
|
} SJoinTableNode;
|
||||||
|
|
||||||
|
typedef enum EGroupingSetType {
|
||||||
|
GP_TYPE_NORMAL = 1
|
||||||
|
} EGroupingSetType;
|
||||||
|
|
||||||
|
typedef struct SGroupingSetNode {
|
||||||
|
ENodeType type; // QUERY_NODE_GROUPING_SET
|
||||||
|
EGroupingSetType groupingSetType;
|
||||||
|
SArray* pParameterList;
|
||||||
|
} SGroupingSetNode;
|
||||||
|
|
||||||
|
typedef enum EOrder {
|
||||||
|
ORDER_ASC = 1,
|
||||||
|
ORDER_DESC
|
||||||
|
} EOrder;
|
||||||
|
|
||||||
|
typedef enum ENullOrder {
|
||||||
|
NULL_ORDER_FIRST = 1,
|
||||||
|
NULL_ORDER_LAST
|
||||||
|
} ENullOrder;
|
||||||
|
|
||||||
|
typedef struct SOrderByExprNode {
|
||||||
|
ENodeType type; // QUERY_NODE_ORDER_BY_EXPR
|
||||||
|
SNode* pExpr;
|
||||||
|
EOrder order;
|
||||||
|
ENullOrder nullOrder;
|
||||||
|
} SOrderByExprNode;
|
||||||
|
|
||||||
|
typedef struct SLimitInfo {
|
||||||
|
uint64_t limit;
|
||||||
|
uint64_t offset;
|
||||||
|
} SLimitInfo;
|
||||||
|
|
||||||
|
typedef struct SStateWindowNode {
|
||||||
|
ENodeType type; // QUERY_NODE_STATE_WINDOW
|
||||||
|
SNode* pCol;
|
||||||
|
} SStateWindowNode;
|
||||||
|
|
||||||
|
typedef struct SSessionWindowNode {
|
||||||
|
ENodeType type; // QUERY_NODE_SESSION_WINDOW
|
||||||
|
int64_t gap; // gap between two session window(in microseconds)
|
||||||
|
SNode* pCol;
|
||||||
|
} SSessionWindowNode;
|
||||||
|
|
||||||
|
typedef struct SIntervalWindowNode {
|
||||||
|
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
|
||||||
|
int64_t interval;
|
||||||
|
int64_t sliding;
|
||||||
|
int64_t offset;
|
||||||
|
} SIntervalWindowNode;
|
||||||
|
|
||||||
|
typedef struct SSelectStmt {
|
||||||
|
ENodeType type; // QUERY_NODE_SELECT_STMT
|
||||||
|
bool isDistinct;
|
||||||
|
SArray* pProjectionList; // SNode
|
||||||
|
SNode* pFromTable;
|
||||||
|
SNode* pWhereCond;
|
||||||
|
SArray* pPartitionByList; // SNode
|
||||||
|
SNode* pWindowClause;
|
||||||
|
SArray* pGroupByList; // SGroupingSetNode
|
||||||
|
SArray* pOrderByList; // SOrderByExprNode
|
||||||
|
SLimitInfo limit;
|
||||||
|
SLimitInfo slimit;
|
||||||
|
} SSelectStmt;
|
||||||
|
|
||||||
|
typedef enum ESetOperatorType {
|
||||||
|
SET_OP_TYPE_UNION_ALL = 1
|
||||||
|
} ESetOperatorType;
|
||||||
|
|
||||||
|
typedef struct SSetOperator {
|
||||||
|
ENodeType type; // QUERY_NODE_SET_OPERATOR
|
||||||
|
ESetOperatorType opType;
|
||||||
|
SNode* pLeft;
|
||||||
|
SNode* pRight;
|
||||||
|
} SSetOperator;
|
||||||
|
|
||||||
|
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
|
||||||
|
|
||||||
|
bool nodeArrayWalker(SArray* pArray, FQueryNodeWalker walker, void* pContext);
|
||||||
|
bool nodeTreeWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext);
|
||||||
|
|
||||||
|
bool stmtWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext);
|
||||||
|
|
||||||
|
bool nodeEqual(const SNode* a, const SNode* b);
|
||||||
|
|
||||||
|
void cloneNode(const SNode* pNode);
|
||||||
|
|
||||||
|
int32_t nodeToString(const SNode* pNode, char** pStr, int32_t* pLen);
|
||||||
|
int32_t stringToNode(const char* pStr, SNode** pNode);
|
||||||
|
|
||||||
|
bool isTimeorderQuery(const SNode* pQuery);
|
||||||
|
bool isTimelineQuery(const SNode* pQuery);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_NODES_H_*/
|
|
@ -3,4 +3,5 @@ add_subdirectory(util)
|
||||||
add_subdirectory(common)
|
add_subdirectory(common)
|
||||||
add_subdirectory(libs)
|
add_subdirectory(libs)
|
||||||
add_subdirectory(client)
|
add_subdirectory(client)
|
||||||
add_subdirectory(dnode)
|
add_subdirectory(dnode)
|
||||||
|
add_subdirectory(nodes)
|
|
@ -55,8 +55,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||||
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
|
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
@ -95,7 +93,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
|
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
|
||||||
pCEp->consumerId = consumerId;
|
pCEp->consumerId = consumerId;
|
||||||
taosArrayPush(pSub->assigned, pCEp);
|
taosArrayPush(pSub->assigned, pCEp);
|
||||||
pSub->nextConsumerIdx++;
|
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
|
||||||
|
|
||||||
// build msg
|
// build msg
|
||||||
SMqSetCVgReq req = {
|
SMqSetCVgReq req = {
|
||||||
|
@ -464,7 +462,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
taosArrayPush(pSub->availConsumer, &consumerId);
|
taosArrayPush(pSub->availConsumer, &consumerId);
|
||||||
|
|
||||||
//TODO: no need
|
// TODO: no need
|
||||||
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
||||||
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
||||||
|
|
||||||
|
@ -542,7 +540,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
|
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
|
||||||
|
mndTransProcessRsp(pRsp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
|
|
@ -25,6 +25,7 @@ target_link_libraries(
|
||||||
PUBLIC bdb
|
PUBLIC bdb
|
||||||
PUBLIC tfs
|
PUBLIC tfs
|
||||||
PUBLIC wal
|
PUBLIC wal
|
||||||
|
PUBLIC scheduler
|
||||||
PUBLIC qworker
|
PUBLIC qworker
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -18,14 +18,16 @@
|
||||||
|
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "mallocator.h"
|
#include "mallocator.h"
|
||||||
|
#include "meta.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "scheduler.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tmsg.h"
|
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
|
#include "tmsg.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "meta.h"
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -150,31 +152,52 @@ typedef struct STqListHandle {
|
||||||
} STqList;
|
} STqList;
|
||||||
|
|
||||||
typedef struct STqGroup {
|
typedef struct STqGroup {
|
||||||
int64_t clientId;
|
int64_t clientId;
|
||||||
int64_t cgId;
|
int64_t cgId;
|
||||||
void* ahandle;
|
void* ahandle;
|
||||||
int32_t topicNum;
|
int32_t topicNum;
|
||||||
STqList* head;
|
STqList* head;
|
||||||
SList* topicList; // SList<STqTopic>
|
SList* topicList; // SList<STqTopic>
|
||||||
STqRspHandle rspHandle;
|
STqRspHandle rspHandle;
|
||||||
} STqGroup;
|
} STqGroup;
|
||||||
|
|
||||||
|
typedef struct STqTaskItem {
|
||||||
|
int8_t status;
|
||||||
|
int64_t offset;
|
||||||
|
void* dst;
|
||||||
|
SSubQueryMsg* pMsg;
|
||||||
|
} STqTaskItem;
|
||||||
|
|
||||||
|
// new version
|
||||||
|
typedef struct STqBuffer {
|
||||||
|
int64_t firstOffset;
|
||||||
|
int64_t lastOffset;
|
||||||
|
STqTaskItem output[TQ_BUFFER_SIZE];
|
||||||
|
} STqBuffer;
|
||||||
|
|
||||||
|
typedef struct STqTopicHandle {
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char* sql;
|
||||||
|
char* logicalPlan;
|
||||||
|
char* physicalPlan;
|
||||||
|
int64_t committedOffset;
|
||||||
|
int64_t currentOffset;
|
||||||
|
STqBuffer buffer;
|
||||||
|
SWalReadHandle* pReadhandle;
|
||||||
|
} STqTopicHandle;
|
||||||
|
|
||||||
|
typedef struct STqConsumerHandle {
|
||||||
|
int64_t consumerId;
|
||||||
|
int64_t epoch;
|
||||||
|
SArray* topics; // SArray<STqClientTopic>
|
||||||
|
} STqConsumerHandle;
|
||||||
|
|
||||||
typedef struct STqQueryMsg {
|
typedef struct STqQueryMsg {
|
||||||
STqMsgItem* item;
|
STqMsgItem* item;
|
||||||
struct STqQueryMsg* next;
|
struct STqQueryMsg* next;
|
||||||
} STqQueryMsg;
|
} STqQueryMsg;
|
||||||
|
|
||||||
typedef struct STqLogHandle {
|
|
||||||
void* logHandle;
|
|
||||||
void* (*openLogReader)(void* logHandle);
|
|
||||||
void (*closeLogReader)(void* logReader);
|
|
||||||
int32_t (*logRead)(void* logReader, void** data, int64_t ver);
|
|
||||||
|
|
||||||
int64_t (*logGetFirstVer)(void* logHandle);
|
|
||||||
int64_t (*logGetSnapshotVer)(void* logHandle);
|
|
||||||
int64_t (*logGetLastVer)(void* logHandle);
|
|
||||||
} STqLogHandle;
|
|
||||||
|
|
||||||
typedef struct STqCfg {
|
typedef struct STqCfg {
|
||||||
// TODO
|
// TODO
|
||||||
} STqCfg;
|
} STqCfg;
|
||||||
|
@ -253,7 +276,7 @@ typedef struct STqMetaStore {
|
||||||
// a table head
|
// a table head
|
||||||
STqMetaList* unpersistHead;
|
STqMetaList* unpersistHead;
|
||||||
// topics that are not connectted
|
// topics that are not connectted
|
||||||
STqMetaList* unconnectTopic;
|
STqMetaList* unconnectTopic;
|
||||||
|
|
||||||
// TODO:temporaral use, to be replaced by unified tfile
|
// TODO:temporaral use, to be replaced by unified tfile
|
||||||
int fileFd;
|
int fileFd;
|
||||||
|
@ -272,9 +295,9 @@ typedef struct STQ {
|
||||||
// the handle of meta kvstore
|
// the handle of meta kvstore
|
||||||
char* path;
|
char* path;
|
||||||
STqCfg* tqConfig;
|
STqCfg* tqConfig;
|
||||||
STqLogHandle* tqLogHandle;
|
|
||||||
STqMemRef tqMemRef;
|
STqMemRef tqMemRef;
|
||||||
STqMetaStore* tqMeta;
|
STqMetaStore* tqMeta;
|
||||||
|
SWal * pWal;
|
||||||
} STQ;
|
} STQ;
|
||||||
|
|
||||||
typedef struct STqMgmt {
|
typedef struct STqMgmt {
|
||||||
|
@ -289,51 +312,41 @@ int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac);
|
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
|
|
||||||
// void* will be replace by a msg type
|
// void* will be replace by a msg type
|
||||||
int tqPushMsg(STQ*, void* msg, int64_t version);
|
int tqPushMsg(STQ*, void* msg, int64_t version);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
|
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
|
||||||
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
|
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
|
||||||
int tqBufferSetOffset(STqTopic*, int64_t offset);
|
int tqBufferSetOffset(STqTopic*, int64_t offset);
|
||||||
|
|
||||||
STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
|
STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
|
||||||
|
|
||||||
STqGroup* tqGetGroup(STQ*, int64_t clientId);
|
STqGroup* tqGetGroup(STQ*, int64_t clientId);
|
||||||
|
|
||||||
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
||||||
int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
||||||
int tqRegisterContext(STqGroup*, void* ahandle);
|
int tqRegisterContext(STqGroup*, void* ahandle);
|
||||||
int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
||||||
|
#endif
|
||||||
|
|
||||||
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
|
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||||
|
|
||||||
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
|
|
||||||
|
|
||||||
static int tqQueryExecuting(int32_t status) { return status; }
|
|
||||||
|
|
||||||
typedef struct STqReadHandle {
|
typedef struct STqReadHandle {
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
SSubmitMsg* pMsg;
|
SSubmitMsg* pMsg;
|
||||||
SSubmitBlk* pBlock;
|
SSubmitBlk* pBlock;
|
||||||
SSubmitMsgIter msgIter;
|
SSubmitMsgIter msgIter;
|
||||||
SSubmitBlkIter blkIter;
|
SSubmitBlkIter blkIter;
|
||||||
SMeta* pMeta;
|
SMeta* pMeta;
|
||||||
} STqReadHandle;
|
} STqReadHandle;
|
||||||
|
|
||||||
typedef struct SSubmitBlkScanInfo {
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
|
||||||
|
bool tqNextDataBlock(STqReadHandle* pHandle);
|
||||||
} SSubmitBlkScanInfo;
|
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
|
||||||
|
// return SArray<SColumnInfoData>
|
||||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg);
|
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle);
|
|
||||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo);
|
|
||||||
//return SArray<SColumnInfoData>
|
|
||||||
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
|
|
||||||
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,9 @@ extern int32_t tqDebugFlag;
|
||||||
// delete persistent storage for meta info
|
// delete persistent storage for meta info
|
||||||
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
||||||
|
|
||||||
|
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
|
||||||
|
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
|
||||||
|
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -177,4 +177,4 @@ bool vmaIsFull(SVMemAllocator* pVMA);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_VNODE_DEF_H_*/
|
#endif /*_TD_VNODE_DEF_H_*/
|
||||||
|
|
|
@ -37,7 +37,7 @@ const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
|
||||||
|
|
||||||
int tqInit() {
|
int tqInit() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
||||||
if(old == 1) return 0;
|
if (old == 1) return 0;
|
||||||
|
|
||||||
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
|
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -45,12 +45,12 @@ int tqInit() {
|
||||||
|
|
||||||
void tqCleanUp() {
|
void tqCleanUp() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
|
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
|
||||||
if(old == 0) return;
|
if (old == 0) return;
|
||||||
taosTmrStop(tqMgmt.timer);
|
taosTmrStop(tqMgmt.timer);
|
||||||
taosTmrCleanUp(tqMgmt.timer);
|
taosTmrCleanUp(tqMgmt.timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) {
|
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
|
||||||
STQ* pTq = malloc(sizeof(STQ));
|
STQ* pTq = malloc(sizeof(STQ));
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
|
@ -58,7 +58,6 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemA
|
||||||
}
|
}
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->tqConfig = tqConfig;
|
pTq->tqConfig = tqConfig;
|
||||||
pTq->tqLogHandle = tqLogHandle;
|
|
||||||
#if 0
|
#if 0
|
||||||
pTq->tqMemRef.pAllocatorFactory = allocFac;
|
pTq->tqMemRef.pAllocatorFactory = allocFac;
|
||||||
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
||||||
|
@ -150,7 +149,7 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup
|
||||||
memset(pGroup, 0, sizeof(STqGroup));
|
memset(pGroup, 0, sizeof(STqGroup));
|
||||||
|
|
||||||
pGroup->topicList = tdListNew(sizeof(STqTopic));
|
pGroup->topicList = tdListNew(sizeof(STqTopic));
|
||||||
if(pGroup->topicList == NULL) {
|
if (pGroup->topicList == NULL) {
|
||||||
free(pGroup);
|
free(pGroup);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -190,7 +189,7 @@ static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
|
||||||
int totSize = 0;
|
int totSize = 0;
|
||||||
int numOfMsgs = 0;
|
int numOfMsgs = 0;
|
||||||
// TODO: make it a macro
|
// TODO: make it a macro
|
||||||
int sizeLimit = 4 * 1024;
|
int sizeLimit = 4 * 1024;
|
||||||
|
|
||||||
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
|
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
|
@ -329,9 +328,9 @@ int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||||
STqConsumeReq *pMsg = pReq->pCont;
|
STqConsumeReq* pMsg = pReq->pCont;
|
||||||
int64_t clientId = pMsg->head.clientId;
|
int64_t clientId = pMsg->head.clientId;
|
||||||
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
||||||
if (pGroup == NULL) {
|
if (pGroup == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
|
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -343,9 +342,8 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||||
int numOfMsgs = 0;
|
int numOfMsgs = 0;
|
||||||
int sizeLimit = 4096;
|
int sizeLimit = 4096;
|
||||||
|
|
||||||
|
STqConsumeRsp* pCsmRsp = (*pRsp)->pCont;
|
||||||
STqConsumeRsp *pCsmRsp = (*pRsp)->pCont;
|
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
|
||||||
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
|
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -356,16 +354,16 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||||
tdListInitIter(topicList, &iter, TD_LIST_FORWARD);
|
tdListInitIter(topicList, &iter, TD_LIST_FORWARD);
|
||||||
|
|
||||||
STqMsgContent* buffer = NULL;
|
STqMsgContent* buffer = NULL;
|
||||||
SArray* pArray = taosArrayInit(0, sizeof(void*));
|
SArray* pArray = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
||||||
SListNode *pn;
|
SListNode* pn;
|
||||||
while((pn = tdListNext(&iter)) != NULL) {
|
while ((pn = tdListNext(&iter)) != NULL) {
|
||||||
STqTopic* pTopic = *(STqTopic**)pn->data;
|
STqTopic* pTopic = *(STqTopic**)pn->data;
|
||||||
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
|
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
|
||||||
STqMsgItem* pItem = &pTopic->buffer[idx];
|
STqMsgItem* pItem = &pTopic->buffer[idx];
|
||||||
if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
|
if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
|
||||||
if(pItem->status == TQ_ITEM_READY) {
|
if (pItem->status == TQ_ITEM_READY) {
|
||||||
//if has data
|
// if has data
|
||||||
totSize += pTopic->buffer[idx].size;
|
totSize += pTopic->buffer[idx].size;
|
||||||
if (totSize > sizeLimit) {
|
if (totSize > sizeLimit) {
|
||||||
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
|
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
|
||||||
|
@ -388,13 +386,13 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||||
if (totSize > sizeLimit) {
|
if (totSize > sizeLimit) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if(pItem->status == TQ_ITEM_PROCESS) {
|
} else if (pItem->status == TQ_ITEM_PROCESS) {
|
||||||
//if not have data but in process
|
// if not have data but in process
|
||||||
|
|
||||||
} else if(pItem->status == TQ_ITEM_EMPTY){
|
} else if (pItem->status == TQ_ITEM_EMPTY) {
|
||||||
//if not have data and not in process
|
// if not have data and not in process
|
||||||
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
|
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
|
||||||
if(old != TQ_ITEM_EMPTY) {
|
if (old != TQ_ITEM_EMPTY) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
pItem->offset = pTopic->floatingCursor;
|
pItem->offset = pTopic->floatingCursor;
|
||||||
|
@ -416,22 +414,22 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetched a num of msgs, rpc response
|
// fetched a num of msgs, rpc response
|
||||||
for(int i = 0; i < pArray->size; i++) {
|
for (int i = 0; i < pArray->size; i++) {
|
||||||
STqMsgItem* pItem = taosArrayGet(pArray, i);
|
STqMsgItem* pItem = taosArrayGet(pArray, i);
|
||||||
|
|
||||||
//read from wal
|
// read from wal
|
||||||
void* raw = NULL;
|
void* raw = NULL;
|
||||||
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
|
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
|
||||||
int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
|
/*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
|
||||||
if(code < 0) {
|
/*if (code < 0) {*/
|
||||||
//TODO: error
|
// TODO: error
|
||||||
}
|
/*}*/
|
||||||
//get msgType
|
// get msgType
|
||||||
//if submitblk
|
// if submitblk
|
||||||
pItem->executor->assign(pItem->executor->runtimeEnv, raw);
|
pItem->executor->assign(pItem->executor->runtimeEnv, raw);
|
||||||
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
|
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
|
||||||
pItem->content = content;
|
pItem->content = content;
|
||||||
//if other type, send just put into buffer
|
// if other type, send just put into buffer
|
||||||
/*pItem->content = raw;*/
|
/*pItem->content = raw;*/
|
||||||
|
|
||||||
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
|
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
|
||||||
|
@ -608,7 +606,48 @@ int tqItemSSize() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
|
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
|
SMqCVConsumeReq* pReq = pMsg->pCont;
|
||||||
|
int64_t reqId = pReq->reqId;
|
||||||
|
int64_t consumerId = pReq->consumerId;
|
||||||
|
int64_t offset = pReq->offset;
|
||||||
|
int64_t blockingTime = pReq->blockingTime;
|
||||||
|
|
||||||
|
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||||
|
int sz = taosArrayGetSize(pConsumer->topics);
|
||||||
|
|
||||||
|
for (int i = 0 ; i < sz; i++) {
|
||||||
|
STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i);
|
||||||
|
|
||||||
|
int8_t pos = offset % TQ_BUFFER_SIZE;
|
||||||
|
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
|
||||||
|
if (old == 1) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
SWalHead* pHead = pHandle->pReadhandle->pHead;
|
||||||
|
while (pHead->head.msgType != TDMT_VND_SUBMIT) {
|
||||||
|
// read until find TDMT_VND_SUBMIT
|
||||||
|
}
|
||||||
|
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
||||||
|
|
||||||
|
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
|
||||||
|
|
||||||
|
void* outputData;
|
||||||
|
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
|
||||||
|
|
||||||
|
// put output into rsp
|
||||||
|
}
|
||||||
|
|
||||||
|
// launch query
|
||||||
|
// get result
|
||||||
|
SMqCvConsumeRsp* pRsp;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) {
|
||||||
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
||||||
if (pReadHandle == NULL) {
|
if (pReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -621,39 +660,39 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
|
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
|
||||||
SMemRow row;
|
SMemRow row;
|
||||||
int32_t sversion = pHandle->pBlock->sversion;
|
int32_t sversion = pHandle->pBlock->sversion;
|
||||||
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
|
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
|
||||||
pBlockInfo->numOfCols = pSchema->nCols;
|
pBlockInfo->numOfCols = pSchema->nCols;
|
||||||
pBlockInfo->rows = pHandle->pBlock->numOfRows;
|
pBlockInfo->rows = pHandle->pBlock->numOfRows;
|
||||||
pBlockInfo->uid = pHandle->pBlock->uid;
|
pBlockInfo->uid = pHandle->pBlock->uid;
|
||||||
//TODO: filter out unused column
|
// TODO: filter out unused column
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||||
int32_t sversion = pHandle->pBlock->sversion;
|
int32_t sversion = pHandle->pBlock->sversion;
|
||||||
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
||||||
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
||||||
SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SColumnInfoData colInfo;
|
SColumnInfoData colInfo;
|
||||||
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
|
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
|
||||||
colInfo.pData = malloc(sz);
|
colInfo.pData = malloc(sz);
|
||||||
if (colInfo.pData == NULL) {
|
if (colInfo.pData == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pTschema->numOfCols; i++) {
|
for (int i = 0; i < pTschema->numOfCols; i++) {
|
||||||
//TODO: filter out unused column
|
// TODO: filter out unused column
|
||||||
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
|
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,16 +700,17 @@ SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||||
int32_t kvIdx;
|
int32_t kvIdx;
|
||||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
|
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
|
||||||
//TODO: filter out unused column
|
// TODO: filter out unused column
|
||||||
STColumn *pCol = schemaColAt(pTschema, i);
|
STColumn* pCol = schemaColAt(pTschema, i);
|
||||||
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
|
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
|
||||||
//TODO: handle varlen
|
// TODO: handle varlen
|
||||||
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
|
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayPush(pArray, &colInfo);
|
taosArrayPush(pArray, &colInfo);
|
||||||
return pArray;
|
return pArray;
|
||||||
}
|
}
|
||||||
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
|
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
|
||||||
/*return 0;*/
|
* status) {*/
|
||||||
|
/*return 0;*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
|
@ -117,14 +117,6 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Open TQ
|
|
||||||
sprintf(dir, "%s/tq", pVnode->path);
|
|
||||||
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode));
|
|
||||||
if (pVnode->pTq == NULL) {
|
|
||||||
// TODO: handle error
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open WAL
|
// Open WAL
|
||||||
sprintf(dir, "%s/wal", pVnode->path);
|
sprintf(dir, "%s/wal", pVnode->path);
|
||||||
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg));
|
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg));
|
||||||
|
@ -133,6 +125,14 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open TQ
|
||||||
|
sprintf(dir, "%s/tq", pVnode->path);
|
||||||
|
pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
|
||||||
|
if (pVnode->pTq == NULL) {
|
||||||
|
// TODO: handle error
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// Open Query
|
// Open Query
|
||||||
if (vnodeQueryOpen(pVnode)) {
|
if (vnodeQueryOpen(pVnode)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -151,4 +151,4 @@ static void vnodeCloseImpl(SVnode *pVnode) {
|
||||||
tqClose(pVnode->pTq);
|
tqClose(pVnode->pTq);
|
||||||
walClose(pVnode->pWal);
|
walClose(pVnode->pWal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
|
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
|
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
|
||||||
|
case TDMT_VND_CONSUME:
|
||||||
|
return tqProcessConsume(pVnode->pTq, pMsg, pRsp);
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||||
return TSDB_CODE_VND_APP_ERROR;
|
return TSDB_CODE_VND_APP_ERROR;
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
|
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
@ -109,11 +110,40 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
case TDMT_VND_MQ_SET_CONN: {
|
||||||
|
//TODO: wrap in a function
|
||||||
char* reqStr = ptr;
|
char* reqStr = ptr;
|
||||||
SMqSetCVgReq req;
|
SMqSetCVgReq req;
|
||||||
/*tDecodeSMqSetCVgReq(reqStr, &req);*/
|
tDecodeSMqSetCVgReq(reqStr, &req);
|
||||||
// create topic if not exist
|
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
||||||
|
|
||||||
|
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
|
||||||
|
if (pTopic == NULL) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
strcpy(pTopic->topicName, req.topicName);
|
||||||
|
strcpy(pTopic->cgroup, req.cGroup);
|
||||||
|
strcpy(pTopic->sql, req.sql);
|
||||||
|
strcpy(pTopic->logicalPlan, req.logicalPlan);
|
||||||
|
strcpy(pTopic->physicalPlan, req.physicalPlan);
|
||||||
|
SArray *pArray;
|
||||||
|
//TODO: deserialize to SQueryDag
|
||||||
|
SQueryDag *pDag;
|
||||||
// convert to task
|
// convert to task
|
||||||
|
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
}
|
||||||
|
ASSERT(taosArrayGetSize(pArray) == 0);
|
||||||
|
STaskInfo *pInfo = taosArrayGet(pArray, 0);
|
||||||
|
SArray* pTasks;
|
||||||
|
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
|
||||||
|
pTopic->buffer.firstOffset = -1;
|
||||||
|
pTopic->buffer.lastOffset = -1;
|
||||||
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
|
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
|
||||||
|
pTopic->buffer.output[i].pMsg = pMsg;
|
||||||
|
pTopic->buffer.output[i].status = 0;
|
||||||
|
}
|
||||||
|
pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal);
|
||||||
// write mq meta
|
// write mq meta
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -8,5 +8,5 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
function
|
function
|
||||||
PRIVATE os util common
|
PRIVATE os util common nodes
|
||||||
)
|
)
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_FUNCTION_MGT_INT_H_
|
||||||
|
#define _TD_FUNCTION_MGT_INT_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "functionMgt.h"
|
||||||
|
|
||||||
|
#define FUNC_MGT_DATA_TYPE_MASK(n) (1 << n)
|
||||||
|
|
||||||
|
#define FUNC_MGT_DATA_TYPE_NULL 0
|
||||||
|
#define FUNC_MGT_DATA_TYPE_BOOL FUNC_MGT_DATA_TYPE_MASK(0)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_TINYINT FUNC_MGT_DATA_TYPE_MASK(1)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_SMALLINT FUNC_MGT_DATA_TYPE_MASK(2)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_INT FUNC_MGT_DATA_TYPE_MASK(3)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_BIGINT FUNC_MGT_DATA_TYPE_MASK(4)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_FLOAT FUNC_MGT_DATA_TYPE_MASK(5)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_DOUBLE FUNC_MGT_DATA_TYPE_MASK(6)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_BINARY FUNC_MGT_DATA_TYPE_MASK(7)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_TIMESTAMP FUNC_MGT_DATA_TYPE_MASK(8)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_NCHAR FUNC_MGT_DATA_TYPE_MASK(9)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_UTINYINT FUNC_MGT_DATA_TYPE_MASK(10)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_USMALLINT FUNC_MGT_DATA_TYPE_MASK(11)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_UINT FUNC_MGT_DATA_TYPE_MASK(12)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_UBIGINT FUNC_MGT_DATA_TYPE_MASK(13)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_VARCHAR FUNC_MGT_DATA_TYPE_MASK(14)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_VARBINARY FUNC_MGT_DATA_TYPE_MASK(15)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_JSON FUNC_MGT_DATA_TYPE_MASK(16)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_DECIMAL FUNC_MGT_DATA_TYPE_MASK(17)
|
||||||
|
#define FUNC_MGT_DATA_TYPE_BLOB FUNC_MGT_DATA_TYPE_MASK(18)
|
||||||
|
|
||||||
|
#define FUNC_MGT_EXACT_NUMERIC_DATA_TYPE \
|
||||||
|
(FUNC_MGT_DATA_TYPE_TINYINT | FUNC_MGT_DATA_TYPE_SMALLINT | FUNC_MGT_DATA_TYPE_INT | FUNC_MGT_DATA_TYPE_BIGINT \
|
||||||
|
| FUNC_MGT_DATA_TYPE_UTINYINT | FUNC_MGT_DATA_TYPE_USMALLINT | FUNC_MGT_DATA_TYPE_UINT | FUNC_MGT_DATA_TYPE_UBIGINT)
|
||||||
|
|
||||||
|
#define FUNC_MGT_APPRO_NUMERIC_DATA_TYPE (FUNC_MGT_DATA_TYPE_FLOAT | FUNC_MGT_DATA_TYPE_DOUBLE)
|
||||||
|
|
||||||
|
#define FUNC_MGT_NUMERIC_DATA_TYPE (FUNC_MGT_EXACT_NUMERIC_DATA_TYPE | FUNC_MGT_APPRO_NUMERIC_DATA_TYPE)
|
||||||
|
|
||||||
|
typedef void* FuncDef;
|
||||||
|
|
||||||
|
typedef struct SFuncElement {
|
||||||
|
FuncDef (*defineFunc)();
|
||||||
|
} SFuncElement;
|
||||||
|
|
||||||
|
extern const SFuncElement gBuiltinFuncs[];
|
||||||
|
|
||||||
|
FuncDef createFuncDef(const char* name, int32_t maxNumOfParams);
|
||||||
|
FuncDef setOneParamSignature(FuncDef def, int64_t resDataType, int64_t paramDataType);
|
||||||
|
FuncDef setTwoParamsSignature(FuncDef def, int64_t resDataType, int64_t p1DataType, int64_t p2DataType);
|
||||||
|
FuncDef setFollowParamSignature(FuncDef def, int64_t paramDataType);
|
||||||
|
FuncDef setFollowParamsSignature(FuncDef def, int64_t p1DataType, int64_t p2DataType, int32_t followNo);
|
||||||
|
|
||||||
|
FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProcess process, FExecFinalize finalize);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // _TD_FUNCTION_MGT_INT_H_
|
|
@ -95,6 +95,10 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32
|
||||||
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
|
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include "functionMgtInt.h"
|
||||||
|
|
||||||
|
FuncDef defineCount();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "functionMgtInt.h"
|
||||||
|
#include "taggfunction.h"
|
||||||
|
|
||||||
|
const SFuncElement gBuiltinFuncs[] = {
|
||||||
|
{.defineFunc = defineCount}
|
||||||
|
};
|
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "functionMgt.h"
|
||||||
|
|
||||||
|
#include "functionMgtInt.h"
|
||||||
|
#include "taos.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "thash.h"
|
||||||
|
|
||||||
|
typedef struct SFuncMgtService {
|
||||||
|
SHashObj* pFuncNameHashTable;
|
||||||
|
} SFuncMgtService;
|
||||||
|
|
||||||
|
static SFuncMgtService gFunMgtService;
|
||||||
|
|
||||||
|
int32_t fmFuncMgtInit() {
|
||||||
|
gFunMgtService.pFuncNameHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
if (NULL == gFunMgtService.pFuncNameHashTable) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SFuncDef {
|
||||||
|
char name[TSDB_FUNC_NAME_LEN];
|
||||||
|
int32_t maxNumOfParams;
|
||||||
|
SFuncExecFuncs execFuncs;
|
||||||
|
} SFuncDef ;
|
||||||
|
|
||||||
|
FuncDef createFuncDef(const char* name, int32_t maxNumOfParams) {
|
||||||
|
SFuncDef* pDef = calloc(1, sizeof(SFuncDef));
|
||||||
|
if (NULL == pDef) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
strcpy(pDef->name, name);
|
||||||
|
pDef->maxNumOfParams = maxNumOfParams;
|
||||||
|
return pDef;
|
||||||
|
}
|
||||||
|
|
||||||
|
FuncDef setOneParamSignature(FuncDef def, int64_t resDataType, int64_t paramDataType) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
FuncDef setTwoParamsSignature(FuncDef def, int64_t resDataType, int64_t p1DataType, int64_t p2DataType) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
FuncDef setFollowParamSignature(FuncDef def, int64_t paramDataType) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
FuncDef setFollowParamsSignature(FuncDef def, int64_t p1DataType, int64_t p2DataType, int32_t followNo) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProcess process, FExecFinalize finalize) {
|
||||||
|
SFuncDef* pDef = (SFuncDef*)def;
|
||||||
|
pDef->execFuncs.getEnv = getEnv;
|
||||||
|
pDef->execFuncs.init = init;
|
||||||
|
pDef->execFuncs.process = process;
|
||||||
|
pDef->execFuncs.finalize = finalize;
|
||||||
|
return def;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t registerFunc(FuncDef func) {
|
||||||
|
|
||||||
|
}
|
|
@ -4835,3 +4835,9 @@ SAggFunctionInfo aggFunc[35] = {{
|
||||||
statisRequired,
|
statisRequired,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
FuncDef defineCount() {
|
||||||
|
FuncDef def = createFuncDef("count", 1);
|
||||||
|
// todo define signature
|
||||||
|
return setExecFuncs(def, NULL, function_setup, count_function, doFinalizer);
|
||||||
|
}
|
||||||
|
|
|
@ -212,7 +212,7 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
|
||||||
pExpr[i] = p;
|
pExpr[i] = p;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
|
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
|
||||||
tfree(pExpr);
|
tfree(pExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -109,6 +109,26 @@ static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void*
|
||||||
return func(jObj, *obj);
|
return func(jObj, *obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func, SArray** array) {
|
||||||
|
const cJSON* jArray = cJSON_GetObjectItem(json, name);
|
||||||
|
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
|
||||||
|
if (size > 0) {
|
||||||
|
*array = taosArrayInit(size, POINTER_BYTES);
|
||||||
|
if (NULL == *array) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
cJSON* jItem = cJSON_GetArrayItem(jArray, i);
|
||||||
|
void* item = calloc(1, getPnodeTypeSize(jItem));
|
||||||
|
if (NULL == item || !func(jItem, item)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
taosArrayPush(*array, &item);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
|
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
|
||||||
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
|
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
|
@ -379,8 +399,8 @@ static const char* jkFunctionChild = "Child";
|
||||||
static bool functionToJson(const void* obj, cJSON* jFunc) {
|
static bool functionToJson(const void* obj, cJSON* jFunc) {
|
||||||
const tExprNode* exprInfo = (const tExprNode*)obj;
|
const tExprNode* exprInfo = (const tExprNode*)obj;
|
||||||
bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName);
|
bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName);
|
||||||
if (res) {
|
if (res && NULL != exprInfo->_function.pChild) {
|
||||||
res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*), exprInfo->_function.num);
|
res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, *(exprInfo->_function.pChild), sizeof(tExprNode*), exprInfo->_function.num);
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -388,6 +408,10 @@ static bool functionToJson(const void* obj, cJSON* jFunc) {
|
||||||
static bool functionFromJson(const cJSON* json, void* obj) {
|
static bool functionFromJson(const cJSON* json, void* obj) {
|
||||||
tExprNode* exprInfo = (tExprNode*)obj;
|
tExprNode* exprInfo = (tExprNode*)obj;
|
||||||
copyString(json, jkFunctionName, exprInfo->_function.functionName);
|
copyString(json, jkFunctionName, exprInfo->_function.functionName);
|
||||||
|
exprInfo->_function.pChild = calloc(1, sizeof(tExprNode*));
|
||||||
|
if (NULL == exprInfo->_function.pChild) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild, sizeof(tExprNode*), &exprInfo->_function.num);
|
return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild, sizeof(tExprNode*), &exprInfo->_function.num);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -808,7 +832,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
|
||||||
case OP_SystemTableScan:
|
case OP_SystemTableScan:
|
||||||
return scanNodeFromJson(json, obj);
|
return scanNodeFromJson(json, obj);
|
||||||
case OP_Aggregate:
|
case OP_Aggregate:
|
||||||
break; // todo
|
return aggNodeFromJson(json, obj);
|
||||||
case OP_Project:
|
case OP_Project:
|
||||||
return true;
|
return true;
|
||||||
// case OP_Groupby:
|
// case OP_Groupby:
|
||||||
|
@ -879,7 +903,7 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) {
|
||||||
res = fromObject(json, jkPnodeSchema, dataBlockSchemaFromJson, &node->targetSchema, true);
|
res = fromObject(json, jkPnodeSchema, dataBlockSchemaFromJson, &node->targetSchema, true);
|
||||||
}
|
}
|
||||||
if (res) {
|
if (res) {
|
||||||
res = fromArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren, sizeof(SSlotSchema));
|
res = fromPnodeArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren);
|
||||||
}
|
}
|
||||||
if (res) {
|
if (res) {
|
||||||
res = fromObject(json, node->info.name, specificPhyNodeFromJson, node, true);
|
res = fromObject(json, node->info.name, specificPhyNodeFromJson, node, true);
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
aux_source_directory(src NODES_SRC)
|
||||||
|
add_library(nodes STATIC ${NODES_SRC})
|
||||||
|
target_include_directories(
|
||||||
|
nodes
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes"
|
||||||
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
)
|
||||||
|
target_link_libraries(
|
||||||
|
nodes
|
||||||
|
PRIVATE os util
|
||||||
|
)
|
||||||
|
|
||||||
|
if(${BUILD_TEST})
|
||||||
|
ADD_SUBDIRECTORY(test)
|
||||||
|
endif(${BUILD_TEST})
|
|
@ -0,0 +1,20 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
void cloneNode(const SNode* pNode) {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
int32_t nodeToString(const SNode* pNode, char** pStr, int32_t* pLen) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t stringToNode(const char* pStr, SNode** pNode) {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,141 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
#define COMPARE_SCALAR_FIELD(fldname) \
|
||||||
|
do { \
|
||||||
|
if (a->fldname != b->fldname) \
|
||||||
|
return false; \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define COMPARE_STRING(a, b) \
|
||||||
|
(((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b))
|
||||||
|
|
||||||
|
#define COMPARE_STRING_FIELD(fldname) \
|
||||||
|
do { \
|
||||||
|
if (!COMPARE_STRING(a->fldname, b->fldname)) \
|
||||||
|
return false; \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define COMPARE_NODE_FIELD(fldname) \
|
||||||
|
do { \
|
||||||
|
if (!nodeEqual(a->fldname, b->fldname)) \
|
||||||
|
return false; \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define COMPARE_ARRAY_FIELD(fldname) \
|
||||||
|
do { \
|
||||||
|
if (!nodeArrayEqual(a->fldname, b->fldname)) \
|
||||||
|
return false; \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
static bool nodeArrayEqual(const SArray* a, const SArray* b) {
|
||||||
|
if (a == b) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == a || NULL == b) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(a) != taosArrayGetSize(b)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size = taosArrayGetSize(a);
|
||||||
|
for (size_t i = 0; i < size; ++i) {
|
||||||
|
if (!nodeEqual((SNode*)taosArrayGetP(a, i), (SNode*)taosArrayGetP(b, i))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
|
||||||
|
COMPARE_STRING_FIELD(dbName);
|
||||||
|
COMPARE_STRING_FIELD(tableName);
|
||||||
|
COMPARE_STRING_FIELD(colName);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool valueNodeEqual(const SValueNode* a, const SValueNode* b) {
|
||||||
|
COMPARE_STRING_FIELD(literal);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool operatorNodeEqual(const SOperatorNode* a, const SOperatorNode* b) {
|
||||||
|
COMPARE_SCALAR_FIELD(opType);
|
||||||
|
COMPARE_NODE_FIELD(pLeft);
|
||||||
|
COMPARE_NODE_FIELD(pRight);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool logicConditionNodeEqual(const SLogicConditionNode* a, const SLogicConditionNode* b) {
|
||||||
|
COMPARE_SCALAR_FIELD(condType);
|
||||||
|
COMPARE_ARRAY_FIELD(pParameterList);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isNullConditionNodeEqual(const SIsNullCondNode* a, const SIsNullCondNode* b) {
|
||||||
|
COMPARE_NODE_FIELD(pExpr);
|
||||||
|
COMPARE_SCALAR_FIELD(isNot);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) {
|
||||||
|
COMPARE_SCALAR_FIELD(funcId);
|
||||||
|
COMPARE_ARRAY_FIELD(pParameterList);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nodeEqual(const SNode* a, const SNode* b) {
|
||||||
|
if (a == b) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == a || NULL == b) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nodeType(a) != nodeType(b)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (nodeType(a)) {
|
||||||
|
case QUERY_NODE_COLUMN:
|
||||||
|
return columnNodeEqual((const SColumnNode*)a, (const SColumnNode*)b);
|
||||||
|
case QUERY_NODE_VALUE:
|
||||||
|
return valueNodeEqual((const SValueNode*)a, (const SValueNode*)b);
|
||||||
|
case QUERY_NODE_OPERATOR:
|
||||||
|
return operatorNodeEqual((const SOperatorNode*)a, (const SOperatorNode*)b);
|
||||||
|
case QUERY_NODE_LOGIC_CONDITION:
|
||||||
|
return logicConditionNodeEqual((const SLogicConditionNode*)a, (const SLogicConditionNode*)b);
|
||||||
|
case QUERY_NODE_IS_NULL_CONDITION:
|
||||||
|
return isNullConditionNodeEqual((const SIsNullCondNode*)a, (const SIsNullCondNode*)b);
|
||||||
|
case QUERY_NODE_FUNCTION:
|
||||||
|
return functionNodeEqual((const SFunctionNode*)a, (const SFunctionNode*)b);
|
||||||
|
case QUERY_NODE_REAL_TABLE:
|
||||||
|
case QUERY_NODE_TEMP_TABLE:
|
||||||
|
case QUERY_NODE_JOIN_TABLE:
|
||||||
|
case QUERY_NODE_GROUPING_SET:
|
||||||
|
case QUERY_NODE_ORDER_BY_EXPR:
|
||||||
|
return false; // todo
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
|
||||||
|
|
||||||
|
bool nodeArrayWalker(SArray* pArray, FQueryNodeWalker walker, void* pContext) {
|
||||||
|
size_t size = taosArrayGetSize(pArray);
|
||||||
|
for (size_t i = 0; i < size; ++i) {
|
||||||
|
if (!nodeTreeWalker((SNode*)taosArrayGetP(pArray, i), walker, pContext)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nodeTreeWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
|
||||||
|
if (NULL == pNode) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!walker(pNode, pContext)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (nodeType(pNode)) {
|
||||||
|
case QUERY_NODE_COLUMN:
|
||||||
|
case QUERY_NODE_VALUE:
|
||||||
|
// these node types with no subnodes
|
||||||
|
return true;
|
||||||
|
case QUERY_NODE_OPERATOR: {
|
||||||
|
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||||
|
if (!nodeTreeWalker(pOpNode->pLeft, walker, pContext)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return nodeTreeWalker(pOpNode->pRight, walker, pContext);
|
||||||
|
}
|
||||||
|
case QUERY_NODE_LOGIC_CONDITION:
|
||||||
|
return nodeArrayWalker(((SLogicConditionNode*)pNode)->pParameterList, walker, pContext);
|
||||||
|
case QUERY_NODE_IS_NULL_CONDITION:
|
||||||
|
return nodeTreeWalker(((SIsNullCondNode*)pNode)->pExpr, walker, pContext);
|
||||||
|
case QUERY_NODE_FUNCTION:
|
||||||
|
return nodeArrayWalker(((SFunctionNode*)pNode)->pParameterList, walker, pContext);
|
||||||
|
case QUERY_NODE_REAL_TABLE:
|
||||||
|
case QUERY_NODE_TEMP_TABLE:
|
||||||
|
return true; // todo
|
||||||
|
case QUERY_NODE_JOIN_TABLE: {
|
||||||
|
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
|
||||||
|
if (!nodeTreeWalker(pJoinTableNode->pLeft, walker, pContext)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!nodeTreeWalker(pJoinTableNode->pRight, walker, pContext)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return nodeTreeWalker(pJoinTableNode->pOnCond, walker, pContext);
|
||||||
|
}
|
||||||
|
case QUERY_NODE_GROUPING_SET:
|
||||||
|
return nodeArrayWalker(((SGroupingSetNode*)pNode)->pParameterList, walker, pContext);
|
||||||
|
case QUERY_NODE_ORDER_BY_EXPR:
|
||||||
|
return nodeTreeWalker(((SOrderByExprNode*)pNode)->pExpr, walker, pContext);
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool stmtWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "nodes.h"
|
||||||
|
|
||||||
|
bool isTimeorderQuery(const SNode* pQuery) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isTimelineQuery(const SNode* pQuery) {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
|
||||||
|
MESSAGE(STATUS "build nodes unit test")
|
||||||
|
|
||||||
|
# GoogleTest requires at least C++11
|
||||||
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(nodesTest ${SOURCE_LIST})
|
||||||
|
|
||||||
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
nodesTest
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes/"
|
||||||
|
PRIVATE "${CMAKE_SOURCE_DIR}/source/nodes/inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
TARGET_LINK_LIBRARIES(
|
||||||
|
nodesTest
|
||||||
|
PUBLIC os util common nodes gtest
|
||||||
|
)
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
TEST(NodesTest, traverseTest) {
|
||||||
|
// todo
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
Loading…
Reference in New Issue