/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
||
|
||
#ifndef _TD_COMMON_TAOS_MSG_H_
|
||
#define _TD_COMMON_TAOS_MSG_H_
|
||
|
||
#include "taosdef.h"
|
||
#include "taoserror.h"
|
||
#include "tarray.h"
|
||
#include "tcoding.h"
|
||
#include "tencode.h"
|
||
#include "thash.h"
|
||
#include "tlist.h"
|
||
#include "trow.h"
|
||
#include "tname.h"
|
||
#include "tuuid.h"
|
||
|
||
#ifdef __cplusplus
|
||
extern "C" {
|
||
#endif
|
||
|
||
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
/*
 * tmsgdef.h is included three times, each time with a different combination
 * of the four selector macros. What each pass expands to is decided inside
 * tmsgdef.h itself.
 */

/* Pass 1: only TD_MSG_NUMBER_ is defined. */
#define TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"

/* Pass 2: only TD_MSG_SEG_CODE_ is defined. */
#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#define TD_MSG_SEG_CODE_
#include "tmsgdef.h"

/* Pass 3: none of the four selector macros is defined. */
#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
|
||
|
||
/* Message lookup tables; only declared here, the definitions live elsewhere. */
extern char*   tMsgInfo[];
extern int32_t tMsgDict[];

/*
 * A message type carries a segment code in bits 8..15 and a sequence number in
 * bits 0..7. tMsgDict[] is indexed by segment code; adding the sequence number
 * to that entry yields the index into tMsgInfo[].
 */
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE)  ((TYPE)&0xff)
#define TMSG_INFO(TYPE)     tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)]
#define TMSG_INDEX(TYPE)    (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))

/* Integer type used to hold a message type. */
typedef uint16_t tmsg_t;
|
||
|
||
/* ------------------------ OTHER DEFINITIONS ------------------------ */
|
||
// IE type
#define TSDB_IE_TYPE_SEC         1
#define TSDB_IE_TYPE_META        2
#define TSDB_IE_TYPE_MGMT_IP     3
#define TSDB_IE_TYPE_DNODE_CFG   4
#define TSDB_IE_TYPE_NEW_VERSION 5
#define TSDB_IE_TYPE_DNODE_EXT   6
#define TSDB_IE_TYPE_DNODE_STATE 7
|
||
|
||
/* Heartbeat kinds. HEARTBEAT_TYPE_MAX is the trailing sentinel, so it equals the number of kinds. */
typedef enum {
  HEARTBEAT_TYPE_MQ = 0,
  HEARTBEAT_TYPE_QUERY,
  // types can be added here
  //
  HEARTBEAT_TYPE_MAX
} EHbType;
|
||
|
||
/* Heartbeat key identifiers; numbering starts at 1. */
enum {
  HEARTBEAT_KEY_DBINFO = 1,
  HEARTBEAT_KEY_STBINFO,
  HEARTBEAT_KEY_MQ_TMP,
};
|
||
|
||
/*
 * Management table identifiers. Values are implicit and sequential from
 * TSDB_MGMT_TABLE_START (0) to the TSDB_MGMT_TABLE_MAX sentinel, so the order
 * of the enumerators must not change.
 */
typedef enum _mgmt_table {
  TSDB_MGMT_TABLE_START,
  TSDB_MGMT_TABLE_ACCT,
  TSDB_MGMT_TABLE_USER,
  TSDB_MGMT_TABLE_DB,
  TSDB_MGMT_TABLE_TABLE,
  TSDB_MGMT_TABLE_DNODE,
  TSDB_MGMT_TABLE_MNODE,
  TSDB_MGMT_TABLE_QNODE,
  TSDB_MGMT_TABLE_SNODE,
  TSDB_MGMT_TABLE_BNODE,
  TSDB_MGMT_TABLE_VGROUP,
  TSDB_MGMT_TABLE_STB,
  TSDB_MGMT_TABLE_MODULE,
  TSDB_MGMT_TABLE_QUERIES,
  TSDB_MGMT_TABLE_STREAMS,
  TSDB_MGMT_TABLE_VARIABLES,
  TSDB_MGMT_TABLE_CONNS,
  TSDB_MGMT_TABLE_TRANS,
  TSDB_MGMT_TABLE_GRANTS,
  TSDB_MGMT_TABLE_VNODES,
  TSDB_MGMT_TABLE_CLUSTER,
  TSDB_MGMT_TABLE_STREAMTABLES,
  TSDB_MGMT_TABLE_TP,
  TSDB_MGMT_TABLE_FUNC,
  TSDB_MGMT_TABLE_MAX,
} EShowType;
|
||
|
||
/* Alter-table operation codes. */
#define TSDB_ALTER_TABLE_ADD_TAG             1
#define TSDB_ALTER_TABLE_DROP_TAG            2
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME     3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL      4
#define TSDB_ALTER_TABLE_ADD_COLUMN          5
#define TSDB_ALTER_TABLE_DROP_COLUMN         6
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES    8
#define TSDB_ALTER_TABLE_UPDATE_OPTIONS      9
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME  10
|
||
|
||
/* Fill mode codes. */
#define TSDB_FILL_NONE      0
#define TSDB_FILL_NULL      1
#define TSDB_FILL_SET_VALUE 2
#define TSDB_FILL_LINEAR    3
#define TSDB_FILL_PREV      4
#define TSDB_FILL_NEXT      5
|
||
|
||
/* Alter-user operation codes (sequential values, not bit flags). */
#define TSDB_ALTER_USER_PASSWD          0x1
#define TSDB_ALTER_USER_SUPERUSER       0x2
#define TSDB_ALTER_USER_ADD_READ_DB     0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB  0x4
#define TSDB_ALTER_USER_CLEAR_READ_DB   0x5
#define TSDB_ALTER_USER_ADD_WRITE_DB    0x6
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
#define TSDB_ALTER_USER_CLEAR_WRITE_DB  0x8

// NOTE(review): same value (0x2) as TSDB_ALTER_USER_SUPERUSER — confirm this overlap is intended.
#define TSDB_ALTER_USER_PRIVILEGES 0x2
|
||
|
||
#define TSDB_KILL_MSG_LEN 30

/* Unit used by the numOfTable fields that are commented "unit is TSDB_TABLE_NUM_UNIT". */
#define TSDB_TABLE_NUM_UNIT 100000

/* Vnode access bits. The "ACCCESS" spelling is part of the existing macro names and is kept as is. */
#define TSDB_VN_READ_ACCCESS  ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
#define TSDB_VN_ALL_ACCCESS   (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
|
||
|
||
/* Column flag bits. */
#define TSDB_COL_NORMAL 0x0u  // the normal column of the table
#define TSDB_COL_TAG    0x1u  // the tag column type
#define TSDB_COL_UDC    0x2u  // the user specified normal string column, it is a dummy column
#define TSDB_COL_TMP    0x4u  // internal column generated by the previous operators
#define TSDB_COL_NULL   0x8u  // the column filter NULL or not

/*
 * Flag predicates. The first three mask out TSDB_COL_NULL before testing.
 * The argument is parenthesized in every expansion so that an expression such
 * as (a | b) is masked as a whole instead of binding `&` to its last operand.
 */
#define TSDB_COL_IS_TAG(f)        ((((f) & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0)
#define TSDB_COL_IS_NORMAL_COL(f) (((f) & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f)     (((f) & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f)      (((f)&TSDB_COL_NULL) != 0)
|
||
|
||
/* TD_* spellings of the TSDB_* table-type constants (the TSDB_* names are not defined in this header). */
#define TD_SUPER_TABLE  TSDB_SUPER_TABLE
#define TD_CHILD_TABLE  TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
||
|
||
/* Vgroup id plus database and table name pointers; the struct stores the pointers only. */
typedef struct {
  int32_t vgId;
  char*   dbFName;
  char*   tbName;
} SBuildTableMetaInput;
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int64_t dbId;
|
||
int32_t vgVersion;
|
||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||
} SBuildUseDBInput;
|
||
|
||
typedef struct SField {
|
||
char name[TSDB_COL_NAME_LEN];
|
||
uint8_t type;
|
||
int32_t bytes;
|
||
} SField;
|
||
|
||
#pragma pack(push, 1)
|
||
|
||
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
||
typedef struct SEp {
|
||
char fqdn[TSDB_FQDN_LEN];
|
||
uint16_t port;
|
||
} SEp;
|
||
|
||
/* Common message head: content length plus one id that is read as either vgId or streamTaskId. */
typedef struct {
  int32_t contLen;
  union {
    int32_t vgId;
    int32_t streamTaskId;  // shares storage with vgId
  };
} SMsgHead;
|
||
|
||
/* Head of a stream-exec message: worker type and stream task id. */
typedef struct {
  int32_t workerType;
  int32_t streamTaskId;
} SStreamExecMsgHead;
|
||
|
||
// Submit message for one table
typedef struct SSubmitBlk {
  int64_t uid;        // table unique id
  int32_t tid;        // table id
  int32_t padding;    // TODO just for padding here
  int32_t sversion;   // data schema version
  int32_t dataLen;    // data part length, not including the SSubmitBlk head
  int32_t schemaLen;  // schema length, if length is 0, no schema exists
  int16_t numOfRows;  // total number of rows in current submit block
  char    data[];     // flexible array member: variable-length bytes after the fixed head
} SSubmitBlk;
|
||
|
||
// Submit message for this TSDB
|
||
typedef struct {
|
||
SMsgHead header;
|
||
int64_t version;
|
||
int32_t length;
|
||
int32_t numOfBlocks;
|
||
char blocks[];
|
||
} SSubmitReq;
|
||
|
||
typedef struct {
|
||
int32_t totalLen;
|
||
int32_t len;
|
||
STSRow* row;
|
||
} SSubmitBlkIter;
|
||
|
||
/* Iterator state over the blocks of a submit message; used with tInitSubmitMsgIter()/tGetSubmitMsgNext(). */
typedef struct {
  int32_t totalLen;
  int32_t len;
  void*   pMsg;
} SSubmitMsgIter;
|
||
|
||
int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
||
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
||
|
||
/* Per-block failure record carried in SSubmitRsp. */
typedef struct {
  int32_t index;  // index of failed block in submit blocks
  int32_t vnode;  // vnode index of failed block
  int32_t sid;    // table index of failed block
  int32_t code;   // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SSubmitRspBlock;
|
||
|
||
typedef struct {
|
||
int32_t code; // 0-success, > 0 error code
|
||
int32_t numOfRows; // number of records the client is trying to write
|
||
int32_t affectedRows; // number of records actually written
|
||
int32_t failedRows; // number of failed records (exclude duplicate records)
|
||
int32_t numOfFailedBlocks;
|
||
SSubmitRspBlock failedBlocks[];
|
||
} SSubmitRsp;
|
||
|
||
typedef struct SSchema {
|
||
int8_t type;
|
||
int32_t colId;
|
||
int32_t bytes;
|
||
char name[TSDB_COL_NAME_LEN];
|
||
} SSchema;
|
||
|
||
typedef struct {
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int8_t igExists;
|
||
int32_t numOfColumns;
|
||
int32_t numOfTags;
|
||
SArray* pColumns;
|
||
SArray* pTags;
|
||
char comment[TSDB_STB_COMMENT_LEN];
|
||
} SMCreateStbReq;
|
||
|
||
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
||
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
||
void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int8_t igNotExists;
|
||
} SMDropStbReq;
|
||
|
||
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
||
int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int8_t alterType;
|
||
int32_t numOfFields;
|
||
SArray* pFields;
|
||
} SMAltertbReq;
|
||
|
||
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
|
||
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
|
||
void tFreeSMAltertbReq(SMAltertbReq* pReq);
|
||
|
||
typedef struct SEpSet {
|
||
int8_t inUse;
|
||
int8_t numOfEps;
|
||
SEp eps[TSDB_MAX_REPLICA];
|
||
} SEpSet;
|
||
|
||
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
|
||
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
|
||
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
|
||
void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
|
||
|
||
typedef struct {
|
||
int32_t pid;
|
||
char app[TSDB_APP_NAME_LEN];
|
||
char db[TSDB_DB_NAME_LEN];
|
||
int64_t startTime;
|
||
} SConnectReq;
|
||
|
||
int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
|
||
int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t acctId;
|
||
int64_t clusterId;
|
||
int32_t connId;
|
||
int8_t superUser;
|
||
SEpSet epSet;
|
||
char sVersion[128];
|
||
} SConnectRsp;
|
||
|
||
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
|
||
int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
|
||
|
||
typedef struct {
|
||
char user[TSDB_USER_LEN];
|
||
char pass[TSDB_PASSWORD_LEN];
|
||
int32_t maxUsers;
|
||
int32_t maxDbs;
|
||
int32_t maxTimeSeries;
|
||
int32_t maxStreams;
|
||
int32_t accessState; // Configured only by command
|
||
int64_t maxStorage; // In unit of GB
|
||
} SCreateAcctReq, SAlterAcctReq;
|
||
|
||
int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
|
||
int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
|
||
|
||
typedef struct {
|
||
char user[TSDB_USER_LEN];
|
||
} SDropUserReq, SDropAcctReq;
|
||
|
||
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
|
||
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
|
||
|
||
typedef struct {
|
||
int8_t createType;
|
||
int8_t superUser; // denote if it is a super user or not
|
||
char user[TSDB_USER_LEN];
|
||
char pass[TSDB_PASSWORD_LEN];
|
||
} SCreateUserReq;
|
||
|
||
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
|
||
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
|
||
|
||
typedef struct {
|
||
int8_t alterType;
|
||
int8_t superUser;
|
||
char user[TSDB_USER_LEN];
|
||
char pass[TSDB_PASSWORD_LEN];
|
||
char dbname[TSDB_DB_FNAME_LEN];
|
||
} SAlterUserReq;
|
||
|
||
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
||
int32_t tDeserializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
||
|
||
typedef struct {
|
||
char user[TSDB_USER_LEN];
|
||
} SGetUserAuthReq;
|
||
|
||
int32_t tSerializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq);
|
||
int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq);
|
||
|
||
typedef struct {
|
||
char user[TSDB_USER_LEN];
|
||
int8_t superAuth;
|
||
SHashObj* readDbs;
|
||
SHashObj* writeDbs;
|
||
} SGetUserAuthRsp;
|
||
|
||
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
|
||
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
|
||
|
||
typedef struct {
|
||
int16_t colId; // column id
|
||
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
|
||
int16_t flag; // denote if it is a tag or a normal column
|
||
char name[TSDB_DB_FNAME_LEN];
|
||
} SColIndex;
|
||
|
||
/*
 * One column filter. The anonymous union overlays three two-field views of the
 * same 16 bytes: integer bounds, double bounds, or a (pz, len) pair.
 */
typedef struct {
  int16_t lowerRelOptr;
  int16_t upperRelOptr;
  int16_t filterstr;  // denote if current column is char(binary/nchar)

  union {
    struct {
      int64_t lowerBndi;
      int64_t upperBndi;
    };
    struct {
      double lowerBndd;
      double upperBndd;
    };
    struct {
      int64_t pz;
      int64_t len;
    };
  };
} SColumnFilterInfo;
|
||
|
||
typedef struct {
|
||
int16_t numOfFilters;
|
||
union {
|
||
int64_t placeholder;
|
||
SColumnFilterInfo* filterInfo;
|
||
};
|
||
} SColumnFilterList;
|
||
/*
 * for client side struct, only column id, type, bytes are necessary
 * But for data in vnode side, we need all the following information.
 */
typedef struct {
  union {
    int16_t colId;
    int16_t slotId;  // shares storage with colId
  };

  int16_t type;
  int32_t bytes;
  uint8_t precision;
  uint8_t scale;
} SColumnInfo;
|
||
|
||
typedef struct {
|
||
int64_t uid;
|
||
TSKEY key; // last accessed ts, for subscription
|
||
} STableIdInfo;
|
||
|
||
typedef struct STimeWindow {
|
||
TSKEY skey;
|
||
TSKEY ekey;
|
||
} STimeWindow;
|
||
|
||
/* Describes the ts comp block section of a message. */
typedef struct {
  int32_t tsOffset;       // offset value in current msg body, NOTE: ts list is compressed
  int32_t tsLen;          // total length of ts comp block
  int32_t tsNumOfBlocks;  // ts comp block numbers
  int32_t tsOrder;        // ts comp block order
} STsBufInfo;
|
||
|
||
/* Interval, sliding and offset values, each with a one-character unit, plus the client timezone. */
typedef struct {
  int32_t tz;  // query client timezone
  char    intervalUnit;
  char    slidingUnit;
  char    offsetUnit;
  int64_t interval;
  int64_t sliding;
  int64_t offset;
} SInterval;
|
||
|
||
/* Query-table response: a single result code. */
typedef struct {
  int32_t code;
} SQueryTableRsp;

/* Serialization helpers for SQueryTableRsp; implementations are not in this header. */
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
|
||
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int32_t numOfVgroups;
|
||
int32_t cacheBlockSize; // MB
|
||
int32_t totalBlocks;
|
||
int32_t daysPerFile;
|
||
int32_t daysToKeep0;
|
||
int32_t daysToKeep1;
|
||
int32_t daysToKeep2;
|
||
int32_t minRows;
|
||
int32_t maxRows;
|
||
int32_t commitTime;
|
||
int32_t fsyncPeriod;
|
||
int8_t walLevel;
|
||
int8_t precision; // time resolution
|
||
int8_t compression;
|
||
int8_t replications;
|
||
int8_t quorum;
|
||
int8_t update;
|
||
int8_t cacheLastRow;
|
||
int8_t ignoreExist;
|
||
int8_t streamMode;
|
||
} SCreateDbReq;
|
||
|
||
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
|
||
int32_t tDeserializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int32_t totalBlocks;
|
||
int32_t daysToKeep0;
|
||
int32_t daysToKeep1;
|
||
int32_t daysToKeep2;
|
||
int32_t fsyncPeriod;
|
||
int8_t walLevel;
|
||
int8_t quorum;
|
||
int8_t cacheLastRow;
|
||
} SAlterDbReq;
|
||
|
||
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||
int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int8_t ignoreNotExists;
|
||
} SDropDbReq;
|
||
|
||
int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
|
||
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int64_t uid;
|
||
} SDropDbRsp;
|
||
|
||
int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
|
||
int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int64_t dbId;
|
||
int32_t vgVersion;
|
||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||
} SUseDbReq;
|
||
|
||
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
||
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int64_t uid;
|
||
int32_t vgVersion;
|
||
int32_t vgNum;
|
||
int8_t hashMethod;
|
||
SArray* pVgroupInfos; // Array of SVgroupInfo
|
||
} SUseDbRsp;
|
||
|
||
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
||
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
||
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
||
|
||
/* Qnode-list request: a single rowNum value. */
typedef struct {
  int32_t rowNum;
} SQnodeListReq;

/* Serialization helpers for SQnodeListReq; implementations are not in this header. */
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
|
||
|
||
typedef struct {
|
||
SArray* epSetList; // SArray<SEpSet>
|
||
} SQnodeListRsp;
|
||
|
||
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
|
||
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
|
||
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
|
||
|
||
typedef struct {
|
||
SArray* pArray; // Array of SUseDbRsp
|
||
} SUseDbBatchRsp;
|
||
|
||
int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
|
||
int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
|
||
void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp);
|
||
|
||
typedef struct {
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
} SSyncDbReq, SCompactDbReq;
|
||
|
||
int32_t tSerializeSSyncDbReq(void* buf, int32_t bufLen, SSyncDbReq* pReq);
|
||
int32_t tDeserializeSSyncDbReq(void* buf, int32_t bufLen, SSyncDbReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_FUNC_NAME_LEN];
|
||
int8_t igExists;
|
||
int8_t funcType;
|
||
int8_t scriptType;
|
||
int8_t outputType;
|
||
int32_t outputLen;
|
||
int32_t bufSize;
|
||
int64_t signature;
|
||
int32_t commentSize;
|
||
int32_t codeSize;
|
||
char pComment[TSDB_FUNC_COMMENT_LEN];
|
||
char pCode[TSDB_FUNC_CODE_LEN];
|
||
} SCreateFuncReq;
|
||
|
||
int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
|
||
int32_t tDeserializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_FUNC_NAME_LEN];
|
||
int8_t igNotExists;
|
||
} SDropFuncReq;
|
||
|
||
int32_t tSerializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
|
||
int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t numOfFuncs;
|
||
SArray* pFuncNames;
|
||
} SRetrieveFuncReq;
|
||
|
||
int32_t tSerializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);
|
||
int32_t tDeserializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_FUNC_NAME_LEN];
|
||
int8_t funcType;
|
||
int8_t scriptType;
|
||
int8_t outputType;
|
||
int32_t outputLen;
|
||
int32_t bufSize;
|
||
int64_t signature;
|
||
int32_t commentSize;
|
||
int32_t codeSize;
|
||
char pComment[TSDB_FUNC_COMMENT_LEN];
|
||
char pCode[TSDB_FUNC_CODE_LEN];
|
||
} SFuncInfo;
|
||
|
||
typedef struct {
|
||
int32_t numOfFuncs;
|
||
SArray* pFuncInfos;
|
||
} SRetrieveFuncRsp;
|
||
|
||
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||
|
||
typedef struct {
|
||
int32_t statusInterval;
|
||
int64_t checkTime; // 1970-01-01 00:00:00.000
|
||
char timezone[TD_TIMEZONE_LEN]; // tsTimezone
|
||
char locale[TD_LOCALE_LEN]; // tsLocale
|
||
char charset[TD_LOCALE_LEN]; // tsCharset
|
||
} SClusterCfg;
|
||
|
||
/* Per-vnode load counters; SStatusReq carries an array of these in pVloads. */
typedef struct {
  int32_t vgId;
  int8_t  role;
  int64_t numOfTables;
  int64_t numOfTimeSeries;
  int64_t totalStorage;
  int64_t compStorage;
  int64_t pointsWritten;
  int64_t numOfSelectReqs;
  int64_t numOfInsertReqs;
  int64_t numOfInsertSuccessReqs;
  int64_t numOfBatchInsertReqs;
  int64_t numOfBatchInsertSuccessReqs;
} SVnodeLoad;
|
||
|
||
typedef struct {
|
||
int32_t sver; // software version
|
||
int64_t dver; // dnode table version in sdb
|
||
int32_t dnodeId;
|
||
int64_t clusterId;
|
||
int64_t rebootTime;
|
||
int64_t updateTime;
|
||
int32_t numOfCores;
|
||
int32_t numOfSupportVnodes;
|
||
char dnodeEp[TSDB_EP_LEN];
|
||
SClusterCfg clusterCfg;
|
||
SArray* pVloads; // array of SVnodeLoad
|
||
} SStatusReq;
|
||
|
||
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||
|
||
/* Dnode id and cluster id pair embedded in SStatusRsp. */
typedef struct {
  int32_t dnodeId;
  int64_t clusterId;
} SDnodeCfg;
|
||
|
||
typedef struct {
|
||
int32_t id;
|
||
int8_t isMnode;
|
||
SEp ep;
|
||
} SDnodeEp;
|
||
|
||
typedef struct {
|
||
int64_t dver;
|
||
SDnodeCfg dnodeCfg;
|
||
SArray* pDnodeEps; // Array of SDnodeEp
|
||
} SStatusRsp;
|
||
|
||
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
|
||
int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
|
||
void tFreeSStatusRsp(SStatusRsp* pRsp);
|
||
|
||
/* Timer request; its only field is named "reserved". */
typedef struct {
  int32_t reserved;
} SMTimerReq;

/* Serialization helpers for SMTimerReq; implementations are not in this header. */
int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t id;
|
||
uint16_t port; // node sync Port
|
||
char fqdn[TSDB_FQDN_LEN]; // node FQDN
|
||
} SReplica;
|
||
|
||
typedef struct {
|
||
int32_t vgId;
|
||
int32_t dnodeId;
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int64_t dbUid;
|
||
int32_t vgVersion;
|
||
int32_t cacheBlockSize;
|
||
int32_t totalBlocks;
|
||
int32_t daysPerFile;
|
||
int32_t daysToKeep0;
|
||
int32_t daysToKeep1;
|
||
int32_t daysToKeep2;
|
||
int32_t minRows;
|
||
int32_t maxRows;
|
||
int32_t commitTime;
|
||
int32_t fsyncPeriod;
|
||
uint32_t hashBegin;
|
||
uint32_t hashEnd;
|
||
int8_t hashMethod;
|
||
int8_t walLevel;
|
||
int8_t precision;
|
||
int8_t compression;
|
||
int8_t quorum;
|
||
int8_t update;
|
||
int8_t cacheLastRow;
|
||
int8_t replica;
|
||
int8_t selfIndex;
|
||
int8_t streamMode;
|
||
SReplica replicas[TSDB_MAX_REPLICA];
|
||
|
||
} SCreateVnodeReq, SAlterVnodeReq;
|
||
|
||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t vgId;
|
||
int32_t dnodeId;
|
||
int64_t dbUid;
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
|
||
|
||
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
||
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
char dbFName[TSDB_DB_FNAME_LEN];
|
||
char tbName[TSDB_TABLE_NAME_LEN];
|
||
} STableInfoReq;
|
||
|
||
int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
|
||
int32_t tDeserializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
|
||
|
||
/* Multi-table info request: counters followed by a variable-length table-name buffer. */
typedef struct {
  int8_t  metaClone;  // create local clone of the cached table meta
  int32_t numOfVgroups;
  int32_t numOfTables;
  int32_t numOfUdfs;
  char    tableNames[];  // flexible array member
} SMultiTableInfoReq;
|
||
|
||
// todo refactor
|
||
typedef struct SVgroupInfo {
|
||
int32_t vgId;
|
||
uint32_t hashBegin;
|
||
uint32_t hashEnd;
|
||
SEpSet epSet;
|
||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||
} SVgroupInfo;
|
||
|
||
typedef struct {
|
||
int32_t numOfVgroups;
|
||
SVgroupInfo vgroups[];
|
||
} SVgroupsInfo;
|
||
|
||
typedef struct {
|
||
char tbName[TSDB_TABLE_NAME_LEN];
|
||
char stbName[TSDB_TABLE_NAME_LEN];
|
||
char dbFName[TSDB_DB_FNAME_LEN];
|
||
int64_t dbId;
|
||
int32_t numOfTags;
|
||
int32_t numOfColumns;
|
||
int8_t precision;
|
||
int8_t tableType;
|
||
int8_t update;
|
||
int32_t sversion;
|
||
int32_t tversion;
|
||
uint64_t suid;
|
||
uint64_t tuid;
|
||
int32_t vgId;
|
||
SSchema* pSchemas;
|
||
} STableMetaRsp;
|
||
|
||
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
|
||
|
||
typedef struct {
|
||
SArray* pArray; // Array of STableMetaRsp
|
||
} STableMetaBatchRsp;
|
||
|
||
int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
|
||
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
|
||
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp);
|
||
|
||
/* Multi-table meta payload: counters and compression info followed by a variable-length body. */
typedef struct {
  int32_t numOfTables;
  int32_t numOfVgroup;
  int32_t numOfUdf;
  int32_t contLen;
  int8_t  compressed;  // denote if compressed or not
  int32_t rawLen;      // size before compress
  uint8_t metaClone;   // make meta clone after retrieve meta from mnode
  char    meta[];      // flexible array member
} SMultiTableMeta;
|
||
|
||
typedef struct {
|
||
int32_t dataLen;
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
char* data;
|
||
} STagData;
|
||
|
||
/*
|
||
* sql: show tables like '%a_%'
|
||
* payload is the query condition, e.g., '%a_%'
|
||
* payloadLen is the length of payload
|
||
*/
|
||
typedef struct {
|
||
int32_t type;
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
int32_t payloadLen;
|
||
char* payload;
|
||
} SShowReq;
|
||
|
||
int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
|
||
int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
|
||
void tFreeSShowReq(SShowReq* pReq);
|
||
|
||
typedef struct {
|
||
int64_t showId;
|
||
STableMetaRsp tableMeta;
|
||
} SShowRsp, SVShowTablesRsp;
|
||
|
||
int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
|
||
int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
|
||
void tFreeSShowRsp(SShowRsp* pRsp);
|
||
|
||
typedef struct {
|
||
int32_t type;
|
||
char db[TSDB_DB_FNAME_LEN];
|
||
char tb[TSDB_TABLE_NAME_LEN];
|
||
int64_t showId;
|
||
int8_t free;
|
||
} SRetrieveTableReq;
|
||
|
||
int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
|
||
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
|
||
|
||
/* Retrieve response head followed by a variable-length data area. */
typedef struct {
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];  // flexible array member
} SRetrieveTableRsp;
|
||
|
||
/* Same layout as SRetrieveTableRsp with a leading 64-bit handle. */
typedef struct {
  int64_t handle;
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];  // flexible array member
} SRetrieveMetaTableRsp;
|
||
|
||
typedef struct {
|
||
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
||
int32_t port;
|
||
} SCreateDnodeReq;
|
||
|
||
int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
|
||
int32_t tDeserializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t dnodeId;
|
||
char fqdn[TSDB_FQDN_LEN];
|
||
int32_t port;
|
||
} SDropDnodeReq;
|
||
|
||
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
|
||
int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t dnodeId;
|
||
char config[TSDB_DNODE_CONFIG_LEN];
|
||
char value[TSDB_DNODE_VALUE_LEN];
|
||
} SMCfgDnodeReq, SDCfgDnodeReq;
|
||
|
||
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
|
||
int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
|
||
|
||
/* Single-dnodeId request shared by three names: SMCreateMnodeReq, SMDropMnodeReq and SDDropMnodeReq. */
typedef struct {
  int32_t dnodeId;
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq;

/* Serialization helpers for the struct above; implementations are not in this header. */
int32_t tSerializeSMCreateDropMnodeReq(void* buf, int32_t bufLen, SMCreateMnodeReq* pReq);
int32_t tDeserializeSMCreateDropMnodeReq(void* buf, int32_t bufLen, SMCreateMnodeReq* pReq);
|
||
|
||
typedef struct {
|
||
int32_t dnodeId;
|
||
int8_t replica;
|
||
SReplica replicas[TSDB_MAX_REPLICA];
|
||
} SDCreateMnodeReq, SDAlterMnodeReq;
|
||
|
||
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||
|
||
/* Single-dnodeId request shared by the create/drop request names for qnode, snode and bnode. */
typedef struct {
  int32_t dnodeId;
} SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq, SMCreateSnodeReq, SMDropSnodeReq,
    SDCreateSnodeReq, SDDropSnodeReq, SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq;

/* Serialization helpers for the struct above; implementations are not in this header. */
int32_t tSerializeSMCreateDropQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
int32_t tDeserializeSMCreateDropQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
|
||
|
||
typedef struct {
|
||
char sql[TSDB_SHOW_SQL_LEN];
|
||
int32_t queryId;
|
||
int64_t useconds;
|
||
int64_t stime;
|
||
int64_t qId;
|
||
int64_t sqlObjId;
|
||
int32_t pid;
|
||
char fqdn[TSDB_FQDN_LEN];
|
||
int8_t stableQuery;
|
||
int32_t numOfSub;
|
||
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
|
||
} SQueryDesc;
|
||
|
||
typedef struct {
|
||
int32_t connId;
|
||
int32_t pid;
|
||
int32_t numOfQueries;
|
||
int32_t numOfStreams;
|
||
char app[TSDB_APP_NAME_LEN];
|
||
char pData[];
|
||
} SHeartBeatReq;
|
||
|
||
typedef struct {
|
||
int32_t connId;
|
||
int32_t queryId;
|
||
int32_t streamId;
|
||
int32_t totalDnodes;
|
||
int32_t onlineDnodes;
|
||
int8_t killConnection;
|
||
int8_t align[3];
|
||
SEpSet epSet;
|
||
} SHeartBeatRsp;
|
||
|
||
/* Kill-query request: connection id and query id. */
typedef struct {
  int32_t connId;
  int32_t queryId;
} SKillQueryReq;

/* Serialization helpers for SKillQueryReq; implementations are not in this header. */
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
int32_t tDeserializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
|
||
|
||
/* Kill-connection request: connection id. */
typedef struct {
  int32_t connId;
} SKillConnReq;

/* Serialization helpers for SKillConnReq; implementations are not in this header. */
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
|
||
|
||
/* Kill-transaction request: transaction id. */
typedef struct {
  int32_t transId;
} SKillTransReq;

/* Serialization helpers for SKillTransReq; implementations are not in this header. */
int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
|
||
|
||
typedef struct {
|
||
char user[TSDB_USER_LEN];
|
||
char spi;
|
||
char encrypt;
|
||
char secret[TSDB_PASSWORD_LEN];
|
||
char ckey[TSDB_PASSWORD_LEN];
|
||
} SAuthReq, SAuthRsp;
|
||
|
||
int32_t tSerializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
|
||
int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
|
||
|
||
typedef struct {
|
||
int8_t finished;
|
||
char name[TSDB_STEP_NAME_LEN];
|
||
char desc[TSDB_STEP_DESC_LEN];
|
||
} SStartupReq;
|
||
|
||
/**
|
||
* The layout of the query message payload is as following:
|
||
* +--------------------+---------------------------------+
|
||
* |Sql statement | Physical plan |
|
||
* |(denoted by sqlLen) |(In JSON, denoted by contentLen) |
|
||
* +--------------------+---------------------------------+
|
||
*/
|
||
typedef struct SSubQueryMsg {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
int64_t refId;
|
||
int8_t taskType;
|
||
uint32_t sqlLen; // the query sql,
|
||
uint32_t phyLen;
|
||
char msg[];
|
||
} SSubQueryMsg;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
} SSinkDataReq;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
} SQueryContinueReq;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
} SResReadyReq;
|
||
|
||
// Response carrying a single result code.
typedef struct {
  int32_t code;
} SResReadyRsp;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
} SResFetchReq;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
} SSchTasksStatusReq;
|
||
|
||
// Status entry for one task, keyed by (queryId, taskId).
typedef struct {
  uint64_t queryId;
  uint64_t taskId;
  int64_t  refId;
  int8_t   status;
} STaskStatus;
|
||
|
||
typedef struct {
|
||
int64_t refId;
|
||
SArray* taskStatus; // SArray<STaskStatus>
|
||
} SSchedulerStatusRsp;
|
||
|
||
// Action entry for one task, keyed by (queryId, taskId).
typedef struct {
  uint64_t queryId;
  uint64_t taskId;
  int8_t   action;
} STaskAction;
|
||
|
||
typedef struct SQueryNodeEpId {
|
||
int32_t nodeId; // vgId or qnodeId
|
||
SEp ep;
|
||
} SQueryNodeEpId;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
SQueryNodeEpId epId;
|
||
SArray* taskAction; // SArray<STaskAction>
|
||
} SSchedulerHbReq;
|
||
|
||
int32_t tSerializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq);
|
||
int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq);
|
||
void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq);
|
||
|
||
typedef struct {
|
||
uint64_t seqId;
|
||
SQueryNodeEpId epId;
|
||
SArray* taskStatus; // SArray<STaskStatus>
|
||
} SSchedulerHbRsp;
|
||
|
||
int32_t tSerializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp);
|
||
int32_t tDeserializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp);
|
||
void tFreeSSchedulerHbRsp(SSchedulerHbRsp* pRsp);
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
int64_t refId;
|
||
} STaskCancelReq;
|
||
|
||
// Response carrying a single result code.
typedef struct {
  int32_t code;
} STaskCancelRsp;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
uint64_t sId;
|
||
uint64_t queryId;
|
||
uint64_t taskId;
|
||
int64_t refId;
|
||
} STaskDropReq;
|
||
|
||
// Response carrying a single result code.
typedef struct {
  int32_t code;
} STaskDropRsp;
|
||
|
||
typedef struct {
|
||
char name[TSDB_TOPIC_FNAME_LEN];
|
||
char outputTbName[TSDB_TABLE_NAME_LEN];
|
||
int8_t igExists;
|
||
char* sql;
|
||
char* ast;
|
||
} SCMCreateStreamReq;
|
||
|
||
typedef struct {
|
||
int64_t streamId;
|
||
} SCMCreateStreamRsp;
|
||
|
||
int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq);
|
||
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
|
||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_TOPIC_FNAME_LEN];
|
||
int64_t streamId;
|
||
char* sql;
|
||
char* executorMsg;
|
||
} SMVCreateStreamReq, SMSCreateStreamReq;
|
||
|
||
// One layout with two typedef names (SMVCreateStreamRsp and SMSCreateStreamRsp).
typedef struct {
  int64_t streamId;
} SMVCreateStreamRsp, SMSCreateStreamRsp;
|
||
|
||
typedef struct {
|
||
char name[TSDB_TOPIC_FNAME_LEN];
|
||
int8_t igExists;
|
||
char* sql;
|
||
char* ast;
|
||
char subscribeDbName[TSDB_DB_NAME_LEN];
|
||
} SCMCreateTopicReq;
|
||
|
||
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
|
||
void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
|
||
|
||
// Create-topic response carrying the topic id.
typedef struct {
  int64_t topicId;
} SCMCreateTopicRsp;

// Serializer pair for SCMCreateTopicRsp; declared here, defined elsewhere.
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
|
||
|
||
typedef struct {
|
||
int32_t topicNum;
|
||
int64_t consumerId;
|
||
char* consumerGroup;
|
||
SArray* topicNames; // SArray<char*>
|
||
} SCMSubscribeReq;
|
||
|
||
// Encodes topicNum, consumerId, consumerGroup and then topicNum topic names
// through *buf. Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
  int32_t nBytes = taosEncodeFixedI32(buf, pReq->topicNum);
  nBytes += taosEncodeFixedI64(buf, pReq->consumerId);
  nBytes += taosEncodeString(buf, pReq->consumerGroup);

  int32_t idx = 0;
  while (idx < pReq->topicNum) {
    char* topic = (char*)taosArrayGetP(pReq->topicNames, idx);
    nBytes += taosEncodeString(buf, topic);
    idx++;
  }
  return nBytes;
}
|
||
|
||
// Decodes the fields written by tSerializeSCMSubscribeReq, in the same order,
// and returns the advanced read position. topicNames is created here and filled
// with one char* per topic.
// NOTE(review): the result of taosArrayInit is not checked - confirm the helper
// semantics on allocation failure.
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
  void* cursor = taosDecodeFixedI32(buf, &pReq->topicNum);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);
  cursor = taosDecodeString(cursor, &pReq->consumerGroup);

  pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
  int32_t idx = 0;
  while (idx < pReq->topicNum) {
    char* topic;
    cursor = taosDecodeString(cursor, &topic);
    taosArrayPush(pReq->topicNames, &topic);
    ++idx;
  }
  return cursor;
}
|
||
|
||
typedef struct SMqSubTopic {
|
||
int32_t vgId;
|
||
int64_t topicId;
|
||
SEpSet epSet;
|
||
} SMqSubTopic;
|
||
|
||
typedef struct {
|
||
int32_t topicNum;
|
||
SMqSubTopic topics[];
|
||
} SCMSubscribeRsp;
|
||
|
||
// Encodes topicNum and then (vgId, topicId, epSet) for each of the topicNum
// entries. Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
  int32_t nBytes = taosEncodeFixedI32(buf, pRsp->topicNum);

  int32_t idx = 0;
  while (idx < pRsp->topicNum) {
    const SMqSubTopic* pTopic = &pRsp->topics[idx];
    nBytes += taosEncodeFixedI32(buf, pTopic->vgId);
    nBytes += taosEncodeFixedI64(buf, pTopic->topicId);
    nBytes += taosEncodeSEpSet(buf, &pTopic->epSet);
    idx++;
  }
  return nBytes;
}
|
||
|
||
// Decodes topicNum and then topicNum (vgId, topicId, epSet) entries directly
// into pRsp->topics[]. Returns the advanced read position.
// NOTE(review): nothing here sizes pRsp->topics[]; presumably the caller has
// allocated room for topicNum entries - confirm against callers.
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
  void* cursor = taosDecodeFixedI32(buf, &pRsp->topicNum);

  int32_t idx = 0;
  while (idx < pRsp->topicNum) {
    SMqSubTopic* pTopic = &pRsp->topics[idx];
    cursor = taosDecodeFixedI32(cursor, &pTopic->vgId);
    cursor = taosDecodeFixedI64(cursor, &pTopic->topicId);
    cursor = taosDecodeSEpSet(cursor, &pTopic->epSet);
    idx++;
  }
  return cursor;
}
|
||
|
||
// Subscribe request: four 64-bit numeric fields plus three string pointers.
typedef struct {
  int64_t topicId;
  int64_t consumerId;
  int64_t consumerGroupId;
  int64_t offset;
  char*   sql;
  char*   logicalPlan;
  char*   physicalPlan;
} SMVSubscribeReq;
|
||
|
||
// Encodes the four int64 fields and then the three strings, in declaration
// order. Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
  int32_t nBytes = taosEncodeFixedI64(buf, pReq->topicId);
  nBytes += taosEncodeFixedI64(buf, pReq->consumerId);
  nBytes += taosEncodeFixedI64(buf, pReq->consumerGroupId);
  nBytes += taosEncodeFixedI64(buf, pReq->offset);

  nBytes += taosEncodeString(buf, pReq->sql);
  nBytes += taosEncodeString(buf, pReq->logicalPlan);
  nBytes += taosEncodeString(buf, pReq->physicalPlan);
  return nBytes;
}
|
||
|
||
// Decodes the fields written by tSerializeSMVSubscribeReq, in the same order,
// and returns the advanced read position.
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
  void* cursor = taosDecodeFixedI64(buf, &pReq->topicId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerGroupId);
  cursor = taosDecodeFixedI64(cursor, &pReq->offset);

  cursor = taosDecodeString(cursor, &pReq->sql);
  cursor = taosDecodeString(cursor, &pReq->logicalPlan);
  cursor = taosDecodeString(cursor, &pReq->physicalPlan);
  return cursor;
}
|
||
|
||
typedef struct {
|
||
const char* key;
|
||
SArray* lostConsumers; // SArray<int64_t>
|
||
SArray* removedConsumers; // SArray<int64_t>
|
||
SArray* newConsumers; // SArray<int64_t>
|
||
} SMqRebSubscribe;
|
||
|
||
// Allocates a zeroed SMqRebSubscribe, stores a strdup'd copy of `key` in it and
// creates its three empty int64_t arrays.
//
// Returns the new object, or NULL if any allocation fails; on failure everything
// allocated so far (including the duplicated key) is released.
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
  SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)calloc(1, sizeof(SMqRebSubscribe));
  if (pRebSub == NULL) {
    // Nothing to clean up yet; the shared _err path dereferences pRebSub.
    return NULL;
  }
  pRebSub->key = strdup(key);
  if (pRebSub->key == NULL) {
    goto _err;
  }
  pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->lostConsumers == NULL) {
    goto _err;
  }
  pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->removedConsumers == NULL) {
    goto _err;
  }
  pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->newConsumers == NULL) {
    goto _err;
  }
  return pRebSub;

_err:
  // calloc zeroed the struct, so arrays not created yet are NULL here (the
  // original error path already passed such NULLs to taosArrayDestroy).
  taosArrayDestroy(pRebSub->lostConsumers);
  taosArrayDestroy(pRebSub->removedConsumers);
  taosArrayDestroy(pRebSub->newConsumers);
  free((void*)pRebSub->key);  // key is const-qualified in the struct; free(NULL) is a no-op
  tfree(pRebSub);
  return NULL;
}
|
||
|
||
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
|
||
// deserialization
|
||
typedef struct {
|
||
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
|
||
} SMqDoRebalanceMsg;
|
||
|
||
// Response carrying a single 64-bit status value.
typedef struct {
  int64_t status;
} SMVSubscribeRsp;
|
||
|
||
typedef struct {
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int8_t igNotExists;
|
||
} SMDropTopicReq;
|
||
|
||
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||
|
||
typedef struct {
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int8_t alterType;
|
||
SSchema schema;
|
||
} SAlterTopicReq;
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int64_t tuid;
|
||
int32_t sverson;
|
||
int32_t execLen;
|
||
char* executor;
|
||
int32_t sqlLen;
|
||
char* sql;
|
||
} SDCreateTopicReq;
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
char name[TSDB_TABLE_FNAME_LEN];
|
||
int64_t tuid;
|
||
} SDDropTopicReq;
|
||
|
||
typedef struct SVCreateTbReq {
|
||
int64_t ver; // use a general definition
|
||
char* dbFName;
|
||
char* name;
|
||
uint32_t ttl;
|
||
uint32_t keep;
|
||
uint8_t type;
|
||
union {
|
||
struct {
|
||
tb_uid_t suid;
|
||
uint32_t nCols;
|
||
SSchema* pSchema;
|
||
uint32_t nTagCols;
|
||
SSchema* pTagSchema;
|
||
} stbCfg;
|
||
struct {
|
||
tb_uid_t suid;
|
||
SKVRow pTag;
|
||
} ctbCfg;
|
||
struct {
|
||
uint32_t nCols;
|
||
SSchema* pSchema;
|
||
} ntbCfg;
|
||
};
|
||
} SVCreateTbReq, SVUpdateTbReq;
|
||
|
||
typedef struct {
|
||
int32_t code;
|
||
} SVCreateTbRsp, SVUpdateTbRsp;
|
||
|
||
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
||
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
|
||
|
||
typedef struct {
|
||
int64_t ver; // use a general definition
|
||
SArray* pArray;
|
||
} SVCreateTbBatchReq;
|
||
|
||
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
|
||
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
|
||
|
||
typedef struct {
|
||
SArray* rspList; // SArray<SVCreateTbRsp>
|
||
} SVCreateTbBatchRsp;
|
||
|
||
int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp);
|
||
int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp);
|
||
|
||
|
||
typedef struct {
|
||
int64_t ver;
|
||
char* name;
|
||
uint8_t type;
|
||
tb_uid_t suid;
|
||
} SVDropTbReq;
|
||
|
||
typedef struct {
|
||
int tmp; // TODO: to avoid compile error
|
||
} SVDropTbRsp;
|
||
|
||
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
|
||
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
int64_t uid;
|
||
int32_t tid;
|
||
int16_t tversion;
|
||
int16_t colId;
|
||
int8_t type;
|
||
int16_t bytes;
|
||
int32_t tagValLen;
|
||
int16_t numOfTags;
|
||
int32_t schemaLen;
|
||
char data[];
|
||
} SUpdateTagValReq;
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
} SUpdateTagValRsp;
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
} SVShowTablesReq;
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
int32_t id;
|
||
} SVShowTablesFetchReq;
|
||
|
||
// Fetch response: fixed header followed by variable-length row data.
typedef struct {
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];  // flexible array member
} SVShowTablesFetchRsp;
|
||
|
||
typedef struct SMqCMGetSubEpReq {
|
||
int64_t consumerId;
|
||
int32_t epoch;
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
} SMqCMGetSubEpReq;
|
||
|
||
// Encodes contLen and then vgId of the message head as fixed 32-bit values.
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
  int32_t nBytes = taosEncodeFixedI32(buf, pMsg->contLen);
  nBytes += taosEncodeFixedI32(buf, pMsg->vgId);
  return nBytes;
}
|
||
|
||
typedef struct SMqHbRsp {
|
||
int8_t status; // idle or not
|
||
int8_t vnodeChanged;
|
||
int8_t epChanged; // should use new epset
|
||
int8_t reserved;
|
||
SEpSet epSet;
|
||
} SMqHbRsp;
|
||
|
||
// Encodes status, vnodeChanged, epChanged and epSet (reserved is skipped).
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
  int32_t nBytes = taosEncodeFixedI8(buf, pRsp->status);
  nBytes += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
  nBytes += taosEncodeFixedI8(buf, pRsp->epChanged);
  nBytes += taosEncodeSEpSet(buf, &pRsp->epSet);
  return nBytes;
}
|
||
|
||
// Decodes status, vnodeChanged, epChanged and epSet; `reserved` is left
// untouched. Returns the advanced read position.
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
  void* cursor = taosDecodeFixedI8(buf, &pRsp->status);
  cursor = taosDecodeFixedI8(cursor, &pRsp->vnodeChanged);
  cursor = taosDecodeFixedI8(cursor, &pRsp->epChanged);
  cursor = taosDecodeSEpSet(cursor, &pRsp->epSet);
  return cursor;
}
|
||
|
||
typedef struct SMqHbOneTopicBatchRsp {
|
||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||
SArray* rsps; // SArray<SMqHbRsp>
|
||
} SMqHbOneTopicBatchRsp;
|
||
|
||
// Encodes the topic name, the element count of rsps, and then every SMqHbRsp.
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
  int32_t nBytes = taosEncodeString(buf, pBatchRsp->topicName);

  int32_t count = taosArrayGetSize(pBatchRsp->rsps);
  nBytes += taosEncodeFixedI32(buf, count);

  int32_t idx = 0;
  while (idx < count) {
    SMqHbRsp* pItem = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, idx);
    nBytes += taosEncodeSMqHbRsp(buf, pItem);
    idx++;
  }
  return nBytes;
}
|
||
|
||
// Decodes what taosEncodeSMqHbOneTopicBatchRsp wrote: the topic name (copied
// into the inline buffer), an element count, and that many SMqHbRsp entries,
// which are appended to a newly created pBatchRsp->rsps.
//
// Returns the read position just past the decoded data.
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
  int32_t sz;
  buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
  buf = taosDecodeFixedI32(buf, &sz);
  pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
  for (int32_t i = 0; i < sz; i++) {
    // Zero first: taosDecodeSMqHbRsp does not write `reserved`, and the whole
    // struct is copied into the array.
    SMqHbRsp rsp = {0};
    buf = taosDecodeSMqHbRsp(buf, &rsp);
    // The push result must not be stored in buf: it is not a decode cursor, and
    // assigning it would derail every following read. Other decoders in this
    // file (e.g. taosDecodeSMqHbTopicInfoMsg) likewise discard it.
    taosArrayPush(pBatchRsp->rsps, &rsp);
  }
  return buf;
}
|
||
|
||
typedef struct SMqHbBatchRsp {
|
||
int64_t consumerId;
|
||
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
|
||
} SMqHbBatchRsp;
|
||
|
||
// Encodes consumerId, the element count of batchRsps, and then every
// SMqHbOneTopicBatchRsp in the array.
//
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
  // The count must come from the array; it was previously read uninitialised,
  // which made both the encoded count and the loop bound undefined.
  int32_t sz = taosArrayGetSize(pBatchRsp->batchRsps);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
    tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
  }
  return tlen;
}
|
||
|
||
// Decodes what taosEncodeSMqHbBatchRsp wrote: consumerId, an element count, and
// that many SMqHbOneTopicBatchRsp entries, which are appended to a newly
// created pBatchRsp->batchRsps.
//
// Returns the read position just past the decoded data.
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
  buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
  int32_t sz;
  buf = taosDecodeFixedI32(buf, &sz);
  pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
  for (int32_t i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp rsp;
    buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
    // The push result must not be stored in buf: it is not a decode cursor, and
    // assigning it would derail every following read. The element is copied by
    // value, so the array entry keeps the rsps pointer decoded above.
    taosArrayPush(pBatchRsp->batchRsps, &rsp);
  }
  return buf;
}
|
||
|
||
// Key/value pair; the free helpers in this file release `value` with tfree.
typedef struct {
  int32_t key;
  int32_t valueLen;
  void*   value;
} SKv;
|
||
|
||
// Heartbeat key: connection id plus heartbeat type.
typedef struct {
  int32_t connId;
  int32_t hbType;
} SClientHbKey;
|
||
|
||
typedef struct {
|
||
SClientHbKey connKey;
|
||
SHashObj* info; // hash<Skv.key, Skv>
|
||
} SClientHbReq;
|
||
|
||
typedef struct {
|
||
int64_t reqId;
|
||
SArray* reqs; // SArray<SClientHbReq>
|
||
} SClientHbBatchReq;
|
||
|
||
typedef struct {
|
||
SClientHbKey connKey;
|
||
int32_t status;
|
||
SArray* info; // Array<Skv>
|
||
} SClientHbRsp;
|
||
|
||
typedef struct {
|
||
int64_t reqId;
|
||
int64_t rspId;
|
||
SArray* rsps; // SArray<SClientHbRsp>
|
||
} SClientHbBatchRsp;
|
||
|
||
// Hash callback for heartbeat keys; forwards to taosIntHash_64 unchanged.
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
  return taosIntHash_64(key, keyLen);
}
|
||
|
||
// Walks every entry of `info`, treating each as an SKv, and releases its value
// buffer with tfree. The hash table itself is not destroyed here.
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
  for (void* it = taosHashIterate(info, NULL); it != NULL; it = taosHashIterate(info, it)) {
    SKv* entry = (SKv*)it;
    tfree(entry->value);
  }
}
|
||
|
||
static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
|
||
SClientHbReq* req = (SClientHbReq*)pReq;
|
||
if (req->info) {
|
||
tFreeReqKvHash(req->info);
|
||
taosHashCleanup(req->info);
|
||
}
|
||
}
|
||
|
||
int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq);
|
||
int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq);
|
||
|
||
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
|
||
SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
|
||
if (deep) {
|
||
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
|
||
} else {
|
||
taosArrayDestroy(req->reqs);
|
||
}
|
||
free(pReq);
|
||
}
|
||
|
||
static FORCE_INLINE void tFreeClientKv(void* pKv) {
|
||
SKv* kv = (SKv*)pKv;
|
||
if (kv) {
|
||
tfree(kv->value);
|
||
}
|
||
}
|
||
|
||
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
|
||
SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
|
||
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
|
||
}
|
||
|
||
static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
|
||
SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp;
|
||
taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
|
||
}
|
||
|
||
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
|
||
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
|
||
|
||
// Encodes key, valueLen and then valueLen bytes of value through pEncoder.
// Returns 0 on success, -1 as soon as any encode step fails.
// NOTE(review): the decoder reads the value with tDecodeCStrTo while this side
// writes it with tEncodeBinary - confirm the two framings agree in tencode.h.
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
  if (tEncodeI32(pEncoder, pKv->key) < 0 || tEncodeI32(pEncoder, pKv->valueLen) < 0) {
    return -1;
  }
  if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) {
    return -1;
  }
  return 0;
}
|
||
|
||
// Decodes key and valueLen, allocates valueLen + 1 bytes for the value and
// decodes the value into that buffer.
//
// Returns 0 on success. Returns -1 if a decode step fails, if valueLen is
// negative, or if the allocation fails; in those cases no buffer is left
// attached to pKv (value is NULL after an allocation or value-decode failure).
// NOTE(review): tDecodeCStrTo copies according to the length stored with the
// string, not valueLen - presumably the two always agree; confirm in tencode.h.
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
  if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
  if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
  // A negative length would turn into a huge or zero-sized allocation request.
  if (pKv->valueLen < 0) return -1;
  // Widen before adding 1 so valueLen == INT32_MAX cannot overflow a signed int.
  pKv->value = malloc((size_t)pKv->valueLen + 1);
  if (pKv->value == NULL) return -1;
  if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) {
    // Do not leak the buffer, and clear the pointer so a later tFreeClientKv on
    // this entry cannot free it a second time.
    free(pKv->value);
    pKv->value = NULL;
    return -1;
  }
  return 0;
}
|
||
|
||
// Encodes connId and then hbType. Returns 0 on success, -1 on the first failure.
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
  if (tEncodeI32(pEncoder, pKey->connId) < 0 || tEncodeI32(pEncoder, pKey->hbType) < 0) {
    return -1;
  }
  return 0;
}
|
||
|
||
// Decodes connId and then hbType. Returns 0 on success, -1 on the first failure.
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
  if (tDecodeI32(pDecoder, &pKey->connId) < 0 || tDecodeI32(pDecoder, &pKey->hbType) < 0) {
    return -1;
  }
  return 0;
}
|
||
|
||
// Per-vgroup heartbeat info; currently just the vgroup id.
typedef struct SMqHbVgInfo {
  int32_t vgId;
} SMqHbVgInfo;
|
||
|
||
// Encodes vgId as a fixed 32-bit value and returns the length the helper reports.
static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
  int32_t nBytes = taosEncodeFixedI32(buf, pVgInfo->vgId);
  return nBytes;
}
|
||
|
||
// Decodes vgId and returns the advanced read position.
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
  return taosDecodeFixedI32(buf, &pVgInfo->vgId);
}
|
||
|
||
typedef struct SMqHbTopicInfo {
|
||
int32_t epoch;
|
||
int64_t topicUid;
|
||
char name[TSDB_TOPIC_FNAME_LEN];
|
||
SArray* pVgInfo;
|
||
} SMqHbTopicInfo;
|
||
|
||
// Encodes epoch, topicUid, name, the element count of pVgInfo and then every
// SMqHbVgInfo. Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
  int32_t nBytes = taosEncodeFixedI32(buf, pTopicInfo->epoch);
  nBytes += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
  nBytes += taosEncodeString(buf, pTopicInfo->name);

  int32_t count = taosArrayGetSize(pTopicInfo->pVgInfo);
  nBytes += taosEncodeFixedI32(buf, count);

  int32_t idx = 0;
  while (idx < count) {
    SMqHbVgInfo* pItem = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, idx);
    nBytes += taosEncodeSMqVgInfo(buf, pItem);
    idx++;
  }
  return nBytes;
}
|
||
|
||
// Decodes what taosEncodeSMqHbTopicInfoMsg wrote; pVgInfo is created here and
// filled with the decoded SMqHbVgInfo entries. Returns the advanced read position.
// NOTE(review): the result of taosArrayInit is not checked.
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
  void* cursor = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
  cursor = taosDecodeFixedI64(cursor, &pTopicInfo->topicUid);
  cursor = taosDecodeStringTo(cursor, pTopicInfo->name);

  int32_t count;
  cursor = taosDecodeFixedI32(cursor, &count);
  pTopicInfo->pVgInfo = taosArrayInit(count, sizeof(SMqHbVgInfo));

  int32_t idx = 0;
  while (idx < count) {
    SMqHbVgInfo item;
    cursor = taosDecodeSMqVgInfo(cursor, &item);
    taosArrayPush(pTopicInfo->pVgInfo, &item);
    idx++;
  }
  return cursor;
}
|
||
|
||
typedef struct SMqHbMsg {
|
||
int32_t status; // ask hb endpoint
|
||
int32_t epoch;
|
||
int64_t consumerId;
|
||
SArray* pTopics; // SArray<SMqHbTopicInfo>
|
||
} SMqHbMsg;
|
||
|
||
// Encodes status, epoch, consumerId, the element count of pTopics and then every
// SMqHbTopicInfo. Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
  int32_t nBytes = taosEncodeFixedI32(buf, pMsg->status);
  nBytes += taosEncodeFixedI32(buf, pMsg->epoch);
  nBytes += taosEncodeFixedI64(buf, pMsg->consumerId);

  int32_t count = taosArrayGetSize(pMsg->pTopics);
  nBytes += taosEncodeFixedI32(buf, count);

  int32_t idx = 0;
  while (idx < count) {
    SMqHbTopicInfo* pItem = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, idx);
    nBytes += taosEncodeSMqHbTopicInfoMsg(buf, pItem);
    idx++;
  }
  return nBytes;
}
|
||
|
||
// Decodes what taosEncodeSMqMsg wrote; pTopics is created here and filled with
// the decoded SMqHbTopicInfo entries. Returns the advanced read position.
// NOTE(review): the result of taosArrayInit is not checked.
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
  void* cursor = taosDecodeFixedI32(buf, &pMsg->status);
  cursor = taosDecodeFixedI32(cursor, &pMsg->epoch);
  cursor = taosDecodeFixedI64(cursor, &pMsg->consumerId);

  int32_t count;
  cursor = taosDecodeFixedI32(cursor, &count);
  pMsg->pTopics = taosArrayInit(count, sizeof(SMqHbTopicInfo));

  int32_t idx = 0;
  while (idx < count) {
    SMqHbTopicInfo item;
    cursor = taosDecodeSMqHbTopicInfoMsg(cursor, &item);
    taosArrayPush(pMsg->pTopics, &item);
    idx++;
  }
  return cursor;
}
|
||
|
||
typedef struct {
|
||
int64_t leftForVer;
|
||
int32_t vgId;
|
||
int64_t consumerId;
|
||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
char* sql;
|
||
char* logicalPlan;
|
||
char* physicalPlan;
|
||
char* qmsg;
|
||
} SMqSetCVgReq;
|
||
|
||
// Encodes every field of SMqSetCVgReq in declaration order.
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
  int32_t nBytes = taosEncodeFixedI64(buf, pReq->leftForVer);
  nBytes += taosEncodeFixedI32(buf, pReq->vgId);
  nBytes += taosEncodeFixedI64(buf, pReq->consumerId);

  nBytes += taosEncodeString(buf, pReq->topicName);
  nBytes += taosEncodeString(buf, pReq->cgroup);

  nBytes += taosEncodeString(buf, pReq->sql);
  nBytes += taosEncodeString(buf, pReq->logicalPlan);
  nBytes += taosEncodeString(buf, pReq->physicalPlan);
  nBytes += taosEncodeString(buf, pReq->qmsg);
  return nBytes;
}
|
||
|
||
// Decodes what tEncodeSMqSetCVgReq wrote. topicName and cgroup are decoded into
// the inline buffers (taosDecodeStringTo); the other four strings are received
// through taosDecodeString. Returns the advanced read position.
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
  void* cursor = taosDecodeFixedI64(buf, &pReq->leftForVer);
  cursor = taosDecodeFixedI32(cursor, &pReq->vgId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);

  cursor = taosDecodeStringTo(cursor, pReq->topicName);
  cursor = taosDecodeStringTo(cursor, pReq->cgroup);

  cursor = taosDecodeString(cursor, &pReq->sql);
  cursor = taosDecodeString(cursor, &pReq->logicalPlan);
  cursor = taosDecodeString(cursor, &pReq->physicalPlan);
  cursor = taosDecodeString(cursor, &pReq->qmsg);
  return cursor;
}
|
||
|
||
// Rebalance request naming an old and a new consumer for one vgroup.
typedef struct {
  int64_t leftForVer;
  int32_t vgId;
  int64_t oldConsumerId;
  int64_t newConsumerId;
} SMqMVRebReq;
|
||
|
||
// Encodes the four fields of SMqMVRebReq in declaration order.
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
  int32_t nBytes = taosEncodeFixedI64(buf, pReq->leftForVer);
  nBytes += taosEncodeFixedI32(buf, pReq->vgId);
  nBytes += taosEncodeFixedI64(buf, pReq->oldConsumerId);
  nBytes += taosEncodeFixedI64(buf, pReq->newConsumerId);
  return nBytes;
}
|
||
|
||
// Decodes the four fields written by tEncodeSMqMVRebReq, in the same order.
// Returns the advanced read position.
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
  void* cursor = taosDecodeFixedI64(buf, &pReq->leftForVer);
  cursor = taosDecodeFixedI32(cursor, &pReq->vgId);
  cursor = taosDecodeFixedI64(cursor, &pReq->oldConsumerId);
  cursor = taosDecodeFixedI64(cursor, &pReq->newConsumerId);
  return cursor;
}
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
int32_t vgId;
|
||
int64_t consumerId;
|
||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
} SMqSetCVgRsp;
|
||
|
||
typedef struct {
|
||
SMsgHead header;
|
||
int32_t vgId;
|
||
int64_t consumerId;
|
||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
} SMqMVRebRsp;
|
||
|
||
typedef struct {
|
||
int32_t vgId;
|
||
int64_t offset;
|
||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
} SMqOffset;
|
||
|
||
typedef struct {
|
||
int32_t num;
|
||
SMqOffset* offsets;
|
||
} SMqCMCommitOffsetReq;
|
||
|
||
typedef struct {
|
||
int32_t reserved;
|
||
} SMqCMCommitOffsetRsp;
|
||
|
||
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
|
||
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
|
||
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
|
||
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
|
||
|
||
typedef struct {
|
||
uint32_t nCols;
|
||
SSchema* pSchema;
|
||
} SSchemaWrapper;
|
||
|
||
// Encodes type (I8), bytes (I32), colId (I32) and name of one SSchema.
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
  int32_t nBytes = taosEncodeFixedI8(buf, pSchema->type);
  nBytes += taosEncodeFixedI32(buf, pSchema->bytes);
  nBytes += taosEncodeFixedI32(buf, pSchema->colId);
  nBytes += taosEncodeString(buf, pSchema->name);
  return nBytes;
}
|
||
|
||
// Decodes type, bytes, colId and name (into the inline name buffer) of one
// SSchema. Returns the advanced read position.
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
  void* cursor = taosDecodeFixedI8(buf, &pSchema->type);
  cursor = taosDecodeFixedI32(cursor, &pSchema->bytes);
  cursor = taosDecodeFixedI32(cursor, &pSchema->colId);
  cursor = taosDecodeStringTo(cursor, pSchema->name);
  return cursor;
}
|
||
|
||
// SCoder variant: encodes type, bytes, colId and name of one SSchema.
// Returns 0 on success, -1 on the first failing step.
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
  if (tEncodeI8(pEncoder, pSchema->type) < 0 || tEncodeI32(pEncoder, pSchema->bytes) < 0) {
    return -1;
  }
  if (tEncodeI32(pEncoder, pSchema->colId) < 0 || tEncodeCStr(pEncoder, pSchema->name) < 0) {
    return -1;
  }
  return 0;
}
|
||
|
||
// SCoder variant: decodes type, bytes, colId and name of one SSchema.
// Returns 0 on success, -1 on the first failing step.
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
  if (tDecodeI8(pDecoder, &pSchema->type) < 0 || tDecodeI32(pDecoder, &pSchema->bytes) < 0) {
    return -1;
  }
  if (tDecodeI32(pDecoder, &pSchema->colId) < 0 || tDecodeCStrTo(pDecoder, pSchema->name) < 0) {
    return -1;
  }
  return 0;
}
|
||
|
||
// Encodes nCols and then each of the nCols SSchema entries.
// Returns the sum of the lengths reported by the encode helpers.
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
  int32_t nBytes = taosEncodeFixedU32(buf, pSW->nCols);

  int32_t idx = 0;
  while (idx < pSW->nCols) {
    nBytes += taosEncodeSSchema(buf, &pSW->pSchema[idx]);
    idx++;
  }
  return nBytes;
}
|
||
|
||
// Decodes nCols, allocates a zeroed array of nCols SSchema entries and decodes
// each one into it. Returns the advanced read position, or NULL when calloc
// returns NULL.
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
  void* cursor = taosDecodeFixedU32(buf, &pSW->nCols);

  pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema));
  if (pSW->pSchema == NULL) {
    return NULL;
  }

  int32_t idx = 0;
  while (idx < pSW->nCols) {
    cursor = taosDecodeSSchema(cursor, &pSW->pSchema[idx]);
    idx++;
  }
  return cursor;
}
|
||
typedef struct {
|
||
int8_t version; // for compatibility(default 0)
|
||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||
char indexName[TSDB_INDEX_NAME_LEN];
|
||
char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes.
|
||
int32_t exprLen;
|
||
int32_t tagsFilterLen;
|
||
int64_t indexUid;
|
||
tb_uid_t tableUid; // super/child/common table uid
|
||
int64_t interval;
|
||
int64_t offset; // use unit by precision of DB
|
||
int64_t sliding;
|
||
char* expr; // sma expression
|
||
char* tagsFilter;
|
||
} STSma; // Time-range-wise SMA
|
||
|
||
typedef struct {
|
||
int64_t ver; // use a general definition
|
||
STSma tSma;
|
||
} SVCreateTSmaReq;
|
||
|
||
// TSma message.
typedef struct {
  int8_t  type;  // 0 status report, 1 update data
  int64_t indexUid;
  int64_t skey;  // start TS key of interval/sliding window
} STSmaMsg;
|
||
|
||
typedef struct {
|
||
int64_t ver; // use a general definition
|
||
int64_t indexUid;
|
||
char indexName[TSDB_INDEX_NAME_LEN];
|
||
} SVDropTSmaReq;
|
||
|
||
typedef struct {
|
||
int tmp; // TODO: to avoid compile error
|
||
} SVCreateTSmaRsp, SVDropTSmaRsp;
|
||
|
||
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
|
||
void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
|
||
int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
|
||
void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
|
||
|
||
typedef struct {
|
||
col_id_t colId;
|
||
uint16_t blockSize; // sma data block size
|
||
char data[];
|
||
} STSmaColData;
|
||
|
||
typedef struct {
|
||
tb_uid_t tableUid; // super/child/normal table uid
|
||
int32_t dataLen; // not including head
|
||
char data[];
|
||
} STSmaTbData;
|
||
|
||
typedef struct {
|
||
int64_t indexUid;
|
||
TSKEY skey; // startKey of one interval/sliding window
|
||
int64_t interval;
|
||
int32_t dataLen; // not including head
|
||
int8_t intervalUnit;
|
||
char data[];
|
||
} STSmaDataWrapper; // sma data for a interval/sliding window
|
||
|
||
// interval/sliding => window
|
||
|
||
// => window->table->colId
|
||
// => once all tables under a window have been computed, stream computing tells tsdb to clear the window's expired flag
|
||
|
||
// RSma: Rollup SMA
|
||
// Parameters of one rollup level.
typedef struct {
  int64_t  interval;
  int32_t  retention;  // unit: day
  uint16_t days;       // unit: day
  int8_t   intervalUnit;
} SSmaParams;
|
||
|
||
typedef struct {
|
||
STSma tsma;
|
||
float xFilesFactor;
|
||
SArray* smaParams; // SSmaParams
|
||
} SRSma;
|
||
|
||
typedef struct {
|
||
uint32_t number;
|
||
STSma* tSma;
|
||
} STSmaWrapper;
|
||
|
||
// Releases the expr and tagsFilter buffers of one STSma with tfree. The STSma
// object itself is not freed. A NULL argument is ignored.
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
  if (pSma == NULL) {
    return;
  }
  tfree(pSma->expr);
  tfree(pSma->tagsFilter);
}
|
||
|
||
// Destroys every STSma held by the wrapper and then releases the tSma array with
// tfree. The wrapper object itself is not freed. Does nothing when pSW or
// pSW->tSma is NULL.
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
  if (pSW == NULL || pSW->tSma == NULL) {
    return;
  }
  uint32_t idx = 0;
  while (idx < pSW->number) {
    tdDestroyTSma(&pSW->tSma[idx]);
    ++idx;
  }
  tfree(pSW->tSma);
}
|
||
|
||
// Encodes pSma through the taosEncode* primitives and returns the sum of their return values.
//
// Field order: version, intervalUnit, slidingUnit, indexName, timezone, exprLen, tagsFilterLen,
// indexUid, tableUid, interval, offset, sliding, then expr and tagsFilter. Each of the last two
// is emitted only when its length field is positive. tDecodeTSma consumes the same order.
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
  int32_t nEncoded = 0;

  // single-byte fields
  nEncoded += taosEncodeFixedI8(buf, pSma->version);
  nEncoded += taosEncodeFixedI8(buf, pSma->intervalUnit);
  nEncoded += taosEncodeFixedI8(buf, pSma->slidingUnit);

  // fixed-capacity names
  nEncoded += taosEncodeString(buf, pSma->indexName);
  nEncoded += taosEncodeString(buf, pSma->timezone);

  // lengths of the two optional strings that close the record
  nEncoded += taosEncodeFixedI32(buf, pSma->exprLen);
  nEncoded += taosEncodeFixedI32(buf, pSma->tagsFilterLen);

  // 64-bit fields
  nEncoded += taosEncodeFixedI64(buf, pSma->indexUid);
  nEncoded += taosEncodeFixedI64(buf, pSma->tableUid);
  nEncoded += taosEncodeFixedI64(buf, pSma->interval);
  nEncoded += taosEncodeFixedI64(buf, pSma->offset);
  nEncoded += taosEncodeFixedI64(buf, pSma->sliding);

  // optional strings
  if (pSma->exprLen > 0) {
    nEncoded += taosEncodeString(buf, pSma->expr);
  }
  if (pSma->tagsFilterLen > 0) {
    nEncoded += taosEncodeString(buf, pSma->tagsFilter);
  }

  return nEncoded;
}
|
||
|
||
// Encodes the entry count of pSW followed by each STSma entry in array order.
// Returns the sum of the values returned by the individual encoders.
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
  int32_t nEncoded = taosEncodeFixedU32(buf, pSW->number);

  uint32_t idx = 0;
  while (idx < pSW->number) {
    nEncoded += tEncodeTSma(buf, &pSW->tSma[idx]);
    ++idx;
  }

  return nEncoded;
}
|
||
|
||
// Decodes one STSma from buf, reading the fields in the order tEncodeTSma writes them.
//
// pSma->expr and pSma->tagsFilter are filled by taosDecodeString when their length fields are
// positive and are NULL otherwise. The caller releases them with tdDestroyTSma.
//
// Returns the buffer position after the decoded record. If decoding either optional string
// fails, the strings decoded so far are released through tdDestroyTSma and NULL is returned.
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
  // Reset both owned pointers before anything can fail. Without this, a failure while decoding
  // expr would hand tdDestroyTSma a tagsFilter that has not been assigned yet, which is an
  // indeterminate pointer unless the caller zeroed *pSma beforehand.
  pSma->expr = NULL;
  pSma->tagsFilter = NULL;

  buf = taosDecodeFixedI8(buf, &pSma->version);
  buf = taosDecodeFixedI8(buf, &pSma->intervalUnit);
  buf = taosDecodeFixedI8(buf, &pSma->slidingUnit);
  buf = taosDecodeStringTo(buf, pSma->indexName);
  buf = taosDecodeStringTo(buf, pSma->timezone);
  buf = taosDecodeFixedI32(buf, &pSma->exprLen);
  buf = taosDecodeFixedI32(buf, &pSma->tagsFilterLen);
  buf = taosDecodeFixedI64(buf, &pSma->indexUid);
  buf = taosDecodeFixedI64(buf, &pSma->tableUid);
  buf = taosDecodeFixedI64(buf, &pSma->interval);
  buf = taosDecodeFixedI64(buf, &pSma->offset);
  buf = taosDecodeFixedI64(buf, &pSma->sliding);

  if (pSma->exprLen > 0) {
    if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) {
      tdDestroyTSma(pSma);
      return NULL;
    }
  }

  if (pSma->tagsFilterLen > 0) {
    if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) {
      tdDestroyTSma(pSma);
      return NULL;
    }
  }

  return buf;
}
|
||
|
||
// Decodes an STSmaWrapper: the entry count followed by that many STSma records.
//
// On success pSW->tSma points to a calloc'ed array of pSW->number entries, to be released by
// the caller with tdDestroyTSmaWrapper, and the buffer position after the last record is
// returned.
//
// On failure (allocation failure or a record that cannot be decoded) NULL is returned. Every
// entry decoded so far is destroyed, the array is freed, and pSW->tSma is reset to NULL so a
// later tdDestroyTSmaWrapper on the same wrapper cannot free it a second time.
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
  buf = taosDecodeFixedU32(buf, &pSW->number);

  // NOTE(review): calloc(0, ...) is allowed to return NULL, so on such platforms an empty
  // wrapper (number == 0) is reported as a failure here - confirm whether callers need that
  // case to succeed.
  pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma));
  if (pSW->tSma == NULL) {
    return NULL;
  }

  for (uint32_t i = 0; i < pSW->number; ++i) {
    if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) {
      // tDecodeTSma has already released entry i on its failure paths, so only the fully
      // decoded entries [0, i) remain to be destroyed. The loop counts upwards because a
      // downward loop on an unsigned index with a `j >= 0` condition can never terminate.
      for (uint32_t j = 0; j < i; ++j) {
        tdDestroyTSma(pSW->tSma + j);
      }
      free(pSW->tSma);
      pSW->tSma = NULL;
      return NULL;
    }
  }

  return buf;
}
|
||
|
||
// Data of one table inside an SMqTopicData.
typedef struct {
  int64_t uid;
  int32_t numOfRows;
  char*   colData;
} SMqTbData;
|
||
|
||
typedef struct {
|
||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||
int64_t committedOffset;
|
||
int64_t reqOffset;
|
||
int64_t rspOffset;
|
||
int32_t skipLogNum;
|
||
int32_t bodyLen;
|
||
int32_t numOfTb;
|
||
SMqTbData* tbData;
|
||
} SMqTopicData;
|
||
|
||
// Head embedded at the start of the SMqPollRsp and SMqCMGetSubEpRsp structs.
typedef struct {
  int8_t  mqMsgType;
  int32_t code;
  int32_t epoch;
  int64_t consumerId;
} SMqRspHead;
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
|
||
int64_t consumerId;
|
||
int64_t blockingTime;
|
||
int32_t epoch;
|
||
int8_t withSchema;
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
|
||
int64_t currentOffset;
|
||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||
} SMqPollReq;
|
||
|
||
typedef struct {
|
||
int32_t vgId;
|
||
int64_t offset;
|
||
SEpSet epSet;
|
||
} SMqSubVgEp;
|
||
|
||
typedef struct {
|
||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||
SArray* vgs; // SArray<SMqSubVgEp>
|
||
} SMqSubTopicEp;
|
||
|
||
typedef struct {
|
||
SMqRspHead head;
|
||
int64_t reqOffset;
|
||
int64_t rspOffset;
|
||
int32_t skipLogNum;
|
||
// TODO: replace with topic name
|
||
int32_t numOfTopics;
|
||
// TODO: remove from msg
|
||
SSchemaWrapper* schema;
|
||
SArray* pBlockData; // SArray<SSDataBlock>
|
||
} SMqPollRsp;
|
||
|
||
typedef struct {
|
||
SMqRspHead head;
|
||
char cgroup[TSDB_CGROUP_LEN];
|
||
SArray* topics; // SArray<SMqSubTopicEp>
|
||
} SMqCMGetSubEpRsp;
|
||
|
||
// Row iteration state stored inside struct tmq_message_t.
typedef struct {
  int32_t curBlock;
  int32_t curRow;
  void**  uData;
} SMqRowIter;
|
||
|
||
struct tmq_message_t {
|
||
SMqPollRsp msg;
|
||
void* vg;
|
||
SMqRowIter iter;
|
||
};
|
||
|
||
#if 0
// Disabled alternative layout of struct tmq_message_t; excluded from compilation by `#if 0`.
struct tmq_message_t {
  SMqRspHead head;
  union {
    SMqPollRsp       consumeRsp;
    SMqCMGetSubEpRsp getEpRsp;
  };
  void*   extra;
  int32_t curBlock;
  int32_t curRow;
  void**  uData;
};
#endif
|
||
|
||
// Destroys the vgs array of pSubTopicEp. The SMqSubTopicEp object itself is not released.
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
  taosArrayDestroy(pSubTopicEp->vgs);
}
|
||
|
||
// Encodes pVgEp as vgId, offset, epSet (in that order).
// Returns the sum of the values returned by the individual encoders.
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
  int32_t nEncoded = taosEncodeFixedI32(buf, pVgEp->vgId);
  nEncoded += taosEncodeFixedI64(buf, pVgEp->offset);
  nEncoded += taosEncodeSEpSet(buf, &pVgEp->epSet);
  return nEncoded;
}
|
||
|
||
// Decodes vgId, offset and epSet (in that order) from buf into pVgEp.
// Returns whatever the last decoder returns as the new buffer position.
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
  void* pos = taosDecodeFixedI32(buf, &pVgEp->vgId);
  pos = taosDecodeFixedI64(pos, &pVgEp->offset);
  return taosDecodeSEpSet(pos, &pVgEp->epSet);
}
|
||
|
||
// Destroys the topics array of pRsp, passing tDeleteSMqSubTopicEp as the per-element callback.
// The SMqCMGetSubEpRsp object itself is not released.
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
  void (*destroyTopicEp)(void*) = (void (*)(void*))tDeleteSMqSubTopicEp;
  taosArrayDestroyEx(pRsp->topics, destroyTopicEp);
}
|
||
|
||
// Encodes pTopicEp as: topic name, number of vgroup entries, then each SMqSubVgEp in array order.
// Returns the sum of the values returned by the individual encoders.
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
  int32_t nEncoded = taosEncodeString(buf, pTopicEp->topic);

  int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);
  nEncoded += taosEncodeFixedI32(buf, numOfVgs);

  int32_t idx = 0;
  while (idx < numOfVgs) {
    SMqSubVgEp* pEntry = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, idx);
    nEncoded += tEncodeSMqSubVgEp(buf, pEntry);
    ++idx;
  }

  return nEncoded;
}
|
||
|
||
// Decodes an SMqSubTopicEp: topic name, entry count, then that many SMqSubVgEp records, which
// are pushed into a freshly created pTopicEp->vgs array.
//
// Returns the buffer position after the last record, or NULL when the array cannot be created.
// NOTE(review): the results of taosArrayPush and of the nested decoders are not checked, and
// the decoded count is passed to taosArrayInit without validation.
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
  int32_t numOfVgs;

  buf = taosDecodeStringTo(buf, pTopicEp->topic);
  buf = taosDecodeFixedI32(buf, &numOfVgs);

  pTopicEp->vgs = taosArrayInit(numOfVgs, sizeof(SMqSubVgEp));
  if (pTopicEp->vgs == NULL) {
    return NULL;
  }

  for (int32_t remaining = numOfVgs; remaining > 0; --remaining) {
    SMqSubVgEp entry;
    buf = tDecodeSMqSubVgEp(buf, &entry);
    taosArrayPush(pTopicEp->vgs, &entry);
  }

  return buf;
}
|
||
|
||
// Encodes pRsp as: number of topics, then each SMqSubTopicEp in array order.
// The cgroup field is not part of the encoding (its encode call is disabled below).
// Returns the sum of the values returned by the individual encoders.
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
  // tlen += taosEncodeString(buf, pRsp->cgroup);
  int32_t numOfTopics = taosArrayGetSize(pRsp->topics);
  int32_t nEncoded = taosEncodeFixedI32(buf, numOfTopics);

  int32_t idx = 0;
  while (idx < numOfTopics) {
    SMqSubTopicEp* pTopic = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, idx);
    nEncoded += tEncodeSMqSubTopicEp(buf, pTopic);
    ++idx;
  }

  return nEncoded;
}
|
||
|
||
// Decodes an SMqCMGetSubEpRsp: topic count, then that many SMqSubTopicEp records, which are
// pushed into a freshly created pRsp->topics array. The cgroup field is not decoded (its
// decode call is disabled below).
//
// Returns the buffer position after the last record, or NULL when the array cannot be created.
// NOTE(review): the results of taosArrayPush and of tDecodeSMqSubTopicEp are not checked.
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
  // buf = taosDecodeStringTo(buf, pRsp->cgroup);
  int32_t numOfTopics;
  buf = taosDecodeFixedI32(buf, &numOfTopics);

  pRsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
  if (pRsp->topics == NULL) {
    return NULL;
  }

  for (int32_t remaining = numOfTopics; remaining > 0; --remaining) {
    SMqSubTopicEp topic;
    buf = tDecodeSMqSubTopicEp(buf, &topic);
    taosArrayPush(pRsp->topics, &topic);
  }

  return buf;
}
|
||
|
||
// Stream task status values (streamTaskNew stores RUNNING in SStreamTask.status).
enum {
  STREAM_TASK_STATUS__RUNNING = 1,
  STREAM_TASK_STATUS__STOP = 2,
};
|
||
|
||
// Runtime handles of a stream task: one input handle and four executor slots.
typedef struct {
  void* inputHandle;
  void* executor[4];
} SStreamTaskParRunner;
|
||
|
||
typedef struct {
|
||
int64_t streamId;
|
||
int32_t taskId;
|
||
int32_t level;
|
||
int8_t status;
|
||
int8_t pipeEnd;
|
||
int8_t parallel;
|
||
SEpSet NextOpEp;
|
||
char* qmsg;
|
||
// not applied to encoder and decoder
|
||
SStreamTaskParRunner runner;
|
||
// void* executor;
|
||
// void* stateStore;
|
||
// storage handle
|
||
} SStreamTask;
|
||
|
||
// Allocates a zero-initialized SStreamTask and fills in its identity.
//
// streamId  id of the stream the task belongs to; stored in pTask->streamId
// level     level of the task; stored in pTask->level
//
// The task receives a fresh id from tGenIdPI32() and starts in STREAM_TASK_STATUS__RUNNING
// with no qmsg. All remaining fields stay zero from calloc.
//
// Returns the new task, or NULL if the allocation fails.
static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) {
  SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return NULL;
  }
  pTask->taskId = tGenIdPI32();
  // Record the caller-supplied identity; previously both arguments were silently dropped and
  // the fields kept the zero left by calloc.
  pTask->streamId = streamId;
  pTask->level = level;
  pTask->status = STREAM_TASK_STATUS__RUNNING;
  pTask->qmsg = NULL;
  return pTask;
}
|
||
|
||
// SCoder-based encode / decode / free routines for SStreamTask.
// Only the declarations live here; the definitions are outside this section.
// NOTE(review): the int32_t results are presumably status codes - confirm against the
// implementation.
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
void    tFreeSStreamTask(SStreamTask* pTask);
|
||
|
||
typedef struct {
|
||
SMsgHead head;
|
||
SStreamTask* task;
|
||
} SStreamTaskDeployReq;
|
||
|
||
// Task deployment response; currently holds only a reserved field.
typedef struct {
  int32_t reserved;
} SStreamTaskDeployRsp;
|
||
|
||
typedef struct {
|
||
SStreamExecMsgHead head;
|
||
// TODO: other info needed by task
|
||
} SStreamTaskExecReq;
|
||
|
||
// Task execution response; currently holds only a reserved field.
typedef struct {
  int32_t reserved;
} SStreamTaskExecRsp;
|
||
|
||
#pragma pack(pop)
|
||
|
||
#ifdef __cplusplus
|
||
}
|
||
#endif
|
||
|
||
#endif /*_TD_COMMON_TAOS_MSG_H_*/
|