From 0f3db7d7c2c721ac2a10059033decbf703963d95 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 8 Oct 2021 11:37:54 +0800 Subject: [PATCH 1/4] add tq header --- CMakeLists.txt | 2 +- include/client/consumer/consumer.h | 24 ++++++++++++++---------- include/server/vnode/tq/tq.h | 20 ++++++++++++++++++-- source/client/consumer/consumer.c | 4 +++- source/server/vnode/tq/CMakeLists.txt | 7 ++++++- source/server/vnode/tq/inc/tqInt.h | 5 ++++- source/server/vnode/tq/src/tq.c | 11 +++++++++-- 7 files changed, 55 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 95e1cad2fe..efb92b114f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,4 +54,4 @@ add_library(api INTERFACE ${API_SRC}) # src add_subdirectory(source) -# tests (TODO) \ No newline at end of file +# tests (TODO) diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h index 742e2c12fe..8d1c9835e6 100644 --- a/include/client/consumer/consumer.h +++ b/include/client/consumer/consumer.h @@ -16,6 +16,10 @@ #ifndef _TD_CONSUMER_H_ #define _TD_CONSUMER_H_ +#include "tlist.h" +#include "tarray.h" +#include "hash.h" + #ifdef __cplusplus extern "C" { #endif @@ -32,16 +36,15 @@ extern "C" { struct tmq_resp_err_t; typedef struct tmq_resp_err_t tmq_resp_err_t; - //topic list - //resouces are supposed to be free by users by calling tmq_list_destroy - struct tmq_topic_list_t; - typedef struct tmq_topic_list_t tmq_topic_list_t; - int32_t tmq_topic_list_add(tmq_topic_list_t*, const char*); - void tmq_topic_list_destroy(tmq_topic_list_t*); + struct tmq_message_t; + typedef struct tmq_message_t tmq_message_t; + + struct tmq_col_batch_t; + typedef struct tmq_col_batch_t tmq_col_batch_t; //get content of message - tmq_col_batch_t *tmq_get_msg_col_by_idx(tmq_message_t*, int32_t); - tmq_col_batch_t *tmq_get_msg_col_by_name(tmq_message_t*, const char*); + tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id); + tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*); //consumer config int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); @@ -51,11 +54,12 @@ extern "C" { tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); //subscribe - tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const tmq_topic_list_t*); + tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*); + tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*); //consume //resouces are supposed to be free by users by calling tmq_message_destroy - tmq_message_t tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); + tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); //destroy message and free memory void tmq_message_destroy(tmq_message_t*); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 4c626a1e25..3c163f5045 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -16,17 +16,33 @@ #ifndef _TD_TQ_H_ #define _TD_TQ_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef struct STQ STQ; -int tqPushMsg(void *); +STQ* tqInit(); +void tqCleanUp(STQ* pTQ); + +//create persistent storage for meta info such as consuming offset +//return value > 0: cgId +//return value < 0: error code +int tqCreateGroup(STQ *pTQ); +//create ring buffer in memory and load consuming offset +int tqOpenGroup(STQ* pTQ, int cgId); +//destroy ring buffer and persist consuming offset +int tqCloseGroup(STQ *pTQ, int cgId); +//delete persistent storage for meta info +int tqDropGroup(STQ *pTQ); + +int tqPushMsg(STQ *pTQ, void *, int64_t version); int tqCommit(STQ *pTQ); #ifdef __cplusplus } #endif -#endif /*_TD_TQ_H_*/ \ No newline at end of file +#endif /*_TD_TQ_H_*/ diff --git a/source/client/consumer/consumer.c b/source/client/consumer/consumer.c index 6dea4a4e57..4ba1f95144 100644 --- a/source/client/consumer/consumer.c +++ b/source/client/consumer/consumer.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "consumer.h" diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index f4102b8d4b..7e80da75e9 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,5 +3,10 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 9a4b973106..435a1150b4 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -20,8 +20,11 @@ extern "C" { #endif +//implement the array index +//implement the ring buffer + #ifdef __cplusplus } #endif -#endif /*_TD_TQ_INT_H_*/ \ No newline at end of file +#endif /*_TD_TQ_INT_H_*/ diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index a539182115..f88c203fc9 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -15,5 +15,12 @@ #include "tq.h" -int tqPushMsg(void * p) {return 0;} -int tqCommit(STQ *pTQ) {return 0;} +int tqPushMsg(STQ *pTQ, void * p, int64_t version) { + //add reference + // + return 0; +} + +int tqCommit(STQ *pTQ) { + return 0; +} From cce27ce1f86ac0aa0f27bed3054c3d0a11a58a03 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Oct 2021 13:35:55 +0800 Subject: [PATCH 2/4] add qnode header file --- include/common/taosMsg.h | 55 -- include/common/taosmsg.h | 961 ------------------------------- include/server/qnode/qnode.h | 88 +++ include/server/vnode/meta/meta.h | 4 +- include/server/vnode/tsdb/tsdb.h | 4 +- 5 files changed, 92 insertions(+), 1020 deletions(-) delete mode 100644 include/common/taosMsg.h delete mode 100644 include/common/taosmsg.h diff --git a/include/common/taosMsg.h b/include/common/taosMsg.h deleted file mode 100644 index 0d083a4ca5..0000000000 --- a/include/common/taosMsg.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TAOS_MSG_H_ -#define _TD_TAOS_MSG_H_ - -typedef struct { - /* data */ -} SSubmitReq; - -typedef struct { - /* data */ -} SSubmitRsp; - -typedef struct { - /* data */ -} SSubmitReqReader; - -typedef struct { - /* data */ -} SCreateTableReq; - -typedef struct { - /* data */ -} SCreateTableRsp; - -typedef struct { - /* data */ -} SDropTableReq; - -typedef struct { - /* data */ -} SDropTableRsp; - -typedef struct { - /* data */ -} SAlterTableReq; - -typedef struct { - /* data */ -} SAlterTableRsp; - -#endif /*_TD_TAOS_MSG_H_*/ \ No newline at end of file diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h deleted file mode 100644 index c296317758..0000000000 --- a/include/common/taosmsg.h +++ /dev/null @@ -1,961 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_COMMON_TAOS_MSG_H_ -#define _TD_COMMON_TAOS_MSG_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include "taosdef.h" -#include "taoserror.h" -#include "tdataformat.h" - -// message type - -#ifdef TAOS_MESSAGE_C -#define TAOS_DEFINE_MESSAGE_TYPE( name, msg ) msg, msg "-rsp", -char *taosMsg[] = { - "null", -#else -#define TAOS_DEFINE_MESSAGE_TYPE( name, msg ) name, name##_RSP, -enum { - TSDB_MESSAGE_NULL = 0, -#endif - -// message from client to dnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) - -// message from mnode to dnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) - - -// message from client to mnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_ACCT, "alter-acct" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_ACCT, "drop-acct" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_USER, "create-user" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_USER, "alter-user" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_QUERY, "kill-query" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) - -// message from dnode to mnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) - -// message for topic -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) - -#ifndef TAOS_MESSAGE_C - TSDB_MSG_TYPE_MAX // 105 -#endif - -}; - -// IE type -#define TSDB_IE_TYPE_SEC 1 -#define TSDB_IE_TYPE_META 2 -#define TSDB_IE_TYPE_MGMT_IP 3 -#define TSDB_IE_TYPE_DNODE_CFG 4 -#define TSDB_IE_TYPE_NEW_VERSION 5 -#define TSDB_IE_TYPE_DNODE_EXT 6 -#define TSDB_IE_TYPE_DNODE_STATE 7 - -enum _mgmt_table { - TSDB_MGMT_TABLE_ACCT, - TSDB_MGMT_TABLE_USER, - TSDB_MGMT_TABLE_DB, - TSDB_MGMT_TABLE_TABLE, - TSDB_MGMT_TABLE_DNODE, - TSDB_MGMT_TABLE_MNODE, - TSDB_MGMT_TABLE_VGROUP, - TSDB_MGMT_TABLE_METRIC, - TSDB_MGMT_TABLE_MODULE, - TSDB_MGMT_TABLE_QUERIES, - TSDB_MGMT_TABLE_STREAMS, - TSDB_MGMT_TABLE_VARIABLES, - TSDB_MGMT_TABLE_CONNS, - TSDB_MGMT_TABLE_SCORES, - TSDB_MGMT_TABLE_GRANTS, - TSDB_MGMT_TABLE_VNODES, - TSDB_MGMT_TABLE_STREAMTABLES, - TSDB_MGMT_TABLE_CLUSTER, - TSDB_MGMT_TABLE_TP, - TSDB_MGMT_TABLE_FUNCTION, - TSDB_MGMT_TABLE_MAX, -}; - -#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 -#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 -#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 - -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 -#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7 -#define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8 - -#define TSDB_FILL_NONE 0 -#define TSDB_FILL_NULL 1 -#define TSDB_FILL_SET_VALUE 2 -#define TSDB_FILL_LINEAR 3 -#define TSDB_FILL_PREV 4 -#define TSDB_FILL_NEXT 5 - -#define TSDB_ALTER_USER_PASSWD 0x1 -#define TSDB_ALTER_USER_PRIVILEGES 0x2 - -#define TSDB_KILL_MSG_LEN 30 - -#define TSDB_VN_READ_ACCCESS ((char)0x1) -#define TSDB_VN_WRITE_ACCCESS ((char)0x2) -#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) - -#define TSDB_COL_NORMAL 0x0u // the normal column of the table -#define TSDB_COL_TAG 0x1u // the tag column type -#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column -#define TSDB_COL_NULL 0x4u // the column filter NULL or not - -#define TSDB_COL_IS_TAG(f) (((f&(~(TSDB_COL_NULL)))&TSDB_COL_TAG) != 0) -#define TSDB_COL_IS_NORMAL_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) -#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC) -#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) - - -extern char *taosMsg[]; - -#pragma pack(push, 1) - -// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta -typedef struct { - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; -} SEpAddrMsg; - -typedef struct { - char* fqdn; - uint16_t port; -} SEpAddr1; - -typedef struct { - int32_t numOfVnodes; -} SMsgDesc; - -typedef struct SMsgHead { - int32_t contLen; - int32_t vgId; -} SMsgHead; - -// Submit message for one table -typedef struct SSubmitBlk { - uint64_t uid; // table unique id - int32_t tid; // table id - int32_t padding; // TODO just for padding here - int32_t sversion; // data schema version - int32_t dataLen; // data part length, not including the SSubmitBlk head - int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block - char data[]; -} SSubmitBlk; - -// Submit message for this TSDB -typedef struct SSubmitMsg { - SMsgHead header; - int32_t length; - int32_t numOfBlocks; - char blocks[]; -} SSubmitMsg; - -typedef struct { - int32_t index; // index of failed block in submit blocks - int32_t vnode; // vnode index of failed block - int32_t sid; // table index of failed block - int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table -} SShellSubmitRspBlock; - -typedef struct { - int32_t code; // 0-success, > 0 error code - int32_t numOfRows; // number of records the client is trying to write - int32_t affectedRows; // number of records actually written - int32_t failedRows; // number of failed records (exclude duplicate records) - int32_t numOfFailedBlocks; - SShellSubmitRspBlock failedBlocks[]; -} SShellSubmitRspMsg; - -typedef struct SSchema { - uint8_t type; - char name[TSDB_COL_NAME_LEN]; - int16_t colId; - int16_t bytes; -} SSchema; - -typedef struct { - int32_t contLen; - int32_t vgId; - int8_t tableType; - int16_t numOfColumns; - int16_t numOfTags; - int32_t tid; - int32_t sversion; - int32_t tversion; - int32_t tagDataLen; - int32_t sqlDataLen; - uint64_t uid; - uint64_t superTableUid; - uint64_t createdTime; - char tableFname[TSDB_TABLE_FNAME_LEN]; - char stableFname[TSDB_TABLE_FNAME_LEN]; - char data[]; -} SMDCreateTableMsg; - -typedef struct { - int32_t len; // one create table message - char tableName[TSDB_TABLE_FNAME_LEN]; - int8_t igExists; - int8_t getMeta; - int16_t numOfTags; - int16_t numOfColumns; - int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string - int8_t reserved[16]; - char schema[]; -} SCreateTableMsg; - -typedef struct { - int32_t numOfTables; - int32_t contLen; -} SCMCreateTableMsg; - -typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; - // if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table - int8_t supertable; - int8_t igNotExists; -} SCMDropTableMsg; - -typedef struct { - char tableFname[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - int16_t type; /* operation type */ - int16_t numOfCols; /* number of schema */ - int32_t tagValLen; - SSchema schema[]; - // tagVal is padded after schema - // char tagVal[]; -} SAlterTableMsg; - -typedef struct { - SMsgHead head; - int64_t uid; - int32_t tid; - int16_t tversion; - int16_t colId; - int8_t type; - int16_t bytes; - int32_t tagValLen; - int16_t numOfTags; - int32_t schemaLen; - char data[]; -} SUpdateTableTagValMsg; - -typedef struct { - char clientVersion[TSDB_VERSION_LEN]; - char msgVersion[TSDB_VERSION_LEN]; - char db[TSDB_TABLE_FNAME_LEN]; - char appName[TSDB_APPNAME_LEN]; - int32_t pid; -} SConnectMsg; - -typedef struct SEpSet { - int8_t inUse; - int8_t numOfEps; - uint16_t port[TSDB_MAX_REPLICA]; - char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; -} SEpSet; - -typedef struct { - char acctId[TSDB_ACCT_ID_LEN]; - char serverVersion[TSDB_VERSION_LEN]; - char clusterId[TSDB_CLUSTER_ID_LEN]; - int8_t writeAuth; - int8_t superAuth; - int8_t reserved1; - int8_t reserved2; - int32_t connId; - SEpSet epSet; -} SConnectRsp; - -typedef struct { - int32_t maxUsers; - int32_t maxDbs; - int32_t maxTimeSeries; - int32_t maxConnections; - int32_t maxStreams; - int32_t maxPointsPerSecond; - int64_t maxStorage; // In unit of GB - int64_t maxQueryTime; // In unit of hour - int64_t maxInbound; - int64_t maxOutbound; - int8_t accessState; // Configured only by command -} SAcctCfg; - -typedef struct { - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; - SAcctCfg cfg; -} SCreateAcctMsg, SAlterAcctMsg; - -typedef struct { - char user[TSDB_USER_LEN]; -} SDropUserMsg, SDropAcctMsg; - -typedef struct { - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; - int8_t privilege; - int8_t flag; -} SCreateUserMsg, SAlterUserMsg; - -typedef struct { - int32_t contLen; - int32_t vgId; - int32_t tid; - uint64_t uid; - char tableFname[TSDB_TABLE_FNAME_LEN]; -} SMDDropTableMsg; - -typedef struct { - int32_t contLen; - int32_t vgId; - uint64_t uid; - char tableFname[TSDB_TABLE_FNAME_LEN]; -} SDropSTableMsg; - -typedef struct { - int32_t vgId; -} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; - -typedef struct SColIndex { - int16_t colId; // column id - int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag - uint16_t flag; // denote if it is a tag or a normal column - char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1]; -} SColIndex; - -typedef struct SColumnFilterInfo { - int16_t lowerRelOptr; - int16_t upperRelOptr; - int16_t filterstr; // denote if current column is char(binary/nchar) - - union { - struct { - int64_t lowerBndi; - int64_t upperBndi; - }; - struct { - double lowerBndd; - double upperBndd; - }; - struct { - int64_t pz; - int64_t len; - }; - }; -} SColumnFilterInfo; - -typedef struct SColumnFilterList { - int16_t numOfFilters; - union{ - int64_t placeholder; - SColumnFilterInfo *filterInfo; - }; -} SColumnFilterList; -/* - * for client side struct, we only need the column id, type, bytes are not necessary - * But for data in vnode side, we need all the following information. - */ -typedef struct SColumnInfo { - int16_t colId; - int16_t type; - int16_t bytes; - SColumnFilterList flist; -} SColumnInfo; - -typedef struct STableIdInfo { - uint64_t uid; - int32_t tid; - TSKEY key; // last accessed ts, for subscription -} STableIdInfo; - -typedef struct STimeWindow { - TSKEY skey; - TSKEY ekey; -} STimeWindow; - -typedef struct { - int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed - int32_t tsLen; // total length of ts comp block - int32_t tsNumOfBlocks; // ts comp block numbers - int32_t tsOrder; // ts comp block order -} STsBufInfo; - -typedef struct SInterval { - int32_t tz; // query client timezone - char intervalUnit; - char slidingUnit; - char offsetUnit; - int64_t interval; - int64_t sliding; - int64_t offset; -} SInterval; - -typedef struct SSessionWindow { - int64_t gap; // gap between two session window(in microseconds) - int32_t primaryColId; // primary timestamp column -} SSessionWindow; - -typedef struct { - SMsgHead head; - char version[TSDB_VERSION_LEN]; - - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool interpQuery; // interp query or not - bool groupbyColumn; // denote if this is a groupby normal column query - bool hasTagResults; // if there are tag values in final result or not - bool timeWindowInterpo;// if the time window start/end required interpolation - bool queryBlockDist; // if query data block distribution - bool stabledev; // super table stddev query - bool tsCompQuery; // is tscomp query - bool simpleAgg; - bool pointInterpQuery; // point interpolation query - bool needReverseScan; // need reverse scan - bool stateWindow; // state window flag - - STimeWindow window; - int32_t numOfTables; - int16_t order; - int16_t orderColId; - int16_t numOfCols; // the number of columns will be load from vnode - SInterval interval; - SSessionWindow sw; // session window - uint16_t tagCondLen; // tag length in current query - uint16_t colCondLen; // column length in current query - int16_t numOfGroupCols; // num of group by columns - int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx - int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. - int16_t prjOrder; // global order in super table projection query. - int64_t limit; - int64_t offset; - uint32_t queryType; // denote another query process - int16_t numOfOutput; // final output columns numbers - int16_t fillType; // interpolate type - uint64_t fillVal; // default value array list - int32_t secondStageOutput; - STsBufInfo tsBuf; // tsBuf info - int32_t numOfTags; // number of tags columns involved - int32_t sqlstrLen; // sql query string - int32_t prevResultLen; // previous result length - int32_t numOfOperator; - int32_t tableScanOperator;// table scan operator. -1 means no scan operator - int32_t udfNum; // number of udf function - int32_t udfContentOffset; - int32_t udfContentLen; - SColumnInfo tableCols[]; -} SQueryTableMsg; - -typedef struct { - int32_t code; - union{uint64_t qhandle; uint64_t qId;}; // query handle -} SQueryTableRsp; - -// todo: the show handle should be replaced with id -typedef struct { - SMsgHead header; - union{uint64_t qhandle; uint64_t qId;}; // query handle - uint16_t free; -} SRetrieveTableMsg; - -typedef struct SRetrieveTableRsp { - int32_t numOfRows; - int8_t completed; // all results are returned to client - int16_t precision; - int64_t offset; // updated offset value for multi-vnode projection query - int64_t useconds; - int8_t compressed; - int32_t compLen; - char data[]; -} SRetrieveTableRsp; - -typedef struct { - int32_t vgId; - int32_t dbCfgVersion; - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - uint64_t vnodeVersion; - int32_t vgCfgVersion; - uint8_t status; - uint8_t role; - uint8_t replica; - uint8_t compact; -} SVnodeLoad; - -typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - int32_t cacheBlockSize; //MB - int32_t totalBlocks; - int32_t maxTables; - int32_t daysPerFile; - int32_t daysToKeep0; - int32_t daysToKeep1; - int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; - int32_t commitTime; - int32_t fsyncPeriod; - uint8_t precision; // time resolution - int8_t compression; - int8_t walLevel; - int8_t replications; - int8_t quorum; - int8_t ignoreExist; - int8_t update; - int8_t cacheLastRow; - int8_t dbType; - int16_t partitions; - int8_t reserve[5]; -} SCreateDbMsg, SAlterDbMsg; - -typedef struct { - char name[TSDB_FUNC_NAME_LEN]; - char path[PATH_MAX]; - int32_t funcType; - uint8_t outputType; - int16_t outputLen; - int32_t bufSize; - int32_t codeLen; - char code[]; -} SCreateFuncMsg; - -typedef struct { - int32_t num; - char name[]; -} SRetrieveFuncMsg; - -typedef struct { - char name[TSDB_FUNC_NAME_LEN]; - int32_t funcType; - int8_t resType; - int16_t resBytes; - int32_t bufSize; - int32_t len; - char content[]; -} SFunctionInfoMsg; - -typedef struct { - int32_t num; - char content[]; -} SUdfFuncMsg; - -typedef struct { - char name[TSDB_FUNC_NAME_LEN]; -} SDropFuncMsg; - -typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; - uint8_t ignoreNotExists; -} SDropDbMsg, SUseDbMsg, SSyncDbMsg; - -// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed -// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE -typedef struct { - int64_t pointsWritten; // In unit of points - int64_t totalStorage; // In unit of bytes - int64_t compStorage; // In unit of bytes - int64_t queryTime; // In unit of second ?? - char reserved[64]; -} SVnodeStatisticInfo; - -typedef struct SVgroupAccess { - int32_t vgId; - int8_t accessState; -} SVgroupAccess; - -typedef struct { - int32_t dnodeId; - uint32_t moduleStatus; - uint32_t numOfVnodes; - char clusterId[TSDB_CLUSTER_ID_LEN]; - char reserved[16]; -} SDnodeCfg; - -typedef struct { - int32_t dnodeId; - uint16_t dnodePort; - char dnodeFqdn[TSDB_FQDN_LEN]; -} SDnodeEp; - -typedef struct { - int32_t dnodeNum; - SDnodeEp dnodeEps[]; -} SDnodeEps; - -typedef struct { - int32_t mnodeId; - char mnodeEp[TSDB_EP_LEN]; -} SMInfo; - -typedef struct SMInfos { - int8_t inUse; - int8_t mnodeNum; - SMInfo mnodeInfos[TSDB_MAX_REPLICA]; -} SMInfos; - -typedef struct { - int32_t numOfMnodes; // tsNumOfMnodes - int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum - int32_t offlineThreshold; // tsOfflineThreshold - int32_t statusInterval; // tsStatusInterval - int32_t maxtablesPerVnode; - int32_t maxVgroupsPerDb; - char arbitrator[TSDB_EP_LEN]; // tsArbitrator - char reserve[2]; // to solve arm32 bus error - char timezone[64]; // tsTimezone - int64_t checkTime; // 1970-01-01 00:00:00.000 - char locale[TSDB_LOCALE_LEN]; // tsLocale - char charset[TSDB_LOCALE_LEN]; // tsCharset - int8_t enableBalance; // tsEnableBalance - int8_t flowCtrl; - int8_t slaveQuery; - int8_t adjustMaster; - int8_t reserved[4]; -} SClusterCfg; - -typedef struct SStatusMsg { - uint32_t version; - int32_t dnodeId; - char dnodeEp[TSDB_EP_LEN]; - uint32_t moduleStatus; - uint32_t lastReboot; // time stamp for last reboot - uint16_t reserve1; // from config file - uint16_t openVnodes; - uint16_t numOfCores; - float diskAvailable; // GB - char clusterId[TSDB_CLUSTER_ID_LEN]; - uint8_t alternativeRole; - uint8_t reserve2[15]; - SClusterCfg clusterCfg; - SVnodeLoad load[]; -} SStatusMsg; - -typedef struct { - SMInfos mnodes; - SDnodeCfg dnodeCfg; - SVgroupAccess vgAccess[]; -} SStatusRsp; - -typedef struct { - uint32_t vgId; - int32_t dbCfgVersion; - int32_t maxTables; - int32_t cacheBlockSize; - int32_t totalBlocks; - int32_t daysPerFile; - int32_t daysToKeep; - int32_t daysToKeep1; - int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; - int32_t commitTime; - int32_t fsyncPeriod; - int8_t precision; - int8_t compression; - int8_t walLevel; - int8_t vgReplica; - int8_t wals; - int8_t quorum; - int8_t update; - int8_t cacheLastRow; - int32_t vgCfgVersion; - int8_t dbReplica; - int8_t dbType; - int8_t reserved[8]; -} SVnodeCfg; - -typedef struct { - int32_t nodeId; - char nodeEp[TSDB_EP_LEN]; -} SVnodeDesc; - -typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - SVnodeCfg cfg; - SVnodeDesc nodes[TSDB_MAX_REPLICA]; -} SCreateVnodeMsg, SAlterVnodeMsg; - -typedef struct { - char tableFname[TSDB_TABLE_FNAME_LEN]; - int16_t createFlag; - char tags[]; -} STableInfoMsg; - -typedef struct { - uint8_t metaClone; // create local clone of the cached table meta - int32_t numOfVgroups; - int32_t numOfTables; - int32_t numOfUdfs; - char tableNames[]; -} SMultiTableInfoMsg; - -typedef struct SSTableVgroupMsg { - int32_t numOfTables; -} SSTableVgroupMsg, SSTableVgroupRspMsg; - -typedef struct { - int32_t vgId; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SVgroupMsg; - -typedef struct { - int32_t numOfVgroups; - SVgroupMsg vgroups[]; -} SVgroupsMsg, SVgroupsInfo; - -typedef struct STableMetaMsg { - int32_t contLen; - char tableFname[TSDB_TABLE_FNAME_LEN]; // table id - uint8_t numOfTags; - uint8_t precision; - uint8_t tableType; - int16_t numOfColumns; - int16_t sversion; - int16_t tversion; - int32_t tid; - uint64_t uid; - SVgroupMsg vgroup; - - char sTableName[TSDB_TABLE_FNAME_LEN]; - uint64_t suid; - SSchema schema[]; -} STableMetaMsg; - -typedef struct SMultiTableMeta { - int32_t numOfTables; - int32_t numOfVgroup; - int32_t numOfUdf; - int32_t contLen; - uint8_t compressed; // denote if compressed or not - uint32_t rawLen; // size before compress - uint8_t metaClone; // make meta clone after retrieve meta from mnode - char meta[]; -} SMultiTableMeta; - -typedef struct { - int32_t dataLen; - char name[TSDB_TABLE_FNAME_LEN]; - char *data; -} STagData; - -/* - * sql: show tables like '%a_%' - * payload is the query condition, e.g., '%a_%' - * payloadLen is the length of payload - */ -typedef struct { - int8_t type; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - uint16_t payloadLen; - char payload[]; -} SShowMsg; - -typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - int32_t numOfVgroup; - int32_t vgid[]; -} SCompactMsg; - -typedef struct SShowRsp { - uint64_t qhandle; - STableMetaMsg tableMeta; -} SShowRsp; - -typedef struct { - char ep[TSDB_EP_LEN]; // end point, hostname:port -} SCreateDnodeMsg, SDropDnodeMsg; - -typedef struct { - int32_t dnodeId; - char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port - SMInfos mnodes; -} SCreateMnodeMsg; - -typedef struct { - int32_t dnodeId; - int32_t vgId; - int32_t tid; -} SConfigTableMsg; - -typedef struct { - uint32_t dnodeId; - int32_t vgId; -} SConfigVnodeMsg; - -typedef struct { - char ep[TSDB_EP_LEN]; // end point, hostname:port - char config[64]; -} SCfgDnodeMsg; - -typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; - uint32_t queryId; - int64_t useconds; - int64_t stime; - uint64_t qId; - uint64_t sqlObjId; - int32_t pid; - char fqdn[TSDB_FQDN_LEN]; - uint8_t stableQuery; - int32_t numOfSub; - char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) -} SQueryDesc; - -typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; - char dstTable[TSDB_TABLE_NAME_LEN]; - uint32_t streamId; - int64_t num; // number of computing/cycles - int64_t useconds; - int64_t ctime; - int64_t stime; - int64_t slidingTime; - int64_t interval; -} SStreamDesc; - -typedef struct { - char clientVer[TSDB_VERSION_LEN]; - uint32_t connId; - int32_t pid; - int32_t numOfQueries; - int32_t numOfStreams; - char appName[TSDB_APPNAME_LEN]; - char pData[]; -} SHeartBeatMsg; - -typedef struct { - uint32_t queryId; - uint32_t streamId; - uint32_t totalDnodes; - uint32_t onlineDnodes; - uint32_t connId; - int8_t killConnection; - SEpSet epSet; -} SHeartBeatRsp; - -typedef struct { - char queryId[TSDB_KILL_MSG_LEN + 1]; -} SKillQueryMsg, SKillStreamMsg, SKillConnMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; - uint64_t stime; // stream starting time - int32_t status; - char tableFname[TSDB_TABLE_FNAME_LEN]; -} SAlterStreamMsg; - -typedef struct { - char user[TSDB_USER_LEN]; - char spi; - char encrypt; - char secret[TSDB_KEY_LEN]; - char ckey[TSDB_KEY_LEN]; -} SAuthMsg, SAuthRsp; - -typedef struct { - int8_t finished; - int8_t reserved1[7]; - char name[TSDB_STEP_NAME_LEN]; - char desc[TSDB_STEP_DESC_LEN]; - char reserved2[64]; -} SStartupStep; - -#pragma pack(pop) - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_COMMON_TAOS_MSG_H_*/ diff --git a/include/server/qnode/qnode.h b/include/server/qnode/qnode.h index 00daf2b051..65779b2099 100644 --- a/include/server/qnode/qnode.h +++ b/include/server/qnode/qnode.h @@ -20,6 +20,94 @@ extern "C" { #endif + +typedef struct { + uint64_t numOfStartTask; + uint64_t numOfStopTask; + uint64_t numOfRecvedFetch; + uint64_t numOfSentHb; + uint64_t numOfSentFetch; + uint64_t numOfTaskInQueue; + uint64_t numOfFetchInQueue; + uint64_t numOfErrors; +} SQnodeStat; + +/* start Task msg */ +typedef struct { + uint32_t schedulerIp; + uint16_t schedulerPort; + int64_t taskId; + int64_t queryId; + uint32_t srcIp; + uint16_t srcPort; +} SQnodeStartTaskMsg; + +/* stop Task msg */ +typedef struct { + int64_t taskId; +} SQnodeStopTaskMsg; + +/* start/stop Task msg response */ +typedef struct { + int64_t taskId; + int32_t code; +} SQnodeTaskRespMsg; + +/* Task status msg */ +typedef struct { + int64_t taskId; + int32_t status; + int64_t queryId; +} SQnodeTaskStatusMsg; + +/* Qnode/Scheduler heartbeat msg */ +typedef struct { + int32_t status; + int32_t load; + +} SQnodeHeartbeatMsg; + +/* Qnode sent/received msg */ +typedef struct { + int8_t msgType; + int32_t msgLen; + char msg[]; +} SQnodeMsg; + + +/** + * Start one Qnode in Dnode. + * @return Error Code. + */ +int32_t qnodeStart(); + +/** + * Stop Qnode in Dnode. + * + * @param qnodeId Qnode ID to stop, -1 for all Qnodes. + */ +void qnodeStop(int64_t qnodeId); + + +/** + * Get the statistical information of Qnode + * + * @param qnodeId Qnode ID to get statistics, -1 for all + * @param stat Statistical information. + * @return Error Code. + */ +int32_t qnodeGetStatistics(int64_t qnodeId, SQnodeStat *stat); + +/** + * Interface for processing Qnode messages. + * + * @param pMsg Message to be processed. + * @return Error code + */ +void qnodeProcessReq(SRpcMsg *pMsg); + + + #ifdef __cplusplus } #endif diff --git a/include/server/vnode/meta/meta.h b/include/server/vnode/meta/meta.h index ca7cba9b0c..a68409d2fc 100644 --- a/include/server/vnode/meta/meta.h +++ b/include/server/vnode/meta/meta.h @@ -16,7 +16,7 @@ #ifndef _TD_META_H_ #define _TD_META_H_ -#include "taosMsg.h" +#include "taosmsg.h" #ifdef __cplusplus extern "C" { @@ -33,4 +33,4 @@ int metaCommit(SMeta *pMeta); } #endif -#endif /*_TD_META_H_*/ \ No newline at end of file +#endif /*_TD_META_H_*/ diff --git a/include/server/vnode/tsdb/tsdb.h b/include/server/vnode/tsdb/tsdb.h index 968bac2fa2..4c6eb7a1e3 100644 --- a/include/server/vnode/tsdb/tsdb.h +++ b/include/server/vnode/tsdb/tsdb.h @@ -17,7 +17,7 @@ #define _TD_TSDB_H_ #include "os.h" -#include "taosMsg.h" +#include "taosmsg.h" #ifdef __cplusplus extern "C" { @@ -55,4 +55,4 @@ int tsdbCommit(STsdb *pTsdb); } #endif -#endif /*_TD_TSDB_H_*/ \ No newline at end of file +#endif /*_TD_TSDB_H_*/ From 066d11cea071a18375d0599901e0e20946b391cc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 8 Oct 2021 13:43:12 +0800 Subject: [PATCH 3/4] add tq header --- include/libs/wal/wal.h | 11 +++++++---- include/server/vnode/tq/tq.h | 20 +++++++++++--------- source/libs/wal/CMakeLists.txt | 7 ++++++- source/server/vnode/tq/CMakeLists.txt | 2 +- source/server/vnode/tq/inc/tqInt.h | 2 ++ source/server/vnode/tq/src/tq.c | 13 ++++++++++--- 6 files changed, 37 insertions(+), 18 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8f6d09f8d6..e59d60f7dc 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -15,13 +15,16 @@ #ifndef _TD_WAL_H_ #define _TD_WAL_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif typedef enum { TAOS_WAL_NOLOG = 0, - TAOS_WAL_WRITE = 1 + TAOS_WAL_WRITE = 1, + TAOS_WAL_FSYNC = 2 } EWalType; typedef struct { @@ -55,8 +58,8 @@ void walStop(twalh); void walClose(twalh); //write -int32_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); -void walWaitFsync(twalh, bool forceHint); +int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); +void walFsync(twalh, bool forceHint); //int32_t walCommit(twalh, uint64_t ver); //int32_t walRollback(twalh, uint64_t ver); @@ -67,7 +70,7 @@ int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); //life cycle int32_t walDataPersisted(twalh, int64_t ver); int32_t walFirstVer(twalh); -int32_t walLastVer(twal); +int32_t walLastVer(twalh); //int32_t walDataCorrupted(twalh); #ifdef __cplusplus diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 3c163f5045..91688e890d 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -24,22 +24,24 @@ extern "C" { typedef struct STQ STQ; -STQ* tqInit(); -void tqCleanUp(STQ* pTQ); +STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); +void tqCleanUp(STQ* pTq); //create persistent storage for meta info such as consuming offset //return value > 0: cgId -//return value < 0: error code -int tqCreateGroup(STQ *pTQ); +//return value <= 0: error code +int tqCreateGroup(STQ*); //create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ* pTQ, int cgId); +int tqOpenGroup(STQ*, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ *pTQ, int cgId); +int tqCloseGroup(STQ*, int cgId); //delete persistent storage for meta info -int tqDropGroup(STQ *pTQ); +int tqDropGroup(STQ*, int cgId); -int tqPushMsg(STQ *pTQ, void *, int64_t version); -int tqCommit(STQ *pTQ); +int tqPushMsg(STQ*, void *, int64_t version); +int tqCommit(STQ*); + +int tqHandleMsg(STQ*, void *msg); #ifdef __cplusplus } diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index 1e19586276..fbcdff59ee 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -4,4 +4,9 @@ target_include_directories( wal PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" +) + +target_link_libraries( + os +) diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index 7e80da75e9..9577007400 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,8 +3,8 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" - PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PRIVATE "${CMAKE_SOURCE_DIR}/include/os" ) target_link_libraries( diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 435a1150b4..416a915456 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -16,6 +16,8 @@ #ifndef _TD_TQ_INT_H_ #define _TD_TQ_INT_H_ +#include "tq.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index f88c203fc9..3255f3fb3a 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -13,14 +13,21 @@ * along with this program. If not, see . */ -#include "tq.h" +#include "tqInt.h" -int tqPushMsg(STQ *pTQ, void * p, int64_t version) { +//static +//read next version data +// +//send to fetch queue +// +//handle management message + +int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference // return 0; } -int tqCommit(STQ *pTQ) { +int tqCommit(STQ* pTq) { return 0; } From c45b1911b517bebfceca073af85348cbf856a301 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Oct 2021 13:53:49 +0800 Subject: [PATCH 4/4] fix taosmsg issue --- include/common/taosmsg.h | 997 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 997 insertions(+) create mode 100644 include/common/taosmsg.h diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h new file mode 100644 index 0000000000..6cabc8568c --- /dev/null +++ b/include/common/taosmsg.h @@ -0,0 +1,997 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_COMMON_TAOS_MSG_H_ +#define _TD_COMMON_TAOS_MSG_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "taosdef.h" +#include "taoserror.h" +#include "tdataformat.h" + +// message type + +#ifdef TAOS_MESSAGE_C +#define TAOS_DEFINE_MESSAGE_TYPE( name, msg ) msg, msg "-rsp", +char *taosMsg[] = { + "null", +#else +#define TAOS_DEFINE_MESSAGE_TYPE( name, msg ) name, name##_RSP, +enum { + TSDB_MESSAGE_NULL = 0, +#endif + +// message from client to dnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) + +// message from mnode to dnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) + + +// message from client to mnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_ACCT, "alter-acct" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_ACCT, "drop-acct" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_USER, "create-user" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_USER, "alter-user" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_QUERY, "kill-query" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) + +// message from dnode to mnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) + +// message for topic +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) + +#ifndef TAOS_MESSAGE_C + TSDB_MSG_TYPE_MAX // 105 +#endif + +}; + +// IE type +#define TSDB_IE_TYPE_SEC 1 +#define TSDB_IE_TYPE_META 2 +#define TSDB_IE_TYPE_MGMT_IP 3 +#define TSDB_IE_TYPE_DNODE_CFG 4 +#define TSDB_IE_TYPE_NEW_VERSION 5 +#define TSDB_IE_TYPE_DNODE_EXT 6 +#define TSDB_IE_TYPE_DNODE_STATE 7 + +enum _mgmt_table { + TSDB_MGMT_TABLE_ACCT, + TSDB_MGMT_TABLE_USER, + TSDB_MGMT_TABLE_DB, + TSDB_MGMT_TABLE_TABLE, + TSDB_MGMT_TABLE_DNODE, + TSDB_MGMT_TABLE_MNODE, + TSDB_MGMT_TABLE_VGROUP, + TSDB_MGMT_TABLE_METRIC, + TSDB_MGMT_TABLE_MODULE, + TSDB_MGMT_TABLE_QUERIES, + TSDB_MGMT_TABLE_STREAMS, + TSDB_MGMT_TABLE_VARIABLES, + TSDB_MGMT_TABLE_CONNS, + TSDB_MGMT_TABLE_SCORES, + TSDB_MGMT_TABLE_GRANTS, + TSDB_MGMT_TABLE_VNODES, + TSDB_MGMT_TABLE_STREAMTABLES, + TSDB_MGMT_TABLE_CLUSTER, + TSDB_MGMT_TABLE_TP, + TSDB_MGMT_TABLE_FUNCTION, + TSDB_MGMT_TABLE_MAX, +}; + +#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 +#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 +#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 + +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 +#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7 +#define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8 + +#define TSDB_FILL_NONE 0 +#define TSDB_FILL_NULL 1 +#define TSDB_FILL_SET_VALUE 2 +#define TSDB_FILL_LINEAR 3 +#define TSDB_FILL_PREV 4 +#define TSDB_FILL_NEXT 5 + +#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_PRIVILEGES 0x2 + +#define TSDB_KILL_MSG_LEN 30 + +#define TSDB_VN_READ_ACCCESS ((char)0x1) +#define TSDB_VN_WRITE_ACCCESS ((char)0x2) +#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) + +#define TSDB_COL_NORMAL 0x0u // the normal column of the table +#define TSDB_COL_TAG 0x1u // the tag column type +#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column +#define TSDB_COL_NULL 0x4u // the column filter NULL or not + +#define TSDB_COL_IS_TAG(f) (((f&(~(TSDB_COL_NULL)))&TSDB_COL_TAG) != 0) +#define TSDB_COL_IS_NORMAL_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) +#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC) +#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) + + +extern char *taosMsg[]; + +#pragma pack(push, 1) + +// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta +typedef struct { + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; +} SEpAddrMsg; + +typedef struct { + char* fqdn; + uint16_t port; +} SEpAddr1; + +typedef struct { + int32_t numOfVnodes; +} SMsgDesc; + +typedef struct SMsgHead { + int32_t contLen; + int32_t vgId; +} SMsgHead; + +// Submit message for one table +typedef struct SSubmitBlk { + uint64_t uid; // table unique id + int32_t tid; // table id + int32_t padding; // TODO just for padding here + int32_t sversion; // data schema version + int32_t dataLen; // data part length, not including the SSubmitBlk head + int32_t schemaLen; // schema length, if length is 0, no schema exists + int16_t numOfRows; // total number of rows in current submit block + char data[]; +} SSubmitBlk; + +// Submit message for this TSDB +typedef struct SSubmitMsg { + SMsgHead header; + int32_t length; + int32_t numOfBlocks; + char blocks[]; +} SSubmitMsg; + +typedef struct { + int32_t index; // index of failed block in submit blocks + int32_t vnode; // vnode index of failed block + int32_t sid; // table index of failed block + int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table +} SShellSubmitRspBlock; + +typedef struct { + int32_t code; // 0-success, > 0 error code + int32_t numOfRows; // number of records the client is trying to write + int32_t affectedRows; // number of records actually written + int32_t failedRows; // number of failed records (exclude duplicate records) + int32_t numOfFailedBlocks; + SShellSubmitRspBlock failedBlocks[]; +} SShellSubmitRspMsg; + +typedef struct SSchema { + uint8_t type; + char name[TSDB_COL_NAME_LEN]; + int16_t colId; + int16_t bytes; +} SSchema; + +typedef struct { + int32_t contLen; + int32_t vgId; + int8_t tableType; + int16_t numOfColumns; + int16_t numOfTags; + int32_t tid; + int32_t sversion; + int32_t tversion; + int32_t tagDataLen; + int32_t sqlDataLen; + uint64_t uid; + uint64_t superTableUid; + uint64_t createdTime; + char tableFname[TSDB_TABLE_FNAME_LEN]; + char stableFname[TSDB_TABLE_FNAME_LEN]; + char data[]; +} SMDCreateTableMsg; + +typedef struct { + int32_t len; // one create table message + char tableName[TSDB_TABLE_FNAME_LEN]; + int8_t igExists; + int8_t getMeta; + int16_t numOfTags; + int16_t numOfColumns; + int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string + int8_t reserved[16]; + char schema[]; +} SCreateTableMsg; + +typedef struct { + int32_t numOfTables; + int32_t contLen; +} SCMCreateTableMsg; + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + // if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table + int8_t supertable; + int8_t igNotExists; +} SCMDropTableMsg; + +typedef struct { + char tableFname[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int16_t type; /* operation type */ + int16_t numOfCols; /* number of schema */ + int32_t tagValLen; + SSchema schema[]; + // tagVal is padded after schema + // char tagVal[]; +} SAlterTableMsg; + +typedef struct { + SMsgHead head; + int64_t uid; + int32_t tid; + int16_t tversion; + int16_t colId; + int8_t type; + int16_t bytes; + int32_t tagValLen; + int16_t numOfTags; + int32_t schemaLen; + char data[]; +} SUpdateTableTagValMsg; + +typedef struct { + char clientVersion[TSDB_VERSION_LEN]; + char msgVersion[TSDB_VERSION_LEN]; + char db[TSDB_TABLE_FNAME_LEN]; + char appName[TSDB_APPNAME_LEN]; + int32_t pid; +} SConnectMsg; + +typedef struct SEpSet { + int8_t inUse; + int8_t numOfEps; + uint16_t port[TSDB_MAX_REPLICA]; + char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; +} SEpSet; + +typedef struct { + char acctId[TSDB_ACCT_ID_LEN]; + char serverVersion[TSDB_VERSION_LEN]; + char clusterId[TSDB_CLUSTER_ID_LEN]; + int8_t writeAuth; + int8_t superAuth; + int8_t reserved1; + int8_t reserved2; + int32_t connId; + SEpSet epSet; +} SConnectRsp; + +typedef struct { + int32_t maxUsers; + int32_t maxDbs; + int32_t maxTimeSeries; + int32_t maxConnections; + int32_t maxStreams; + int32_t maxPointsPerSecond; + int64_t maxStorage; // In unit of GB + int64_t maxQueryTime; // In unit of hour + int64_t maxInbound; + int64_t maxOutbound; + int8_t accessState; // Configured only by command +} SAcctCfg; + +typedef struct { + char user[TSDB_USER_LEN]; + char pass[TSDB_KEY_LEN]; + SAcctCfg cfg; +} SCreateAcctMsg, SAlterAcctMsg; + +typedef struct { + char user[TSDB_USER_LEN]; +} SDropUserMsg, SDropAcctMsg; + +typedef struct { + char user[TSDB_USER_LEN]; + char pass[TSDB_KEY_LEN]; + int8_t privilege; + int8_t flag; +} SCreateUserMsg, SAlterUserMsg; + +typedef struct { + int32_t contLen; + int32_t vgId; + int32_t tid; + uint64_t uid; + char tableFname[TSDB_TABLE_FNAME_LEN]; +} SMDDropTableMsg; + +typedef struct { + int32_t contLen; + int32_t vgId; + uint64_t uid; + char tableFname[TSDB_TABLE_FNAME_LEN]; +} SDropSTableMsg; + +typedef struct { + int32_t vgId; +} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; + +typedef struct SColIndex { + int16_t colId; // column id + int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag + uint16_t flag; // denote if it is a tag or a normal column + char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1]; +} SColIndex; + +typedef struct SColumnFilterInfo { + int16_t lowerRelOptr; + int16_t upperRelOptr; + int16_t filterstr; // denote if current column is char(binary/nchar) + + union { + struct { + int64_t lowerBndi; + int64_t upperBndi; + }; + struct { + double lowerBndd; + double upperBndd; + }; + struct { + int64_t pz; + int64_t len; + }; + }; +} SColumnFilterInfo; + +typedef struct SColumnFilterList { + int16_t numOfFilters; + union{ + int64_t placeholder; + SColumnFilterInfo *filterInfo; + }; +} SColumnFilterList; +/* + * for client side struct, we only need the column id, type, bytes are not necessary + * But for data in vnode side, we need all the following information. + */ +typedef struct SColumnInfo { + int16_t colId; + int16_t type; + int16_t bytes; + SColumnFilterList flist; +} SColumnInfo; + +typedef struct STableIdInfo { + uint64_t uid; + int32_t tid; + TSKEY key; // last accessed ts, for subscription +} STableIdInfo; + +typedef struct STimeWindow { + TSKEY skey; + TSKEY ekey; +} STimeWindow; + +typedef struct { + int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed + int32_t tsLen; // total length of ts comp block + int32_t tsNumOfBlocks; // ts comp block numbers + int32_t tsOrder; // ts comp block order +} STsBufInfo; + +typedef struct SInterval { + int32_t tz; // query client timezone + char intervalUnit; + char slidingUnit; + char offsetUnit; + int64_t interval; + int64_t sliding; + int64_t offset; +} SInterval; + +typedef struct SSessionWindow { + int64_t gap; // gap between two session window(in microseconds) + int32_t primaryColId; // primary timestamp column +} SSessionWindow; + +typedef struct { + SMsgHead head; + char version[TSDB_VERSION_LEN]; + + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool interpQuery; // interp query or not + bool groupbyColumn; // denote if this is a groupby normal column query + bool hasTagResults; // if there are tag values in final result or not + bool timeWindowInterpo;// if the time window start/end required interpolation + bool queryBlockDist; // if query data block distribution + bool stabledev; // super table stddev query + bool tsCompQuery; // is tscomp query + bool simpleAgg; + bool pointInterpQuery; // point interpolation query + bool needReverseScan; // need reverse scan + bool stateWindow; // state window flag + + STimeWindow window; + int32_t numOfTables; + int16_t order; + int16_t orderColId; + int16_t numOfCols; // the number of columns will be load from vnode + SInterval interval; + SSessionWindow sw; // session window + uint16_t tagCondLen; // tag length in current query + uint16_t colCondLen; // column length in current query + int16_t numOfGroupCols; // num of group by columns + int16_t orderByIdx; + int16_t orderType; // used in group by xx order by xxx + int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. + int16_t prjOrder; // global order in super table projection query. + int64_t limit; + int64_t offset; + uint32_t queryType; // denote another query process + int16_t numOfOutput; // final output columns numbers + int16_t fillType; // interpolate type + uint64_t fillVal; // default value array list + int32_t secondStageOutput; + STsBufInfo tsBuf; // tsBuf info + int32_t numOfTags; // number of tags columns involved + int32_t sqlstrLen; // sql query string + int32_t prevResultLen; // previous result length + int32_t numOfOperator; + int32_t tableScanOperator;// table scan operator. -1 means no scan operator + int32_t udfNum; // number of udf function + int32_t udfContentOffset; + int32_t udfContentLen; + SColumnInfo tableCols[]; +} SQueryTableMsg; + +typedef struct { + int32_t code; + union{uint64_t qhandle; uint64_t qId;}; // query handle +} SQueryTableRsp; + +// todo: the show handle should be replaced with id +typedef struct { + SMsgHead header; + union{uint64_t qhandle; uint64_t qId;}; // query handle + uint16_t free; +} SRetrieveTableMsg; + +typedef struct SRetrieveTableRsp { + int32_t numOfRows; + int8_t completed; // all results are returned to client + int16_t precision; + int64_t offset; // updated offset value for multi-vnode projection query + int64_t useconds; + int8_t compressed; + int32_t compLen; + char data[]; +} SRetrieveTableRsp; + +typedef struct { + int32_t vgId; + int32_t dbCfgVersion; + int64_t totalStorage; + int64_t compStorage; + int64_t pointsWritten; + uint64_t vnodeVersion; + int32_t vgCfgVersion; + uint8_t status; + uint8_t role; + uint8_t replica; + uint8_t compact; +} SVnodeLoad; + +typedef struct { + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int32_t cacheBlockSize; //MB + int32_t totalBlocks; + int32_t maxTables; + int32_t daysPerFile; + int32_t daysToKeep0; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t minRowsPerFileBlock; + int32_t maxRowsPerFileBlock; + int32_t commitTime; + int32_t fsyncPeriod; + uint8_t precision; // time resolution + int8_t compression; + int8_t walLevel; + int8_t replications; + int8_t quorum; + int8_t ignoreExist; + int8_t update; + int8_t cacheLastRow; + int8_t dbType; + int16_t partitions; + int8_t reserve[5]; +} SCreateDbMsg, SAlterDbMsg; + +typedef struct { + char name[TSDB_FUNC_NAME_LEN]; + char path[PATH_MAX]; + int32_t funcType; + uint8_t outputType; + int16_t outputLen; + int32_t bufSize; + int32_t codeLen; + char code[]; +} SCreateFuncMsg; + +typedef struct { + int32_t num; + char name[]; +} SRetrieveFuncMsg; + +typedef struct { + char name[TSDB_FUNC_NAME_LEN]; + int32_t funcType; + int8_t resType; + int16_t resBytes; + int32_t bufSize; + int32_t len; + char content[]; +} SFunctionInfoMsg; + +typedef struct { + int32_t num; + char content[]; +} SUdfFuncMsg; + +typedef struct { + char name[TSDB_FUNC_NAME_LEN]; +} SDropFuncMsg; + +typedef struct { + char db[TSDB_TABLE_FNAME_LEN]; + uint8_t ignoreNotExists; +} SDropDbMsg, SUseDbMsg, SSyncDbMsg; + +// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed +// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE +typedef struct { + int64_t pointsWritten; // In unit of points + int64_t totalStorage; // In unit of bytes + int64_t compStorage; // In unit of bytes + int64_t queryTime; // In unit of second ?? + char reserved[64]; +} SVnodeStatisticInfo; + +typedef struct SVgroupAccess { + int32_t vgId; + int8_t accessState; +} SVgroupAccess; + +typedef struct { + int32_t dnodeId; + uint32_t moduleStatus; + uint32_t numOfVnodes; + char clusterId[TSDB_CLUSTER_ID_LEN]; + char reserved[16]; +} SDnodeCfg; + +typedef struct { + int32_t dnodeId; + uint16_t dnodePort; + char dnodeFqdn[TSDB_FQDN_LEN]; +} SDnodeEp; + +typedef struct { + int32_t dnodeNum; + SDnodeEp dnodeEps[]; +} SDnodeEps; + +typedef struct { + int32_t mnodeId; + char mnodeEp[TSDB_EP_LEN]; +} SMInfo; + +typedef struct SMInfos { + int8_t inUse; + int8_t mnodeNum; + SMInfo mnodeInfos[TSDB_MAX_REPLICA]; +} SMInfos; + +typedef struct { + int32_t numOfMnodes; // tsNumOfMnodes + int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum + int32_t offlineThreshold; // tsOfflineThreshold + int32_t statusInterval; // tsStatusInterval + int32_t maxtablesPerVnode; + int32_t maxVgroupsPerDb; + char arbitrator[TSDB_EP_LEN]; // tsArbitrator + char reserve[2]; // to solve arm32 bus error + char timezone[64]; // tsTimezone + int64_t checkTime; // 1970-01-01 00:00:00.000 + char locale[TSDB_LOCALE_LEN]; // tsLocale + char charset[TSDB_LOCALE_LEN]; // tsCharset + int8_t enableBalance; // tsEnableBalance + int8_t flowCtrl; + int8_t slaveQuery; + int8_t adjustMaster; + int8_t reserved[4]; +} SClusterCfg; + +typedef struct SStatusMsg { + uint32_t version; + int32_t dnodeId; + char dnodeEp[TSDB_EP_LEN]; + uint32_t moduleStatus; + uint32_t lastReboot; // time stamp for last reboot + uint16_t reserve1; // from config file + uint16_t openVnodes; + uint16_t numOfCores; + float diskAvailable; // GB + char clusterId[TSDB_CLUSTER_ID_LEN]; + uint8_t alternativeRole; + uint8_t reserve2[15]; + SClusterCfg clusterCfg; + SVnodeLoad load[]; +} SStatusMsg; + +typedef struct { + SMInfos mnodes; + SDnodeCfg dnodeCfg; + SVgroupAccess vgAccess[]; +} SStatusRsp; + +typedef struct { + uint32_t vgId; + int32_t dbCfgVersion; + int32_t maxTables; + int32_t cacheBlockSize; + int32_t totalBlocks; + int32_t daysPerFile; + int32_t daysToKeep; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t minRowsPerFileBlock; + int32_t maxRowsPerFileBlock; + int32_t commitTime; + int32_t fsyncPeriod; + int8_t precision; + int8_t compression; + int8_t walLevel; + int8_t vgReplica; + int8_t wals; + int8_t quorum; + int8_t update; + int8_t cacheLastRow; + int32_t vgCfgVersion; + int8_t dbReplica; + int8_t dbType; + int8_t reserved[8]; +} SVnodeCfg; + +typedef struct { + int32_t nodeId; + char nodeEp[TSDB_EP_LEN]; +} SVnodeDesc; + +typedef struct { + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + SVnodeCfg cfg; + SVnodeDesc nodes[TSDB_MAX_REPLICA]; +} SCreateVnodeMsg, SAlterVnodeMsg; + +typedef struct { + char tableFname[TSDB_TABLE_FNAME_LEN]; + int16_t createFlag; + char tags[]; +} STableInfoMsg; + +typedef struct { + uint8_t metaClone; // create local clone of the cached table meta + int32_t numOfVgroups; + int32_t numOfTables; + int32_t numOfUdfs; + char tableNames[]; +} SMultiTableInfoMsg; + +typedef struct SSTableVgroupMsg { + int32_t numOfTables; +} SSTableVgroupMsg, SSTableVgroupRspMsg; + +typedef struct { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SVgroupMsg; + +typedef struct { + int32_t numOfVgroups; + SVgroupMsg vgroups[]; +} SVgroupsMsg, SVgroupsInfo; + +typedef struct STableMetaMsg { + int32_t contLen; + char tableFname[TSDB_TABLE_FNAME_LEN]; // table id + uint8_t numOfTags; + uint8_t precision; + uint8_t tableType; + int16_t numOfColumns; + int16_t sversion; + int16_t tversion; + int32_t tid; + uint64_t uid; + SVgroupMsg vgroup; + + char sTableName[TSDB_TABLE_FNAME_LEN]; + uint64_t suid; + SSchema schema[]; +} STableMetaMsg; + +typedef struct SMultiTableMeta { + int32_t numOfTables; + int32_t numOfVgroup; + int32_t numOfUdf; + int32_t contLen; + uint8_t compressed; // denote if compressed or not + uint32_t rawLen; // size before compress + uint8_t metaClone; // make meta clone after retrieve meta from mnode + char meta[]; +} SMultiTableMeta; + +typedef struct { + int32_t dataLen; + char name[TSDB_TABLE_FNAME_LEN]; + char *data; +} STagData; + +/* + * sql: show tables like '%a_%' + * payload is the query condition, e.g., '%a_%' + * payloadLen is the length of payload + */ +typedef struct { + int8_t type; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + uint16_t payloadLen; + char payload[]; +} SShowMsg; + +typedef struct { + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int32_t numOfVgroup; + int32_t vgid[]; +} SCompactMsg; + +typedef struct SShowRsp { + uint64_t qhandle; + STableMetaMsg tableMeta; +} SShowRsp; + +typedef struct { + char ep[TSDB_EP_LEN]; // end point, hostname:port +} SCreateDnodeMsg, SDropDnodeMsg; + +typedef struct { + int32_t dnodeId; + char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port + SMInfos mnodes; +} SCreateMnodeMsg; + +typedef struct { + int32_t dnodeId; + int32_t vgId; + int32_t tid; +} SConfigTableMsg; + +typedef struct { + uint32_t dnodeId; + int32_t vgId; +} SConfigVnodeMsg; + +typedef struct { + char ep[TSDB_EP_LEN]; // end point, hostname:port + char config[64]; +} SCfgDnodeMsg; + +typedef struct { + char sql[TSDB_SHOW_SQL_LEN]; + uint32_t queryId; + int64_t useconds; + int64_t stime; + uint64_t qId; + uint64_t sqlObjId; + int32_t pid; + char fqdn[TSDB_FQDN_LEN]; + uint8_t stableQuery; + int32_t numOfSub; + char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) +} SQueryDesc; + +typedef struct { + char sql[TSDB_SHOW_SQL_LEN]; + char dstTable[TSDB_TABLE_NAME_LEN]; + uint32_t streamId; + int64_t num; // number of computing/cycles + int64_t useconds; + int64_t ctime; + int64_t stime; + int64_t slidingTime; + int64_t interval; +} SStreamDesc; + +typedef struct { + char clientVer[TSDB_VERSION_LEN]; + uint32_t connId; + int32_t pid; + int32_t numOfQueries; + int32_t numOfStreams; + char appName[TSDB_APPNAME_LEN]; + char pData[]; +} SHeartBeatMsg; + +typedef struct { + uint32_t queryId; + uint32_t streamId; + uint32_t totalDnodes; + uint32_t onlineDnodes; + uint32_t connId; + int8_t killConnection; + SEpSet epSet; +} SHeartBeatRsp; + +typedef struct { + char queryId[TSDB_KILL_MSG_LEN + 1]; +} SKillQueryMsg, SKillStreamMsg, SKillConnMsg; + +typedef struct { + int32_t vnode; + int32_t sid; + uint64_t uid; + uint64_t stime; // stream starting time + int32_t status; + char tableFname[TSDB_TABLE_FNAME_LEN]; +} SAlterStreamMsg; + +typedef struct { + char user[TSDB_USER_LEN]; + char spi; + char encrypt; + char secret[TSDB_KEY_LEN]; + char ckey[TSDB_KEY_LEN]; +} SAuthMsg, SAuthRsp; + +typedef struct { + int8_t finished; + int8_t reserved1[7]; + char name[TSDB_STEP_NAME_LEN]; + char desc[TSDB_STEP_DESC_LEN]; + char reserved2[64]; +} SStartupStep; + +typedef struct { + /* data */ +} SSubmitReq; + +typedef struct { + /* data */ +} SSubmitRsp; + +typedef struct { + /* data */ +} SSubmitReqReader; + +typedef struct { + /* data */ +} SCreateTableReq; + +typedef struct { + /* data */ +} SCreateTableRsp; + +typedef struct { + /* data */ +} SDropTableReq; + +typedef struct { + /* data */ +} SDropTableRsp; + +typedef struct { + /* data */ +} SAlterTableReq; + +typedef struct { + /* data */ +} SAlterTableRsp; + +#pragma pack(pop) + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_COMMON_TAOS_MSG_H_*/