valid msg

This commit is contained in:
yihaoDeng 2023-12-01 14:20:16 +08:00
parent c75e9ade39
commit 3ae0e2d74f
3 changed files with 230 additions and 223 deletions

View File

@ -35,12 +35,14 @@ extern "C" {
#define TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_RANGE_CODE_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_RANGE_CODE_
#define TD_MSG_SEG_CODE_
#include "tmsgdef.h"
@ -48,33 +50,22 @@ extern "C" {
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#undef TD_MSG_RANGE_CODE_
#include "tmsgdef.h"
extern char* tMsgInfo[];
extern int32_t tMsgDict[];
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \
((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
(TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \
(TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG || (TYPE) < TDMT_VND_TMQ_MAX_MSG \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
// extern int32_t tMsgRangeDict[];
typedef uint16_t tmsg_t;
static inline bool tmsgIsValid(tmsg_t type) {
if (type < TDMT_DND_MAX_MSG || type < TDMT_MND_MAX_MSG || type < TDMT_VND_MAX_MSG || type < TDMT_SCH_MAX_MSG ||
type < TDMT_STREAM_MAX_MSG || type < TDMT_MON_MAX_MSG || type < TDMT_SYNC_MAX_MSG || type < TDMT_VND_STREAM_MSG ||
type < TDMT_VND_TMQ_MSG || type < TDMT_VND_TMQ_MAX_MSG) {
return true;
} else {
return false;
}
}
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
bool tmsgIsValid(tmsg_t type);
char* TMSG_INFO(tmsg_t type);
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT) ||
@ -169,14 +160,14 @@ typedef enum _mgmt_table {
#define TSDB_FILL_PREV 6
#define TSDB_FILL_NEXT 7
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_SUPERUSER 0x2
#define TSDB_ALTER_USER_ENABLE 0x3
#define TSDB_ALTER_USER_SYSINFO 0x4
#define TSDB_ALTER_USER_ADD_PRIVILEGES 0x5
#define TSDB_ALTER_USER_DEL_PRIVILEGES 0x6
#define TSDB_ALTER_USER_ADD_WHITE_LIST 0x7
#define TSDB_ALTER_USER_DROP_WHITE_LIST 0x8
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_SUPERUSER 0x2
#define TSDB_ALTER_USER_ENABLE 0x3
#define TSDB_ALTER_USER_SYSINFO 0x4
#define TSDB_ALTER_USER_ADD_PRIVILEGES 0x5
#define TSDB_ALTER_USER_DEL_PRIVILEGES 0x6
#define TSDB_ALTER_USER_ADD_WHITE_LIST 0x7
#define TSDB_ALTER_USER_DROP_WHITE_LIST 0x8
#define TSDB_KILL_MSG_LEN 30
@ -346,7 +337,7 @@ typedef enum ENodeType {
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
@ -790,7 +781,7 @@ typedef struct {
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
void tFreeSMDropStbReq(SMDropStbReq *pReq);
void tFreeSMDropStbReq(SMDropStbReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
@ -871,18 +862,18 @@ int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq
int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
char user[TSDB_USER_LEN];
int32_t sqlLen;
char* sql;
} SDropUserReq, SDropAcctReq;
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
void tFreeSDropUserReq(SDropUserReq *pReq);
void tFreeSDropUserReq(SDropUserReq* pReq);
typedef struct SIpV4Range{
uint32_t ip;
uint32_t mask;
typedef struct SIpV4Range {
uint32_t ip;
uint32_t mask;
} SIpV4Range;
typedef struct {
@ -892,21 +883,21 @@ typedef struct {
SIpWhiteList* cloneIpWhiteList(SIpWhiteList* pIpWhiteList);
typedef struct {
int8_t createType;
int8_t superUser; // denote if it is a super user or not
int8_t sysInfo;
int8_t enable;
char user[TSDB_USER_LEN];
char pass[TSDB_USET_PASSWORD_LEN];
int8_t createType;
int8_t superUser; // denote if it is a super user or not
int8_t sysInfo;
int8_t enable;
char user[TSDB_USER_LEN];
char pass[TSDB_USET_PASSWORD_LEN];
int32_t numIpRanges;
SIpV4Range* pIpRanges;
int32_t sqlLen;
char* sql;
int32_t sqlLen;
char* sql;
} SCreateUserReq;
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
void tFreeSCreateUserReq(SCreateUserReq *pReq);
void tFreeSCreateUserReq(SCreateUserReq* pReq);
typedef struct {
int64_t ver;
@ -933,22 +924,22 @@ int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
typedef struct {
int8_t alterType;
int8_t superUser;
int8_t sysInfo;
int8_t enable;
int8_t isView;
char user[TSDB_USER_LEN];
char pass[TSDB_USET_PASSWORD_LEN];
char objname[TSDB_DB_FNAME_LEN]; // db or topic
char tabName[TSDB_TABLE_NAME_LEN];
char* tagCond;
int32_t tagCondLen;
int8_t alterType;
int8_t superUser;
int8_t sysInfo;
int8_t enable;
int8_t isView;
char user[TSDB_USER_LEN];
char pass[TSDB_USET_PASSWORD_LEN];
char objname[TSDB_DB_FNAME_LEN]; // db or topic
char tabName[TSDB_TABLE_NAME_LEN];
char* tagCond;
int32_t tagCondLen;
int32_t numIpRanges;
SIpV4Range* pIpRanges;
int64_t privileges;
int32_t sqlLen;
char* sql;
int32_t sqlLen;
char* sql;
} SAlterUserReq;
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
@ -978,9 +969,9 @@ typedef struct {
SHashObj* alterTbs;
SHashObj* readViews;
SHashObj* writeViews;
SHashObj* alterViews;
SHashObj* alterViews;
SHashObj* useDbs;
int64_t whiteListVer;
int64_t whiteListVer;
} SGetUserAuthRsp;
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
@ -995,8 +986,8 @@ int32_t tSerializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteL
int32_t tDeserializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
int32_t numWhiteLists;
char user[TSDB_USER_LEN];
int32_t numWhiteLists;
SIpV4Range* pWhiteLists;
} SGetUserWhiteListRsp;
@ -1169,8 +1160,8 @@ int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
void tFreeSAlterDbReq(SAlterDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int8_t ignoreNotExists;
char db[TSDB_DB_FNAME_LEN];
int8_t ignoreNotExists;
int32_t sqlLen;
char* sql;
} SDropDbReq;
@ -1378,7 +1369,7 @@ typedef struct {
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
int32_t tDeserializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
void tFreeSCompactDbReq(SCompactDbReq *pReq);
void tFreeSCompactDbReq(SCompactDbReq* pReq);
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
@ -1817,7 +1808,6 @@ int32_t tSerializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
void tFreeSViewHbRsp(SViewHbRsp* pRsp);
typedef struct {
int32_t numOfTables;
int32_t numOfVgroup;
@ -2006,7 +1996,7 @@ typedef struct {
int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
int32_t tDeserializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq);
void tFreeSRestoreDnodeReq(SRestoreDnodeReq* pReq);
typedef struct {
int32_t dnodeId;
@ -2018,7 +2008,7 @@ typedef struct {
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq);
void tFreeSMCfgDnodeReq(SMCfgDnodeReq* pReq);
typedef struct {
char config[TSDB_DNODE_CONFIG_LEN];
@ -2037,7 +2027,7 @@ typedef struct {
int32_t tSerializeSCreateDropMQSNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
int32_t tDeserializeSCreateDropMQSNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq);
void tFreeSMCreateQnodeReq(SMCreateQnodeReq* pReq);
void tFreeSDDropQnodeReq(SDDropQnodeReq* pReq);
typedef struct {
int8_t replica;
@ -2079,7 +2069,7 @@ typedef struct {
int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
int32_t tDeserializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq);
void tFreeSBalanceVgroupReq(SBalanceVgroupReq* pReq);
typedef struct {
int32_t vgId1;
@ -2100,7 +2090,7 @@ typedef struct {
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq);
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq* pReq);
typedef struct {
int32_t useless;
@ -2111,7 +2101,7 @@ typedef struct {
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq);
void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq* pReq);
typedef struct {
int32_t vgId;
@ -2503,15 +2493,15 @@ typedef struct {
} SMVSubscribeRsp;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
int8_t igNotExists;
char name[TSDB_TOPIC_FNAME_LEN];
int8_t igNotExists;
int32_t sqlLen;
char* sql;
} SMDropTopicReq;
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
void tFreeSMDropTopicReq(SMDropTopicReq *pReq);
void tFreeSMDropTopicReq(SMDropTopicReq* pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
@ -3082,8 +3072,8 @@ typedef struct {
} SMqVDeleteRsp;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
int32_t sqlLen;
char* sql;
} SMDropStreamReq;
@ -3920,7 +3910,7 @@ int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pR
void tFreeSCMDropViewReq(SCMDropViewReq* pReq);
typedef struct {
char fullname[TSDB_VIEW_FNAME_LEN];
char fullname[TSDB_VIEW_FNAME_LEN];
} SViewMetaReq;
int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq);
int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq);
@ -3942,7 +3932,6 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp);
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp);
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -24,48 +24,74 @@
#if defined(TD_MSG_INFO_)
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) "null",
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) MSG, MSG "-rsp",
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#undef TD_CLOSE_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) "null",
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) MSG, MSG "-rsp",
#define TD_CLOSE_MSG_TYPE(TYPE)
char *tMsgInfo[] = {
char *tMsgInfo[] = {
#elif defined(TD_MSG_RANGE_CODE_)
typedef struct {
int32_t start;
int32_t end;
} TMsgRange;
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#undef TD_CLOSE_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) {(TYPE##_SEG_CODE) << 8,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
#define TD_CLOSE_MSG_TYPE(TYPE) TYPE},
TMsgRange tMsgRangeDict[] = {
#elif defined(TD_MSG_NUMBER_)
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE##_NUM, TYPE##_RSP_NUM,
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#undef TD_CLOSE_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE##_NUM, TYPE##_RSP_NUM,
#define TD_CLOSE_MSG_TYPE(TYPE)
enum {
enum {
#elif defined(TD_MSG_DICT_)
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#undef TD_CLOSE_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
#define TD_CLOSE_MSG_TYPE(type)
int32_t tMsgDict[] = {
int32_t tMsgDict[] = {
#elif defined(TD_MSG_SEG_CODE_)
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE##_SEG_CODE,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#undef TD_CLOSE_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE##_SEG_CODE,
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
#define TD_CLOSE_MSG_TYPE(TYPE)
enum {
enum {
#else
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8),
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP,
#else
enum { // WARN: new msg should be appended to segment tail
#undef TD_NEW_MSG_SEG
#undef TD_DEF_MSG_TYPE
#undef TD_CLOSE_MSG_TYPE
#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8),
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP,
#define TD_CLOSE_MSG_TYPE(TYPE) TYPE,
enum { // WARN: new msg should be appended to segment tail
#endif
TD_NEW_MSG_SEG(TDMT_DND_MSG) // 0<<8
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
@ -82,10 +108,12 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_UNUSED_CODE, "dnd-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE_TYPE, "dnode-alter-mnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_DND_MSG)
TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
@ -194,6 +222,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_DROP_VIEW, "drop-view", SCMDropViewReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
@ -243,6 +272,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_VND_MSG)
TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
@ -257,6 +287,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_SCH_MSG)
TD_NEW_MSG_SEG(TDMT_STREAM_MSG) //4 << 8
@ -274,9 +305,11 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_MON_MSG)
TD_NEW_MSG_SEG(TDMT_SYNC_MSG) //6 << 8
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
@ -308,6 +341,8 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_SYNC_MSG)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
@ -317,6 +352,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_VND_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
@ -330,9 +366,10 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_TMQ_MSG)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
#endif
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
#endif
};

View File

@ -18,42 +18,62 @@
#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_RANGE_CODE_
#define TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
#undef TD_MSG_NUMBER_
#undef TD_MSG_INFO_
#undef TD_MSG_RANGE_CODE_
#define TD_MSG_DICT_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
#undef TD_MSG_NUMBER_
#undef TD_MSG_INFO_
#undef TD_MSG_DICT_
#undef TD_MSG_SEG_CODE_
#define TD_MSG_RANGE_CODE_
#include "tmsgdef.h"
#include "tlog.h"
#define DECODESQL() \
do { \
if(!tDecodeIsEnd(&decoder)){ \
if(tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; \
if(pReq->sqlLen > 0){ \
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->sql, NULL) < 0) return -1; \
} \
} \
inline bool tmsgIsValid(tmsg_t type) {
static int8_t sz = sizeof(tMsgRangeDict) / sizeof(tMsgRangeDict[0]);
for (int i = 0; i < sz; i++) {
if (type > tMsgRangeDict[i].start && type < tMsgRangeDict[i].end) {
return true;
}
}
return false;
}
inline char *TMSG_INFO(tmsg_t type) { return (tmsgIsValid(type) ? tMsgInfo[TMSG_INDEX(type)] : "null"); };
#define DECODESQL() \
do { \
if (!tDecodeIsEnd(&decoder)) { \
if (tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; \
if (pReq->sqlLen > 0) { \
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->sql, NULL) < 0) return -1; \
} \
} \
} while (0)
#define ENCODESQL() \
do { \
if (pReq->sqlLen > 0 && pReq->sql != NULL){ \
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \
} \
#define ENCODESQL() \
do { \
if (pReq->sqlLen > 0 && pReq->sql != NULL) { \
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \
} \
} while (0)
#define FREESQL() \
do { \
if(pReq->sql != NULL){ \
taosMemoryFree(pReq->sql); \
} \
pReq->sql = NULL; \
#define FREESQL() \
do { \
if (pReq->sql != NULL) { \
taosMemoryFree(pReq->sql); \
} \
pReq->sql = NULL; \
} while (0)
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
@ -742,9 +762,7 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
return 0;
}
void tFreeSMDropStbReq(SMDropStbReq *pReq) {
FREESQL();
}
void tFreeSMDropStbReq(SMDropStbReq *pReq) { FREESQL(); }
int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) {
SEncoder encoder = {0};
@ -1489,9 +1507,7 @@ int32_t tDeserializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq)
return 0;
}
void tFreeSDropUserReq(SDropUserReq *pReq) {
FREESQL();
}
void tFreeSDropUserReq(SDropUserReq *pReq) { FREESQL(); }
SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
if (pIpWhiteList == NULL) return NULL;
@ -1822,7 +1838,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
char *tb = taosHashIterate(pRsp->readTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
void * key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1837,7 +1853,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->writeTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
void * key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1852,7 +1868,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->alterTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
void * key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1867,7 +1883,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->readViews, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
void * key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1882,7 +1898,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->writeViews, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
void * key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1897,7 +1913,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->alterViews, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
void * key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1912,7 +1928,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
int32_t *useDb = taosHashIterate(pRsp->useDbs, NULL);
while (useDb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(useDb, &keyLen);
void * key = taosHashGetKey(useDb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1954,8 +1970,8 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
pRsp->alterViews = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL ||
pRsp->writeTbs == NULL || pRsp->alterTbs == NULL || pRsp->readViews == NULL ||
pRsp->writeViews == NULL || pRsp->alterViews == NULL ||pRsp->useDbs == NULL) {
pRsp->writeTbs == NULL || pRsp->alterTbs == NULL || pRsp->readViews == NULL || pRsp->writeViews == NULL ||
pRsp->alterViews == NULL || pRsp->useDbs == NULL) {
goto _err;
}
@ -2219,7 +2235,7 @@ int32_t tDeserializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhit
return 0;
}
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
int32_t tSerializeSGetUserWhiteListRsp(void *buf, int32_t bufLen, SGetUserWhiteListRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -2237,7 +2253,7 @@ int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteL
return tlen;
}
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
int32_t tDeserializeSGetUserWhiteListRsp(void *buf, int32_t bufLen, SGetUserWhiteListRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -2257,9 +2273,7 @@ int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhit
return 0;
}
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp) {
taosMemoryFree(pRsp->pWhiteLists);
}
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp *pRsp) { taosMemoryFree(pRsp->pWhiteLists); }
int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) {
SEncoder encoder = {0};
@ -2288,13 +2302,9 @@ int32_t tDeserializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQno
return 0;
}
void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq){
FREESQL();
}
void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq) { FREESQL(); }
void tFreeSDDropQnodeReq(SDDropQnodeReq* pReq) {
FREESQL();
}
void tFreeSDDropQnodeReq(SDDropQnodeReq *pReq) { FREESQL(); }
int32_t tSerializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq) {
SEncoder encoder = {0};
@ -2336,9 +2346,7 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq
return 0;
}
void tFreeSDropDnodeReq(SDropDnodeReq *pReq) {
FREESQL();
}
void tFreeSDropDnodeReq(SDropDnodeReq *pReq) { FREESQL(); }
int32_t tSerializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) {
SEncoder encoder = {0};
@ -2369,9 +2377,7 @@ int32_t tDeserializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq
return 0;
}
void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq) {
FREESQL();
}
void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq) { FREESQL(); }
int32_t tSerializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq) {
SEncoder encoder = {0};
@ -2404,9 +2410,7 @@ int32_t tDeserializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq
return 0;
}
void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq) {
FREESQL();
}
void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq) { FREESQL(); }
int32_t tSerializeSDCfgDnodeReq(void *buf, int32_t bufLen, SDCfgDnodeReq *pReq) {
SEncoder encoder = {0};
@ -2464,9 +2468,7 @@ int32_t tDeserializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *
return 0;
}
void tFreeSCreateDnodeReq(SCreateDnodeReq *pReq) {
FREESQL();
}
void tFreeSCreateDnodeReq(SCreateDnodeReq *pReq) { FREESQL(); }
int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq) {
SEncoder encoder = {0};
@ -3121,9 +3123,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
return 0;
}
void tFreeSAlterDbReq(SAlterDbReq *pReq) {
FREESQL();
}
void tFreeSAlterDbReq(SAlterDbReq *pReq) { FREESQL(); }
int32_t tSerializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) {
SEncoder encoder = {0};
@ -3154,9 +3154,7 @@ int32_t tDeserializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) {
return 0;
}
void tFreeSDropDbReq(SDropDbReq *pReq) {
FREESQL();
}
void tFreeSDropDbReq(SDropDbReq *pReq) { FREESQL(); }
int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) {
SEncoder encoder = {0};
@ -3435,9 +3433,7 @@ int32_t tDeserializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq
return 0;
}
void tFreeSCompactDbReq(SCompactDbReq *pReq) {
FREESQL();
}
void tFreeSCompactDbReq(SCompactDbReq *pReq) { FREESQL(); }
int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) {
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
@ -4611,9 +4607,7 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
return 0;
}
void tFreeSMDropTopicReq(SMDropTopicReq *pReq) {
FREESQL();
}
void tFreeSMDropTopicReq(SMDropTopicReq *pReq) { FREESQL(); }
int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SEncoder encoder = {0};
@ -5501,9 +5495,7 @@ int32_t tDeserializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupR
return 0;
}
void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) {
FREESQL();
}
void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) { FREESQL(); }
int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) {
SEncoder encoder = {0};
@ -5526,7 +5518,7 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
if(!tDecodeIsEnd(&decoder)){
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
}
@ -5537,9 +5529,7 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV
return 0;
}
void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq) {
FREESQL();
}
void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq) { FREESQL(); }
int32_t tSerializeSMergeVgroupReq(void *buf, int32_t bufLen, SMergeVgroupReq *pReq) {
SEncoder encoder = {0};
@ -5601,9 +5591,7 @@ int32_t tDeserializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistrib
return 0;
}
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq) {
FREESQL();
}
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq) { FREESQL(); }
int32_t tSerializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) {
SEncoder encoder = {0};
@ -7152,9 +7140,7 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *
return 0;
}
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
FREESQL();
}
void tFreeMDropStreamReq(SMDropStreamReq *pReq) { FREESQL(); }
int32_t tSerializeSMRecoverStreamReq(void *buf, int32_t bufLen, const SMRecoverStreamReq *pReq) {
SEncoder encoder = {0};
@ -7297,8 +7283,8 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
} else {
ASSERT(0);
}
//ENCODESQL
if(pReq->sqlLen > 0 && pReq->sql != NULL) {
// ENCODESQL
if (pReq->sqlLen > 0 && pReq->sql != NULL) {
if (tEncodeI32(pCoder, pReq->sqlLen) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->sql, pReq->sqlLen) < 0) return -1;
}
@ -7345,11 +7331,11 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
ASSERT(0);
}
//DECODESQL
if(!tDecodeIsEnd(pCoder)){
if(tDecodeI32(pCoder, &pReq->sqlLen) < 0) return -1;
if(pReq->sqlLen > 0){
if (tDecodeBinaryAlloc(pCoder, (void**)&pReq->sql, NULL) < 0) return -1;
// DECODESQL
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI32(pCoder, &pReq->sqlLen) < 0) return -1;
if (pReq->sqlLen > 0) {
if (tDecodeBinaryAlloc(pCoder, (void **)&pReq->sql, NULL) < 0) return -1;
}
}
@ -7375,7 +7361,7 @@ void tDestroySVCreateTbReq(SVCreateTbReq *pReq, int32_t flags) {
}
}
if(pReq->sql != NULL){
if (pReq->sql != NULL) {
taosMemoryFree(pReq->sql);
}
pReq->sql = NULL;
@ -8194,7 +8180,7 @@ int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void *data = taosArrayGetP(pRsp->blockData, i);
void * data = taosArrayGetP(pRsp->blockData, i);
if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1;
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
@ -8227,7 +8213,7 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
void * data;
uint64_t bLen;
if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1;
taosArrayPush(pRsp->blockData, &data);
@ -8273,7 +8259,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void *createTableReq = taosArrayGetP(pRsp->createTableReq, i);
void * createTableReq = taosArrayGetP(pRsp->createTableReq, i);
int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i);
if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1;
}
@ -8289,7 +8275,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *));
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void *pCreate = NULL;
void * pCreate = NULL;
uint64_t len;
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
int32_t l = (int32_t)len;
@ -8591,7 +8577,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
taosArrayDestroy(pTbData->aCol);
} else {
int32_t nRow = TARRAY_SIZE(pTbData->aRowP);
SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
SRow ** rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
for (int32_t i = 0; i < nRow; ++i) {
tRowDestroy(rows[i]);
@ -8855,7 +8841,7 @@ int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq
}
for (int32_t i = 0; i < pReq->numOfCols; ++i) {
SSchema* pSchema = pReq->pSchema + i;
SSchema *pSchema = pReq->pSchema + i;
if (tDecodeSSchema(&decoder, pSchema) < 0) return -1;
}
}
@ -8866,17 +8852,17 @@ int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq
return 0;
}
void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq) {
void tFreeSCMCreateViewReq(SCMCreateViewReq *pReq) {
if (NULL == pReq) {
return;
}
taosMemoryFreeClear(pReq->querySql);
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->pSchema);
}
int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq* pReq) {
int32_t tSerializeSCMDropViewReq(void *buf, int32_t bufLen, const SCMDropViewReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -8894,7 +8880,7 @@ int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq
return tlen;
}
int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pReq) {
int32_t tDeserializeSCMDropViewReq(void *buf, int32_t bufLen, SCMDropViewReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -8910,15 +8896,15 @@ int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pR
tDecoderClear(&decoder);
return 0;
}
void tFreeSCMDropViewReq(SCMDropViewReq* pReq) {
void tFreeSCMDropViewReq(SCMDropViewReq *pReq) {
if (NULL == pReq) {
return;
}
taosMemoryFree(pReq->sql);
}
int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq) {
int32_t tSerializeSViewMetaReq(void *buf, int32_t bufLen, const SViewMetaReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -8932,7 +8918,7 @@ int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pR
return tlen;
}
int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq) {
int32_t tDeserializeSViewMetaReq(void *buf, int32_t bufLen, SViewMetaReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -8964,8 +8950,7 @@ static int32_t tEncodeSViewMetaRsp(SEncoder *pEncoder, const SViewMetaRsp *pRsp)
return 0;
}
int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp) {
int32_t tSerializeSViewMetaRsp(void *buf, int32_t bufLen, const SViewMetaRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -8998,7 +8983,7 @@ static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) {
}
for (int32_t i = 0; i < pRsp->numOfCols; ++i) {
SSchema* pSchema = pRsp->pSchema + i;
SSchema *pSchema = pRsp->pSchema + i;
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
}
}
@ -9006,7 +8991,7 @@ static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) {
return 0;
}
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) {
int32_t tDeserializeSViewMetaRsp(void *buf, int32_t bufLen, SViewMetaRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -9019,7 +9004,7 @@ int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp)
return 0;
}
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) {
void tFreeSViewMetaRsp(SViewMetaRsp *pRsp) {
if (NULL == pRsp) {
return;
}
@ -9064,7 +9049,7 @@ int32_t tDeserializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) {
}
for (int32_t i = 0; i < numOfMeta; ++i) {
SViewMetaRsp* metaRsp = taosMemoryCalloc(1, sizeof(SViewMetaRsp));
SViewMetaRsp *metaRsp = taosMemoryCalloc(1, sizeof(SViewMetaRsp));
if (NULL == metaRsp) return -1;
if (tDecodeSViewMetaRsp(&decoder, metaRsp) < 0) return -1;
taosArrayPush(pRsp->pViewRsp, &metaRsp);
@ -9086,7 +9071,3 @@ void tFreeSViewHbRsp(SViewHbRsp *pRsp) {
taosArrayDestroy(pRsp->pViewRsp);
}