Merge remote-tracking branch 'origin/3.0' into feature/qnode

This commit is contained in:
dapan1121 2022-01-20 18:41:53 +08:00
commit bb67155b42
33 changed files with 1226 additions and 86 deletions

View File

@ -1089,6 +1089,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
return buf; return buf;
} }
typedef struct SMqTmrMsg {
int32_t reserved;
} SMqTmrMsg;
typedef struct { typedef struct {
int64_t status; int64_t status;
} SMVSubscribeRsp; } SMVSubscribeRsp;
@ -1548,6 +1552,25 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf; return buf;
} }
typedef struct SMqCVConsumeReq {
int64_t reqId;
int64_t offset;
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq;
typedef struct SMqCVConsumeRsp {
int64_t reqId;
int64_t clientId;
int64_t committedOffset;
int64_t receiveOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN];
char body[];
} SMqCvConsumeRsp;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -140,6 +140,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg)
// Requests handled by VNODE // Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
@ -173,6 +174,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
// Requests handled by QNODE // Requests handled by QNODE

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_FUNCTION_MGT_H_
#define _TD_FUNCTION_MGT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "nodes.h"
struct SQLFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
typedef struct SFuncExecEnv {
int32_t calcMemSize;
} SFuncExecEnv;
typedef void* FuncMgtHandle;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SQLFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SQLFunctionCtx *pCtx);
typedef struct SFuncExecFuncs {
FExecGetEnv getEnv;
FExecInit init;
FExecProcess process;
FExecFinalize finalize;
} SFuncExecFuncs;
int32_t fmFuncMgtInit();
int32_t fmGetHandle(FuncMgtHandle* pHandle);
int32_t fmGetFuncId(FuncMgtHandle handle, const char* name);
int32_t fmGetFuncResultType(FuncMgtHandle handle, SFunctionNode* pFunc);
bool fmIsAggFunc(int32_t funcId);
bool fmIsStringFunc(int32_t funcId);
bool fmIsTimestampFunc(int32_t funcId);
bool fmIsTimelineFunc(int32_t funcId);
bool fmIsTimeorderFunc(int32_t funcId);
bool fmIsNonstandardSQLFunc(int32_t funcId);
int32_t fmFuncScanType(int32_t funcId);
int32_t fmGetFuncExecFuncs(FuncMgtHandle handle, int32_t funcId, SFuncExecFuncs* pFpSet);
#ifdef __cplusplus
}
#endif
#endif // _TD_FUNCTION_MGT_H_

271
include/nodes/nodes.h Normal file
View File

@ -0,0 +1,271 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_NODES_H_
#define _TD_NODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "tarray.h"
#include "tdef.h"
typedef enum ENodeType {
QUERY_NODE_COLUMN = 1,
QUERY_NODE_VALUE,
QUERY_NODE_OPERATOR,
QUERY_NODE_LOGIC_CONDITION,
QUERY_NODE_IS_NULL_CONDITION,
QUERY_NODE_FUNCTION,
QUERY_NODE_REAL_TABLE,
QUERY_NODE_TEMP_TABLE,
QUERY_NODE_JOIN_TABLE,
QUERY_NODE_GROUPING_SET,
QUERY_NODE_ORDER_BY_EXPR,
QUERY_NODE_STATE_WINDOW,
QUERY_NODE_SESSION_WINDOW,
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT
} ENodeType;
/**
* The first field of a node of any type is guaranteed to be the ENodeType.
* Hence the type of any node can be gotten by casting it to SNode.
*/
typedef struct SNode {
ENodeType type;
} SNode;
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
typedef struct SDataType {
uint8_t type;
uint8_t precision;
uint8_t scale;
int32_t bytes;
} SDataType;
typedef struct SExprNode {
ENodeType nodeType;
SDataType resType;
char aliasName[TSDB_COL_NAME_LEN];
} SExprNode;
typedef enum EColumnType {
COLUMN_TYPE_COLUMN = 1,
COLUMN_TYPE_TAG
} EColumnType;
typedef struct SColumnNode {
SExprNode node; // QUERY_NODE_COLUMN
int16_t colId;
EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
} SColumnNode;
typedef struct SValueNode {
SExprNode type; // QUERY_NODE_VALUE
char* literal;
} SValueNode;
typedef enum EOperatorType {
// arithmetic operator
OP_TYPE_ADD = 1,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_MOD,
// comparison operator
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
// json operator
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS
} EOperatorType;
typedef struct SOperatorNode {
SExprNode type; // QUERY_NODE_OPERATOR
EOperatorType opType;
SNode* pLeft;
SNode* pRight;
} SOperatorNode;
typedef enum ELogicConditionType {
LOGIC_COND_TYPE_AND,
LOGIC_COND_TYPE_OR,
LOGIC_COND_TYPE_NOT,
} ELogicConditionType;
typedef struct SLogicConditionNode {
ENodeType type; // QUERY_NODE_LOGIC_CONDITION
ELogicConditionType condType;
SArray* pParameterList;
} SLogicConditionNode;
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNot;
} SIsNullCondNode;
typedef struct SFunctionNode {
SExprNode type; // QUERY_NODE_FUNCTION
char functionName[TSDB_FUNC_NAME_LEN];
int32_t funcId;
SArray* pParameterList; // SNode
} SFunctionNode;
typedef struct STableNode {
ENodeType type;
char tableName[TSDB_TABLE_NAME_LEN];
char tableAliasName[TSDB_COL_NAME_LEN];
} STableNode;
typedef struct SRealTableNode {
STableNode type; // QUERY_NODE_REAL_TABLE
char dbName[TSDB_DB_NAME_LEN];
} SRealTableNode;
typedef struct STempTableNode {
STableNode type; // QUERY_NODE_TEMP_TABLE
SNode* pSubquery;
} STempTableNode;
typedef enum EJoinType {
JOIN_TYPE_INNER = 1
} EJoinType;
typedef struct SJoinTableNode {
STableNode type; // QUERY_NODE_JOIN_TABLE
EJoinType joinType;
SNode* pLeft;
SNode* pRight;
SNode* pOnCond;
} SJoinTableNode;
typedef enum EGroupingSetType {
GP_TYPE_NORMAL = 1
} EGroupingSetType;
typedef struct SGroupingSetNode {
ENodeType type; // QUERY_NODE_GROUPING_SET
EGroupingSetType groupingSetType;
SArray* pParameterList;
} SGroupingSetNode;
typedef enum EOrder {
ORDER_ASC = 1,
ORDER_DESC
} EOrder;
typedef enum ENullOrder {
NULL_ORDER_FIRST = 1,
NULL_ORDER_LAST
} ENullOrder;
typedef struct SOrderByExprNode {
ENodeType type; // QUERY_NODE_ORDER_BY_EXPR
SNode* pExpr;
EOrder order;
ENullOrder nullOrder;
} SOrderByExprNode;
typedef struct SLimitInfo {
uint64_t limit;
uint64_t offset;
} SLimitInfo;
typedef struct SStateWindowNode {
ENodeType type; // QUERY_NODE_STATE_WINDOW
SNode* pCol;
} SStateWindowNode;
typedef struct SSessionWindowNode {
ENodeType type; // QUERY_NODE_SESSION_WINDOW
int64_t gap; // gap between two session window(in microseconds)
SNode* pCol;
} SSessionWindowNode;
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
int64_t interval;
int64_t sliding;
int64_t offset;
} SIntervalWindowNode;
typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT
bool isDistinct;
SArray* pProjectionList; // SNode
SNode* pFromTable;
SNode* pWhereCond;
SArray* pPartitionByList; // SNode
SNode* pWindowClause;
SArray* pGroupByList; // SGroupingSetNode
SArray* pOrderByList; // SOrderByExprNode
SLimitInfo limit;
SLimitInfo slimit;
} SSelectStmt;
typedef enum ESetOperatorType {
SET_OP_TYPE_UNION_ALL = 1
} ESetOperatorType;
typedef struct SSetOperator {
ENodeType type; // QUERY_NODE_SET_OPERATOR
ESetOperatorType opType;
SNode* pLeft;
SNode* pRight;
} SSetOperator;
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
bool nodeArrayWalker(SArray* pArray, FQueryNodeWalker walker, void* pContext);
bool nodeTreeWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext);
bool stmtWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext);
bool nodeEqual(const SNode* a, const SNode* b);
void cloneNode(const SNode* pNode);
int32_t nodeToString(const SNode* pNode, char** pStr, int32_t* pLen);
int32_t stringToNode(const char* pStr, SNode** pNode);
bool isTimeorderQuery(const SNode* pQuery);
bool isTimelineQuery(const SNode* pQuery);
#ifdef __cplusplus
}
#endif
#endif /*_TD_NODES_H_*/

View File

@ -4,3 +4,4 @@ add_subdirectory(common)
add_subdirectory(libs) add_subdirectory(libs)
add_subdirectory(client) add_subdirectory(client)
add_subdirectory(dnode) add_subdirectory(dnode)
add_subdirectory(nodes)

View File

@ -148,30 +148,30 @@ TEST(testCase, connect_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
// //
//TEST(testCase, create_db_Test) { TEST(testCase, create_db_Test) {
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//assert(pConn != NULL); assert(pConn != NULL);
//TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
//if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
//printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
//} }
//TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//ASSERT_TRUE(pFields == NULL); ASSERT_TRUE(pFields == NULL);
//int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
//ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create database abc1 vgroups 4");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_close(pConn);
}
//pRes = taos_query(pConn, "create database abc1 vgroups 4");
//if (taos_errno(pRes) != 0) {
//printf("error in create db, reason:%s\n", taos_errstr(pRes));
//}
//taos_close(pConn);
//}
//
//TEST(testCase, create_dnode_Test) { //TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);

View File

@ -100,7 +100,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;

View File

@ -354,6 +354,7 @@ typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t epoch; int32_t epoch;
//TODO: replace with priority queue //TODO: replace with priority queue
int32_t nextConsumerIdx;
SArray* availConsumer; // SArray<int64_t> (consumerId) SArray* availConsumer; // SArray<int64_t> (consumerId)
SArray* assigned; // SArray<SMqConsumerEp> SArray* assigned; // SArray<SMqConsumerEp>
SArray* unassignedConsumer; // SArray<SMqConsumerEp> SArray* unassignedConsumer; // SArray<SMqConsumerEp>

View File

@ -80,6 +80,7 @@ typedef struct SMnode {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer; tmr_h timer;
tmr_h transTimer; tmr_h transTimer;
tmr_h mqTimer;
char *path; char *path;
SMnodeCfg cfg; SMnodeCfg cfg;
int64_t checkTime; int64_t checkTime;

View File

@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "mndSubscribe.h"
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h" #include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
@ -40,6 +40,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic);
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE, SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
@ -54,9 +58,90 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
int i = 0;
while (key[i] != ':') {
i++;
}
key[i] = 0;
*topic = strdup(key);
key[i] = ':';
*cgroup = strdup(&key[i + 1]);
return 0;
}
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
SMqSubscribeObj *pSub = NULL;
void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
int sz;
while (pIter != NULL) {
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
char *topic = NULL;
char *cgroup = NULL;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
// create trans
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx++;
// build msg
SMqSetCVgReq req = {
.vgId = pCEp->vgId,
.consumerId = consumerId,
};
strcpy(req.cGroup, cgroup);
strcpy(req.topicName, topic);
strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = reqStr;
tEncodeSMqSetCVgReq(abuf, &req);
// persist msg
STransAction action = {0};
action.epSet = pCEp->epset;
action.pCont = reqStr;
action.contLen = tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndTransAppendRedoAction(pTrans, &action);
// persist raw
SSdbRaw *pRaw = mndSubActionEncode(pSub);
mndTransAppendRedolog(pTrans, pRaw);
tfree(topic);
tfree(cgroup);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
}
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
}
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
}
return 0;
}
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp; SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
@ -76,7 +161,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
} }
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic) { SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) {
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
@ -86,7 +171,10 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
.consumerId = pConsumer->consumerId, .consumerId = pConsumer->consumerId,
}; };
strcpy(req.cGroup, pConsumer->cgroup); strcpy(req.cGroup, pConsumer->cgroup);
strcpy(req.topicName, pConsumerTopic->name); strcpy(req.topicName, pTopic->name);
strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
if (reqStr == NULL) { if (reqStr == NULL) {
@ -94,7 +182,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
return -1; return -1;
} }
void *abuf = reqStr; void *abuf = reqStr;
tEncodeSMqSetCVgReq(abuf, &req); tEncodeSMqSetCVgReq(&abuf, &req);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
@ -374,19 +462,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// set unassigned vg // set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
} }
taosArrayPush(pSub->availConsumer, &consumerId);
// TODO: no need
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
taosArrayPush(pConsumer->topics, pConsumerTopic); taosArrayPush(pConsumer->topics, pConsumerTopic);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
// send setmsg to vnode // send setmsg to vnode
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic) < 0) { if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
// TODO // TODO
return -1; return -1;
} }
} }
taosArrayDestroy(pConsumerTopic->pVgInfo); taosArrayDestroy(pConsumerTopic->pVgInfo);
free(pConsumerTopic); free(pConsumerTopic);
SSdbRaw *pRaw = mndSubActionEncode(pSub);
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
mndTransAppendRedolog(pTrans, pRaw);
#if 0 #if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) { if (pGroup == NULL) {
@ -448,7 +542,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;

View File

@ -18,6 +18,7 @@
#include "mndAuth.h" #include "mndAuth.h"
#include "mndBnode.h" #include "mndBnode.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndFunc.h" #include "mndFunc.h"
@ -27,6 +28,7 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndSnode.h" #include "mndSnode.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndSubscribe.h"
#include "mndSync.h" #include "mndSync.h"
#include "mndTelem.h" #include "mndTelem.h"
#include "mndTopic.h" #include "mndTopic.h"
@ -72,12 +74,12 @@ static void mndTransReExecute(void *param, void *tmrId) {
static void mndCalMqRebalance(void *param, void *tmrId) { static void mndCalMqRebalance(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
// iterate cgroup, cal rebalance SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg));
// sync with raft SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)};
// write sdb pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer); taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer);
} }
static int32_t mndInitTimer(SMnode *pMnode) { static int32_t mndInitTimer(SMnode *pMnode) {
@ -95,6 +97,11 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1; return -1;
} }
if (taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0; return 0;
} }
@ -102,6 +109,8 @@ static void mndCleanupTimer(SMnode *pMnode) {
if (pMnode->timer != NULL) { if (pMnode->timer != NULL) {
taosTmrStop(pMnode->transTimer); taosTmrStop(pMnode->transTimer);
pMnode->transTimer = NULL; pMnode->transTimer = NULL;
taosTmrStop(pMnode->mqTimer);
pMnode->mqTimer = NULL;
taosTmrCleanUp(pMnode->timer); taosTmrCleanUp(pMnode->timer);
pMnode->timer = NULL; pMnode->timer = NULL;
} }
@ -171,6 +180,8 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
@ -377,7 +388,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return NULL; return NULL;
} }
if (pRpcMsg->msgType != TDMT_MND_TRANS) { if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER) {
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);

View File

@ -25,6 +25,7 @@ target_link_libraries(
PUBLIC bdb PUBLIC bdb
PUBLIC tfs PUBLIC tfs
PUBLIC wal PUBLIC wal
PUBLIC scheduler
PUBLIC qworker PUBLIC qworker
) )

View File

@ -18,14 +18,15 @@
#include "common.h" #include "common.h"
#include "mallocator.h" #include "mallocator.h"
#include "meta.h"
#include "os.h" #include "os.h"
#include "scheduler.h"
#include "taoserror.h" #include "taoserror.h"
#include "tmsg.h"
#include "tlist.h" #include "tlist.h"
#include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "meta.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -159,6 +160,31 @@ typedef struct STqGroup {
STqRspHandle rspHandle; STqRspHandle rspHandle;
} STqGroup; } STqGroup;
typedef struct STqTaskItem {
int32_t status;
void* dst;
SSubQueryMsg* pMsg;
} STqTaskItem;
// new version
typedef struct STqBuffer {
int64_t firstOffset;
int64_t lastOffset;
STqTaskItem output[TQ_BUFFER_SIZE];
} STqBuffer;
typedef struct STqClientHandle {
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_TOPIC_FNAME_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer;
} STqClientHandle;
typedef struct STqQueryMsg { typedef struct STqQueryMsg {
STqMsgItem* item; STqMsgItem* item;
struct STqQueryMsg* next; struct STqQueryMsg* next;
@ -325,7 +351,6 @@ typedef struct STqReadHandle {
} STqReadHandle; } STqReadHandle;
typedef struct SSubmitBlkScanInfo { typedef struct SSubmitBlkScanInfo {
} SSubmitBlkScanInfo; } SSubmitBlkScanInfo;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
@ -333,7 +358,6 @@ bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
// return SArray<SColumnInfoData> // return SArray<SColumnInfoData>
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -55,6 +55,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg, pRsp);
case TDMT_VND_CONSUME:
return 0;
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;

View File

@ -14,6 +14,7 @@
*/ */
#include "vnd.h" #include "vnd.h"
#include "tq.h"
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
@ -108,6 +109,40 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error // TODO: handle error
} }
break; break;
case TDMT_VND_MQ_SET_CONN: {
char* reqStr = ptr;
SMqSetCVgReq req;
tDecodeSMqSetCVgReq(reqStr, &req);
STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1);
if (pHandle == NULL) {
// TODO: handle error
}
strcpy(pHandle->topicName, req.topicName);
strcpy(pHandle->cGroup, req.cGroup);
strcpy(pHandle->sql, req.sql);
strcpy(pHandle->logicalPlan, req.logicalPlan);
strcpy(pHandle->physicalPlan, req.physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pHandle->buffer.firstOffset = -1;
pHandle->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pHandle->buffer.output[i].pMsg = pMsg;
pHandle->buffer.output[i].status = 0;
}
// write mq meta
}
break;
default: default:
ASSERT(0); ASSERT(0);
break; break;

View File

@ -8,5 +8,5 @@ target_include_directories(
target_link_libraries( target_link_libraries(
function function
PRIVATE os util common PRIVATE os util common nodes
) )

View File

@ -0,0 +1,76 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_FUNCTION_MGT_INT_H_
#define _TD_FUNCTION_MGT_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "functionMgt.h"
#define FUNC_MGT_DATA_TYPE_MASK(n) (1 << n)
#define FUNC_MGT_DATA_TYPE_NULL 0
#define FUNC_MGT_DATA_TYPE_BOOL FUNC_MGT_DATA_TYPE_MASK(0)
#define FUNC_MGT_DATA_TYPE_TINYINT FUNC_MGT_DATA_TYPE_MASK(1)
#define FUNC_MGT_DATA_TYPE_SMALLINT FUNC_MGT_DATA_TYPE_MASK(2)
#define FUNC_MGT_DATA_TYPE_INT FUNC_MGT_DATA_TYPE_MASK(3)
#define FUNC_MGT_DATA_TYPE_BIGINT FUNC_MGT_DATA_TYPE_MASK(4)
#define FUNC_MGT_DATA_TYPE_FLOAT FUNC_MGT_DATA_TYPE_MASK(5)
#define FUNC_MGT_DATA_TYPE_DOUBLE FUNC_MGT_DATA_TYPE_MASK(6)
#define FUNC_MGT_DATA_TYPE_BINARY FUNC_MGT_DATA_TYPE_MASK(7)
#define FUNC_MGT_DATA_TYPE_TIMESTAMP FUNC_MGT_DATA_TYPE_MASK(8)
#define FUNC_MGT_DATA_TYPE_NCHAR FUNC_MGT_DATA_TYPE_MASK(9)
#define FUNC_MGT_DATA_TYPE_UTINYINT FUNC_MGT_DATA_TYPE_MASK(10)
#define FUNC_MGT_DATA_TYPE_USMALLINT FUNC_MGT_DATA_TYPE_MASK(11)
#define FUNC_MGT_DATA_TYPE_UINT FUNC_MGT_DATA_TYPE_MASK(12)
#define FUNC_MGT_DATA_TYPE_UBIGINT FUNC_MGT_DATA_TYPE_MASK(13)
#define FUNC_MGT_DATA_TYPE_VARCHAR FUNC_MGT_DATA_TYPE_MASK(14)
#define FUNC_MGT_DATA_TYPE_VARBINARY FUNC_MGT_DATA_TYPE_MASK(15)
#define FUNC_MGT_DATA_TYPE_JSON FUNC_MGT_DATA_TYPE_MASK(16)
#define FUNC_MGT_DATA_TYPE_DECIMAL FUNC_MGT_DATA_TYPE_MASK(17)
#define FUNC_MGT_DATA_TYPE_BLOB FUNC_MGT_DATA_TYPE_MASK(18)
#define FUNC_MGT_EXACT_NUMERIC_DATA_TYPE \
(FUNC_MGT_DATA_TYPE_TINYINT | FUNC_MGT_DATA_TYPE_SMALLINT | FUNC_MGT_DATA_TYPE_INT | FUNC_MGT_DATA_TYPE_BIGINT \
| FUNC_MGT_DATA_TYPE_UTINYINT | FUNC_MGT_DATA_TYPE_USMALLINT | FUNC_MGT_DATA_TYPE_UINT | FUNC_MGT_DATA_TYPE_UBIGINT)
#define FUNC_MGT_APPRO_NUMERIC_DATA_TYPE (FUNC_MGT_DATA_TYPE_FLOAT | FUNC_MGT_DATA_TYPE_DOUBLE)
#define FUNC_MGT_NUMERIC_DATA_TYPE (FUNC_MGT_EXACT_NUMERIC_DATA_TYPE | FUNC_MGT_APPRO_NUMERIC_DATA_TYPE)
typedef void* FuncDef;
typedef struct SFuncElement {
FuncDef (*defineFunc)();
} SFuncElement;
extern const SFuncElement gBuiltinFuncs[];
FuncDef createFuncDef(const char* name, int32_t maxNumOfParams);
FuncDef setOneParamSignature(FuncDef def, int64_t resDataType, int64_t paramDataType);
FuncDef setTwoParamsSignature(FuncDef def, int64_t resDataType, int64_t p1DataType, int64_t p2DataType);
FuncDef setFollowParamSignature(FuncDef def, int64_t paramDataType);
FuncDef setFollowParamsSignature(FuncDef def, int64_t p1DataType, int64_t p2DataType, int32_t followNo);
FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProcess process, FExecFinalize finalize);
#ifdef __cplusplus
}
#endif
#endif // _TD_FUNCTION_MGT_INT_H_

View File

@ -95,6 +95,10 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
} }
#include "functionMgtInt.h"
FuncDef defineCount();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -0,0 +1,21 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgtInt.h"
#include "taggfunction.h"
const SFuncElement gBuiltinFuncs[] = {
{.defineFunc = defineCount}
};

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgt.h"
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
typedef struct SFuncMgtService {
SHashObj* pFuncNameHashTable;
} SFuncMgtService;
static SFuncMgtService gFunMgtService;
int32_t fmFuncMgtInit() {
gFunMgtService.pFuncNameHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == gFunMgtService.pFuncNameHashTable) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
typedef struct SFuncDef {
char name[TSDB_FUNC_NAME_LEN];
int32_t maxNumOfParams;
SFuncExecFuncs execFuncs;
} SFuncDef ;
FuncDef createFuncDef(const char* name, int32_t maxNumOfParams) {
SFuncDef* pDef = calloc(1, sizeof(SFuncDef));
if (NULL == pDef) {
return NULL;
}
strcpy(pDef->name, name);
pDef->maxNumOfParams = maxNumOfParams;
return pDef;
}
FuncDef setOneParamSignature(FuncDef def, int64_t resDataType, int64_t paramDataType) {
// todo
}
FuncDef setTwoParamsSignature(FuncDef def, int64_t resDataType, int64_t p1DataType, int64_t p2DataType) {
// todo
}
FuncDef setFollowParamSignature(FuncDef def, int64_t paramDataType) {
// todo
}
FuncDef setFollowParamsSignature(FuncDef def, int64_t p1DataType, int64_t p2DataType, int32_t followNo) {
// todo
}
FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProcess process, FExecFinalize finalize) {
SFuncDef* pDef = (SFuncDef*)def;
pDef->execFuncs.getEnv = getEnv;
pDef->execFuncs.init = init;
pDef->execFuncs.process = process;
pDef->execFuncs.finalize = finalize;
return def;
}
int32_t registerFunc(FuncDef func) {
}

View File

@ -4835,3 +4835,9 @@ SAggFunctionInfo aggFunc[35] = {{
statisRequired, statisRequired,
} }
}; };
FuncDef defineCount() {
FuncDef def = createFuncDef("count", 1);
// todo define signature
return setExecFuncs(def, NULL, function_setup, count_function, doFinalizer);
}

View File

@ -212,7 +212,7 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
pExpr[i] = p; pExpr[i] = p;
} }
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL); // pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
tfree(pExpr); tfree(pExpr);
} }

View File

@ -117,6 +117,26 @@ static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void*
return func(jObj, *obj); return func(jObj, *obj);
} }
static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func, SArray** array) {
const cJSON* jArray = cJSON_GetObjectItem(json, name);
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
if (size > 0) {
*array = taosArrayInit(size, POINTER_BYTES);
if (NULL == *array) {
return false;
}
}
for (int32_t i = 0; i < size; ++i) {
cJSON* jItem = cJSON_GetArrayItem(jArray, i);
void* item = calloc(1, getPnodeTypeSize(jItem));
if (NULL == item || !func(jItem, item)) {
return false;
}
taosArrayPush(*array, &item);
}
return true;
}
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) { static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
if (size > 0) { if (size > 0) {
@ -387,8 +407,8 @@ static const char* jkFunctionChild = "Child";
static bool functionToJson(const void* obj, cJSON* jFunc) { static bool functionToJson(const void* obj, cJSON* jFunc) {
const tExprNode* exprInfo = (const tExprNode*)obj; const tExprNode* exprInfo = (const tExprNode*)obj;
bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName); bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName);
if (res) { if (res && NULL != exprInfo->_function.pChild) {
res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*), exprInfo->_function.num); res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, *(exprInfo->_function.pChild), sizeof(tExprNode*), exprInfo->_function.num);
} }
return res; return res;
} }
@ -396,6 +416,10 @@ static bool functionToJson(const void* obj, cJSON* jFunc) {
static bool functionFromJson(const cJSON* json, void* obj) { static bool functionFromJson(const cJSON* json, void* obj) {
tExprNode* exprInfo = (tExprNode*)obj; tExprNode* exprInfo = (tExprNode*)obj;
copyString(json, jkFunctionName, exprInfo->_function.functionName); copyString(json, jkFunctionName, exprInfo->_function.functionName);
exprInfo->_function.pChild = calloc(1, sizeof(tExprNode*));
if (NULL == exprInfo->_function.pChild) {
return false;
}
return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild, sizeof(tExprNode*), &exprInfo->_function.num); return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild, sizeof(tExprNode*), &exprInfo->_function.num);
} }
@ -851,7 +875,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
case OP_SystemTableScan: case OP_SystemTableScan:
return scanNodeFromJson(json, obj); return scanNodeFromJson(json, obj);
case OP_Aggregate: case OP_Aggregate:
break; // todo return aggNodeFromJson(json, obj);
case OP_Project: case OP_Project:
return true; return true;
// case OP_Groupby: // case OP_Groupby:
@ -922,7 +946,7 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) {
res = fromObject(json, jkPnodeSchema, dataBlockSchemaFromJson, &node->targetSchema, true); res = fromObject(json, jkPnodeSchema, dataBlockSchemaFromJson, &node->targetSchema, true);
} }
if (res) { if (res) {
res = fromArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren, sizeof(SSlotSchema)); res = fromPnodeArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren);
} }
if (res) { if (res) {
res = fromObject(json, node->info.name, specificPhyNodeFromJson, node, true); res = fromObject(json, node->info.name, specificPhyNodeFromJson, node, true);

View File

@ -0,0 +1,15 @@
aux_source_directory(src NODES_SRC)
add_library(nodes STATIC ${NODES_SRC})
target_include_directories(
nodes
PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
nodes
PRIVATE os util
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})

View File

@ -0,0 +1,20 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
void cloneNode(const SNode* pNode) {
}

View File

@ -0,0 +1,24 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
int32_t nodeToString(const SNode* pNode, char** pStr, int32_t* pLen) {
}
int32_t stringToNode(const char* pStr, SNode** pNode) {
}

View File

@ -0,0 +1,141 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#define COMPARE_SCALAR_FIELD(fldname) \
do { \
if (a->fldname != b->fldname) \
return false; \
} while (0)
#define COMPARE_STRING(a, b) \
(((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b))
#define COMPARE_STRING_FIELD(fldname) \
do { \
if (!COMPARE_STRING(a->fldname, b->fldname)) \
return false; \
} while (0)
#define COMPARE_NODE_FIELD(fldname) \
do { \
if (!nodeEqual(a->fldname, b->fldname)) \
return false; \
} while (0)
#define COMPARE_ARRAY_FIELD(fldname) \
do { \
if (!nodeArrayEqual(a->fldname, b->fldname)) \
return false; \
} while (0)
static bool nodeArrayEqual(const SArray* a, const SArray* b) {
if (a == b) {
return true;
}
if (NULL == a || NULL == b) {
return false;
}
if (taosArrayGetSize(a) != taosArrayGetSize(b)) {
return false;
}
size_t size = taosArrayGetSize(a);
for (size_t i = 0; i < size; ++i) {
if (!nodeEqual((SNode*)taosArrayGetP(a, i), (SNode*)taosArrayGetP(b, i))) {
return false;
}
}
return true;
}
static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
COMPARE_STRING_FIELD(dbName);
COMPARE_STRING_FIELD(tableName);
COMPARE_STRING_FIELD(colName);
return true;
}
static bool valueNodeEqual(const SValueNode* a, const SValueNode* b) {
COMPARE_STRING_FIELD(literal);
return true;
}
static bool operatorNodeEqual(const SOperatorNode* a, const SOperatorNode* b) {
COMPARE_SCALAR_FIELD(opType);
COMPARE_NODE_FIELD(pLeft);
COMPARE_NODE_FIELD(pRight);
return true;
}
static bool logicConditionNodeEqual(const SLogicConditionNode* a, const SLogicConditionNode* b) {
COMPARE_SCALAR_FIELD(condType);
COMPARE_ARRAY_FIELD(pParameterList);
return true;
}
static bool isNullConditionNodeEqual(const SIsNullCondNode* a, const SIsNullCondNode* b) {
COMPARE_NODE_FIELD(pExpr);
COMPARE_SCALAR_FIELD(isNot);
return true;
}
static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) {
COMPARE_SCALAR_FIELD(funcId);
COMPARE_ARRAY_FIELD(pParameterList);
return true;
}
bool nodeEqual(const SNode* a, const SNode* b) {
if (a == b) {
return true;
}
if (NULL == a || NULL == b) {
return false;
}
if (nodeType(a) != nodeType(b)) {
return false;
}
switch (nodeType(a)) {
case QUERY_NODE_COLUMN:
return columnNodeEqual((const SColumnNode*)a, (const SColumnNode*)b);
case QUERY_NODE_VALUE:
return valueNodeEqual((const SValueNode*)a, (const SValueNode*)b);
case QUERY_NODE_OPERATOR:
return operatorNodeEqual((const SOperatorNode*)a, (const SOperatorNode*)b);
case QUERY_NODE_LOGIC_CONDITION:
return logicConditionNodeEqual((const SLogicConditionNode*)a, (const SLogicConditionNode*)b);
case QUERY_NODE_IS_NULL_CONDITION:
return isNullConditionNodeEqual((const SIsNullCondNode*)a, (const SIsNullCondNode*)b);
case QUERY_NODE_FUNCTION:
return functionNodeEqual((const SFunctionNode*)a, (const SFunctionNode*)b);
case QUERY_NODE_REAL_TABLE:
case QUERY_NODE_TEMP_TABLE:
case QUERY_NODE_JOIN_TABLE:
case QUERY_NODE_GROUPING_SET:
case QUERY_NODE_ORDER_BY_EXPR:
return false; // todo
default:
break;
}
return false;
}

View File

@ -0,0 +1,83 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
bool nodeArrayWalker(SArray* pArray, FQueryNodeWalker walker, void* pContext) {
size_t size = taosArrayGetSize(pArray);
for (size_t i = 0; i < size; ++i) {
if (!nodeTreeWalker((SNode*)taosArrayGetP(pArray, i), walker, pContext)) {
return false;
}
}
return true;
}
bool nodeTreeWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
if (NULL == pNode) {
return true;
}
if (!walker(pNode, pContext)) {
return false;
}
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN:
case QUERY_NODE_VALUE:
// these node types with no subnodes
return true;
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
if (!nodeTreeWalker(pOpNode->pLeft, walker, pContext)) {
return false;
}
return nodeTreeWalker(pOpNode->pRight, walker, pContext);
}
case QUERY_NODE_LOGIC_CONDITION:
return nodeArrayWalker(((SLogicConditionNode*)pNode)->pParameterList, walker, pContext);
case QUERY_NODE_IS_NULL_CONDITION:
return nodeTreeWalker(((SIsNullCondNode*)pNode)->pExpr, walker, pContext);
case QUERY_NODE_FUNCTION:
return nodeArrayWalker(((SFunctionNode*)pNode)->pParameterList, walker, pContext);
case QUERY_NODE_REAL_TABLE:
case QUERY_NODE_TEMP_TABLE:
return true; // todo
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
if (!nodeTreeWalker(pJoinTableNode->pLeft, walker, pContext)) {
return false;
}
if (!nodeTreeWalker(pJoinTableNode->pRight, walker, pContext)) {
return false;
}
return nodeTreeWalker(pJoinTableNode->pOnCond, walker, pContext);
}
case QUERY_NODE_GROUPING_SET:
return nodeArrayWalker(((SGroupingSetNode*)pNode)->pParameterList, walker, pContext);
case QUERY_NODE_ORDER_BY_EXPR:
return nodeTreeWalker(((SOrderByExprNode*)pNode)->pExpr, walker, pContext);
default:
break;
}
return false;
}
bool stmtWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
}

View File

@ -0,0 +1,24 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
bool isTimeorderQuery(const SNode* pQuery) {
}
bool isTimelineQuery(const SNode* pQuery) {
}

View File

@ -0,0 +1,19 @@
MESSAGE(STATUS "build nodes unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(nodesTest ${SOURCE_LIST})
TARGET_INCLUDE_DIRECTORIES(
nodesTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/nodes/inc"
)
TARGET_LINK_LIBRARIES(
nodesTest
PUBLIC os util common nodes gtest
)

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
TEST(NodesTest, traverseTest) {
// todo
}
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -28,9 +28,14 @@ int32_t numOfThreads = 1;
int64_t numOfTables = 200000; int64_t numOfTables = 200000;
int32_t createTable = 1; int32_t createTable = 1;
int32_t insertData = 0; int32_t insertData = 0;
int32_t batchNum = 100; int32_t batchNumOfTbl = 100;
int32_t batchNumOfRow = 1;
int32_t numOfVgroups = 2; int32_t numOfVgroups = 2;
int32_t showTablesFlag = 0; int32_t showTablesFlag = 0;
int32_t queryFlag = 0;
int64_t startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000
typedef struct { typedef struct {
int64_t tableBeginIndex; int64_t tableBeginIndex;
@ -167,7 +172,7 @@ void showTables() {
void *threadFunc(void *param) { void *threadFunc(void *param) {
SThreadInfo *pInfo = (SThreadInfo *)param; SThreadInfo *pInfo = (SThreadInfo *)param;
char *qstr = malloc(2000 * 1000); char *qstr = malloc(batchNumOfTbl * batchNumOfRow * 128);
int32_t code = 0; int32_t code = 0;
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
@ -192,7 +197,7 @@ void *threadFunc(void *param) {
// batch = MIN(batch, batchNum); // batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table"); int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batchNum;) { for (int32_t i = 0; i < batchNumOfTbl;) {
len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t); len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t);
t++; t++;
i++; i++;
@ -205,7 +210,7 @@ void *threadFunc(void *param) {
TAOS_RES *pRes = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes); code = taos_errno(pRes);
if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
pError("failed to create table t%" PRId64 ", code: %d, reason:%s", t, code, tstrerror(code)); pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
} }
taos_free_result(pRes); taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs(); int64_t endTs = taosGetTimestampUs();
@ -227,31 +232,49 @@ void *threadFunc(void *param) {
if (insertData) { if (insertData) {
int64_t curMs = 0; int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs(); int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = beginMs;
pInfo->startMs = taosGetTimestampMs(); int64_t t = pInfo->tableBeginIndex;
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (; t <= pInfo->tableEndIndex;) {
int64_t batch = (pInfo->tableEndIndex - t); // int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum); // batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "insert into "); int32_t len = sprintf(qstr, "insert into ");
for (int32_t i = 0; i < batch; ++i) {
len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); for (int32_t i = 0; i < batchNumOfTbl;) {
int64_t ts = startTimestamp;
len += sprintf(qstr + len, "%s_t%" PRId64 " values ", stbName, t);
for (int32_t j = 0; j < batchNumOfRow; j++) {
len += sprintf(qstr + len, "(%" PRId64 ", 6666) ", ts++);
} }
t++;
i++;
if (t > pInfo->tableEndIndex) {
break;
}
}
int64_t startTs = taosGetTimestampUs();
TAOS_RES *pRes = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); pError("failed to insert %s_t%" PRId64 ", reason:%s", stbName, t, tstrerror(code));
} }
taos_free_result(pRes); taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
// printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if (delay > pInfo->maxDelay) pInfo->maxDelay = delay;
if (delay < pInfo->minDelay) pInfo->minDelay = delay;
curMs = taosGetTimestampMs(); curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) { if (curMs - beginMs > 10000) {
beginMs = curMs;
// printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
printInsertProgress(pInfo, t); printInsertProgress(pInfo, t);
} }
t += (batch - 1);
} }
printInsertProgress(pInfo, pInfo->tableEndIndex); printInsertProgress(pInfo, t);
} }
taos_close(con); taos_close(con);
@ -280,9 +303,13 @@ void printHelp() {
printf("%s%s\n", indent, "-i"); printf("%s%s\n", indent, "-i");
printf("%s%s%s%d\n", indent, indent, "insertData, default is ", insertData); printf("%s%s%s%d\n", indent, indent, "insertData, default is ", insertData);
printf("%s%s\n", indent, "-b"); printf("%s%s\n", indent, "-b");
printf("%s%s%s%d\n", indent, indent, "batchNum, default is ", batchNum); printf("%s%s%s%d\n", indent, indent, "batchNumOfTbl, default is ", batchNumOfTbl);
printf("%s%s\n", indent, "-w"); printf("%s%s\n", indent, "-w");
printf("%s%s%s%d\n", indent, indent, "showTablesFlag, default is ", showTablesFlag); printf("%s%s%s%d\n", indent, indent, "showTablesFlag, default is ", showTablesFlag);
printf("%s%s\n", indent, "-q");
printf("%s%s%s%d\n", indent, indent, "queryFlag, default is ", queryFlag);
printf("%s%s\n", indent, "-l");
printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", batchNumOfRow);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
@ -309,10 +336,15 @@ void parseArgument(int32_t argc, char *argv[]) {
} else if (strcmp(argv[i], "-i") == 0) { } else if (strcmp(argv[i], "-i") == 0) {
insertData = atoi(argv[++i]); insertData = atoi(argv[++i]);
} else if (strcmp(argv[i], "-b") == 0) { } else if (strcmp(argv[i], "-b") == 0) {
batchNum = atoi(argv[++i]); batchNumOfTbl = atoi(argv[++i]);
} else if (strcmp(argv[i], "-l") == 0) {
batchNumOfRow = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w") == 0) { } else if (strcmp(argv[i], "-w") == 0) {
showTablesFlag = atoi(argv[++i]); showTablesFlag = atoi(argv[++i]);
} else if (strcmp(argv[i], "-q") == 0) {
queryFlag = atoi(argv[++i]);
} else { } else {
pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC);
} }
} }
@ -324,8 +356,10 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC); pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC);
pPrint("%s createTable:%d %s", GREEN, createTable, NC); pPrint("%s createTable:%d %s", GREEN, createTable, NC);
pPrint("%s insertData:%d %s", GREEN, insertData, NC); pPrint("%s insertData:%d %s", GREEN, insertData, NC);
pPrint("%s batchNum:%d %s", GREEN, batchNum, NC); pPrint("%s batchNumOfTbl:%d %s", GREEN, batchNumOfTbl, NC);
pPrint("%s batchNumOfRow:%d %s", GREEN, batchNumOfRow, NC);
pPrint("%s showTablesFlag:%d %s", GREEN, showTablesFlag, NC); pPrint("%s showTablesFlag:%d %s", GREEN, showTablesFlag, NC);
pPrint("%s queryFlag:%d %s", GREEN, queryFlag, NC);
pPrint("%s start create table performace test %s", GREEN, NC); pPrint("%s start create table performace test %s", GREEN, NC);
} }
@ -338,7 +372,14 @@ int32_t main(int32_t argc, char *argv[]) {
return 0; return 0;
} }
if (queryFlag) {
//selectRowsFromTable();
return 0;
}
if (createTable) {
createDbAndStb(); createDbAndStb();
}
pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables); pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables);
@ -396,9 +437,11 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed; insertDataSpeed += pInfo[i].insertDataSpeed;
} }
if (createTable) {
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64
"us %s", "us %s",
GREEN, numOfTables, createTableSpeed, numOfThreads, maxDelay, minDelay, NC); GREEN, numOfTables, createTableSpeed, numOfThreads, maxDelay, minDelay, NC);
}
if (insertData) { if (insertData) {
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,