diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 73d1ab2473..b42df78e34 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -35,12 +35,14 @@ extern "C" { #define TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ +#undef TD_MSG_RANGE_CODE_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ +#undef TD_MSG_RANGE_CODE_ #define TD_MSG_SEG_CODE_ #include "tmsgdef.h" @@ -48,33 +50,31 @@ extern "C" { #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #undef TD_MSG_SEG_CODE_ +#undef TD_MSG_RANGE_CODE_ #include "tmsgdef.h" extern char* tMsgInfo[]; extern int32_t tMsgDict[]; - -#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) -#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) -#define TMSG_INFO(TYPE) \ - ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ - (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \ - (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG || (TYPE) < TDMT_VND_TMQ_MAX_MSG \ - ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ - : 0 - -#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) +extern int32_t tMsgRangeDict[]; typedef uint16_t tmsg_t; +#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) +#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) +#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) + static inline bool tmsgIsValid(tmsg_t type) { - if (type < TDMT_DND_MAX_MSG || type < TDMT_MND_MAX_MSG || type < TDMT_VND_MAX_MSG || type < TDMT_SCH_MAX_MSG || - type < TDMT_STREAM_MAX_MSG || type < TDMT_MON_MAX_MSG || type < TDMT_SYNC_MAX_MSG || type < TDMT_VND_STREAM_MSG || - type < TDMT_VND_TMQ_MSG || type < TDMT_VND_TMQ_MAX_MSG) { - return true; - } else { - return false; + // static int8_t sz = sizeof(tMsgRangeDict) / sizeof(tMsgRangeDict[0]); + int8_t maxSegIdx = TMSG_SEG_CODE(TDMT_MAX_MSG); + int segIdx = TMSG_SEG_CODE(type); + if (segIdx >= 0 && segIdx < maxSegIdx) { + return type < tMsgRangeDict[segIdx]; } + return false; } + +#define TMSG_INFO(type) (tmsgIsValid(type) ? tMsgInfo[TMSG_INDEX(type)] : "unKnown") + static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT) || @@ -169,14 +169,14 @@ typedef enum _mgmt_table { #define TSDB_FILL_PREV 6 #define TSDB_FILL_NEXT 7 -#define TSDB_ALTER_USER_PASSWD 0x1 -#define TSDB_ALTER_USER_SUPERUSER 0x2 -#define TSDB_ALTER_USER_ENABLE 0x3 -#define TSDB_ALTER_USER_SYSINFO 0x4 -#define TSDB_ALTER_USER_ADD_PRIVILEGES 0x5 -#define TSDB_ALTER_USER_DEL_PRIVILEGES 0x6 -#define TSDB_ALTER_USER_ADD_WHITE_LIST 0x7 -#define TSDB_ALTER_USER_DROP_WHITE_LIST 0x8 +#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_SUPERUSER 0x2 +#define TSDB_ALTER_USER_ENABLE 0x3 +#define TSDB_ALTER_USER_SYSINFO 0x4 +#define TSDB_ALTER_USER_ADD_PRIVILEGES 0x5 +#define TSDB_ALTER_USER_DEL_PRIVILEGES 0x6 +#define TSDB_ALTER_USER_ADD_WHITE_LIST 0x7 +#define TSDB_ALTER_USER_DROP_WHITE_LIST 0x8 #define TSDB_KILL_MSG_LEN 30 @@ -346,7 +346,7 @@ typedef enum ENodeType { QUERY_NODE_RESTORE_MNODE_STMT, QUERY_NODE_RESTORE_VNODE_STMT, QUERY_NODE_PAUSE_STREAM_STMT, - QUERY_NODE_RESUME_STREAM_STMT, + QUERY_NODE_RESUME_STREAM_STMT, QUERY_NODE_CREATE_VIEW_STMT, QUERY_NODE_DROP_VIEW_STMT, @@ -790,7 +790,7 @@ typedef struct { int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); -void tFreeSMDropStbReq(SMDropStbReq *pReq); +void tFreeSMDropStbReq(SMDropStbReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; @@ -871,18 +871,18 @@ int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq); typedef struct { - char user[TSDB_USER_LEN]; + char user[TSDB_USER_LEN]; int32_t sqlLen; char* sql; } SDropUserReq, SDropAcctReq; int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq); int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq); -void tFreeSDropUserReq(SDropUserReq *pReq); +void tFreeSDropUserReq(SDropUserReq* pReq); -typedef struct SIpV4Range{ - uint32_t ip; - uint32_t mask; +typedef struct SIpV4Range { + uint32_t ip; + uint32_t mask; } SIpV4Range; typedef struct { @@ -892,21 +892,21 @@ typedef struct { SIpWhiteList* cloneIpWhiteList(SIpWhiteList* pIpWhiteList); typedef struct { - int8_t createType; - int8_t superUser; // denote if it is a super user or not - int8_t sysInfo; - int8_t enable; - char user[TSDB_USER_LEN]; - char pass[TSDB_USET_PASSWORD_LEN]; + int8_t createType; + int8_t superUser; // denote if it is a super user or not + int8_t sysInfo; + int8_t enable; + char user[TSDB_USER_LEN]; + char pass[TSDB_USET_PASSWORD_LEN]; int32_t numIpRanges; SIpV4Range* pIpRanges; - int32_t sqlLen; - char* sql; + int32_t sqlLen; + char* sql; } SCreateUserReq; int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq); int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq); -void tFreeSCreateUserReq(SCreateUserReq *pReq); +void tFreeSCreateUserReq(SCreateUserReq* pReq); typedef struct { int64_t ver; @@ -933,22 +933,22 @@ int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); typedef struct { - int8_t alterType; - int8_t superUser; - int8_t sysInfo; - int8_t enable; - int8_t isView; - char user[TSDB_USER_LEN]; - char pass[TSDB_USET_PASSWORD_LEN]; - char objname[TSDB_DB_FNAME_LEN]; // db or topic - char tabName[TSDB_TABLE_NAME_LEN]; - char* tagCond; - int32_t tagCondLen; + int8_t alterType; + int8_t superUser; + int8_t sysInfo; + int8_t enable; + int8_t isView; + char user[TSDB_USER_LEN]; + char pass[TSDB_USET_PASSWORD_LEN]; + char objname[TSDB_DB_FNAME_LEN]; // db or topic + char tabName[TSDB_TABLE_NAME_LEN]; + char* tagCond; + int32_t tagCondLen; int32_t numIpRanges; SIpV4Range* pIpRanges; int64_t privileges; - int32_t sqlLen; - char* sql; + int32_t sqlLen; + char* sql; } SAlterUserReq; int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq); @@ -978,9 +978,9 @@ typedef struct { SHashObj* alterTbs; SHashObj* readViews; SHashObj* writeViews; - SHashObj* alterViews; + SHashObj* alterViews; SHashObj* useDbs; - int64_t whiteListVer; + int64_t whiteListVer; } SGetUserAuthRsp; int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); @@ -995,8 +995,8 @@ int32_t tSerializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteL int32_t tDeserializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq); typedef struct { - char user[TSDB_USER_LEN]; - int32_t numWhiteLists; + char user[TSDB_USER_LEN]; + int32_t numWhiteLists; SIpV4Range* pWhiteLists; } SGetUserWhiteListRsp; @@ -1169,8 +1169,8 @@ int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq); void tFreeSAlterDbReq(SAlterDbReq* pReq); typedef struct { - char db[TSDB_DB_FNAME_LEN]; - int8_t ignoreNotExists; + char db[TSDB_DB_FNAME_LEN]; + int8_t ignoreNotExists; int32_t sqlLen; char* sql; } SDropDbReq; @@ -1378,7 +1378,7 @@ typedef struct { int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); int32_t tDeserializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); -void tFreeSCompactDbReq(SCompactDbReq *pReq); +void tFreeSCompactDbReq(SCompactDbReq* pReq); typedef struct { char name[TSDB_FUNC_NAME_LEN]; @@ -1817,7 +1817,6 @@ int32_t tSerializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp); int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp); void tFreeSViewHbRsp(SViewHbRsp* pRsp); - typedef struct { int32_t numOfTables; int32_t numOfVgroup; @@ -2006,7 +2005,7 @@ typedef struct { int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq); int32_t tDeserializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq); -void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq); +void tFreeSRestoreDnodeReq(SRestoreDnodeReq* pReq); typedef struct { int32_t dnodeId; @@ -2018,7 +2017,7 @@ typedef struct { int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq); int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq); -void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq); +void tFreeSMCfgDnodeReq(SMCfgDnodeReq* pReq); typedef struct { char config[TSDB_DNODE_CONFIG_LEN]; @@ -2037,7 +2036,7 @@ typedef struct { int32_t tSerializeSCreateDropMQSNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq); int32_t tDeserializeSCreateDropMQSNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq); -void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq); +void tFreeSMCreateQnodeReq(SMCreateQnodeReq* pReq); void tFreeSDDropQnodeReq(SDDropQnodeReq* pReq); typedef struct { int8_t replica; @@ -2079,7 +2078,7 @@ typedef struct { int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq); int32_t tDeserializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq); -void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq); +void tFreeSBalanceVgroupReq(SBalanceVgroupReq* pReq); typedef struct { int32_t vgId1; @@ -2100,7 +2099,7 @@ typedef struct { int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); -void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq); +void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq* pReq); typedef struct { int32_t useless; @@ -2111,7 +2110,7 @@ typedef struct { int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq); int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq); -void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq); +void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq* pReq); typedef struct { int32_t vgId; @@ -2503,15 +2502,15 @@ typedef struct { } SMVSubscribeRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - int8_t igNotExists; + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igNotExists; int32_t sqlLen; char* sql; } SMDropTopicReq; int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); -void tFreeSMDropTopicReq(SMDropTopicReq *pReq); +void tFreeSMDropTopicReq(SMDropTopicReq* pReq); typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; @@ -3082,8 +3081,8 @@ typedef struct { } SMqVDeleteRsp; typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - int8_t igNotExists; + char name[TSDB_STREAM_FNAME_LEN]; + int8_t igNotExists; int32_t sqlLen; char* sql; } SMDropStreamReq; @@ -3920,7 +3919,7 @@ int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pR void tFreeSCMDropViewReq(SCMDropViewReq* pReq); typedef struct { - char fullname[TSDB_VIEW_FNAME_LEN]; + char fullname[TSDB_VIEW_FNAME_LEN]; } SViewMetaReq; int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq); int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq); @@ -3942,7 +3941,6 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp); void tFreeSViewMetaRsp(SViewMetaRsp* pRsp); - #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index afa0fa2a6e..61b471912f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -24,48 +24,70 @@ #if defined(TD_MSG_INFO_) -#undef TD_NEW_MSG_SEG -#undef TD_DEF_MSG_TYPE -#define TD_NEW_MSG_SEG(TYPE) "null", -#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) MSG, MSG "-rsp", + #undef TD_NEW_MSG_SEG + #undef TD_DEF_MSG_TYPE + #undef TD_CLOSE_MSG_TYPE + #define TD_NEW_MSG_SEG(TYPE) "null", + #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) MSG, MSG "-rsp", + #define TD_CLOSE_MSG_TYPE(TYPE) -char *tMsgInfo[] = { + char *tMsgInfo[] = { + +#elif defined(TD_MSG_RANGE_CODE_) + + #undef TD_NEW_MSG_SEG + #undef TD_DEF_MSG_TYPE + #undef TD_CLOSE_MSG_TYPE + #define TD_NEW_MSG_SEG(TYPE) + #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) + #define TD_CLOSE_MSG_TYPE(TYPE) TYPE, + int32_t tMsgRangeDict[] = { #elif defined(TD_MSG_NUMBER_) -#undef TD_NEW_MSG_SEG -#undef TD_DEF_MSG_TYPE -#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM, -#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE##_NUM, TYPE##_RSP_NUM, + #undef TD_NEW_MSG_SEG + #undef TD_DEF_MSG_TYPE + #undef TD_CLOSE_MSG_TYPE + #define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM, + #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE##_NUM, TYPE##_RSP_NUM, + #define TD_CLOSE_MSG_TYPE(TYPE) -enum { + enum { #elif defined(TD_MSG_DICT_) -#undef TD_NEW_MSG_SEG -#undef TD_DEF_MSG_TYPE -#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM, -#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) + #undef TD_NEW_MSG_SEG + #undef TD_DEF_MSG_TYPE + #undef TD_CLOSE_MSG_TYPE + #define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM, + #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) + #define TD_CLOSE_MSG_TYPE(type) + + int32_t tMsgDict[] = { -int32_t tMsgDict[] = { #elif defined(TD_MSG_SEG_CODE_) -#undef TD_NEW_MSG_SEG -#undef TD_DEF_MSG_TYPE -#define TD_NEW_MSG_SEG(TYPE) TYPE##_SEG_CODE, -#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) + #undef TD_NEW_MSG_SEG + #undef TD_DEF_MSG_TYPE + #undef TD_CLOSE_MSG_TYPE + #define TD_NEW_MSG_SEG(TYPE) TYPE##_SEG_CODE, + #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) + #define TD_CLOSE_MSG_TYPE(TYPE) -enum { + enum { -#else -#undef TD_NEW_MSG_SEG -#undef TD_DEF_MSG_TYPE -#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8), -#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP, +#else -enum { // WARN: new msg should be appended to segment tail + #undef TD_NEW_MSG_SEG + #undef TD_DEF_MSG_TYPE + #undef TD_CLOSE_MSG_TYPE + #define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8), + #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP, + #define TD_CLOSE_MSG_TYPE(TYPE) TYPE, + + enum { // WARN: new msg should be appended to segment tail #endif TD_NEW_MSG_SEG(TDMT_DND_MSG) // 0<<8 TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) @@ -82,10 +104,12 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_UNUSED_CODE, "dnd-unused", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE_TYPE, "dnode-alter-mnode-type", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_DND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8 TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) @@ -194,6 +218,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_DROP_VIEW, "drop-view", SCMDropViewReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -231,7 +256,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) +TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) @@ -243,6 +268,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_VND_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL) @@ -257,6 +283,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_SCH_MSG) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) //4 << 8 @@ -274,9 +301,11 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_MON_MSG) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) //6 << 8 TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) @@ -308,6 +337,8 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_SYNC_MSG) + TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8 TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) @@ -317,6 +348,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8 TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) @@ -330,9 +362,15 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) + TD_CLOSE_MSG_TYPE(TDMT_END_TMQ_MSG) + + TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark + + + -#if defined(TD_MSG_NUMBER_) - TDMT_MAX -#endif + #if defined(TD_MSG_NUMBER_) + TDMT_MAX + #endif }; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 01b1df9d5f..d69542c98b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -18,42 +18,51 @@ #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ +#undef TD_MSG_RANGE_CODE_ #define TD_MSG_INFO_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_INFO_ +#undef TD_MSG_RANGE_CODE_ #define TD_MSG_DICT_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" +#undef TD_MSG_NUMBER_ +#undef TD_MSG_INFO_ +#undef TD_MSG_DICT_ +#undef TD_MSG_SEG_CODE_ +#define TD_MSG_RANGE_CODE_ +#include "tmsgdef.h" + #include "tlog.h" -#define DECODESQL() \ - do { \ - if(!tDecodeIsEnd(&decoder)){ \ - if(tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; \ - if(pReq->sqlLen > 0){ \ - if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->sql, NULL) < 0) return -1; \ - } \ - } \ +#define DECODESQL() \ + do { \ + if (!tDecodeIsEnd(&decoder)) { \ + if (tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; \ + if (pReq->sqlLen > 0) { \ + if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->sql, NULL) < 0) return -1; \ + } \ + } \ } while (0) -#define ENCODESQL() \ - do { \ - if (pReq->sqlLen > 0 && pReq->sql != NULL){ \ - if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \ - if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \ - } \ +#define ENCODESQL() \ + do { \ + if (pReq->sqlLen > 0 && pReq->sql != NULL) { \ + if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \ + if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \ + } \ } while (0) -#define FREESQL() \ - do { \ - if(pReq->sql != NULL){ \ - taosMemoryFree(pReq->sql); \ - } \ - pReq->sql = NULL; \ +#define FREESQL() \ + do { \ + if (pReq->sql != NULL) { \ + taosMemoryFree(pReq->sql); \ + } \ + pReq->sql = NULL; \ } while (0) static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq); @@ -742,9 +751,7 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) return 0; } -void tFreeSMDropStbReq(SMDropStbReq *pReq) { - FREESQL(); -} +void tFreeSMDropStbReq(SMDropStbReq *pReq) { FREESQL(); } int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) { SEncoder encoder = {0}; @@ -1489,9 +1496,7 @@ int32_t tDeserializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq) return 0; } -void tFreeSDropUserReq(SDropUserReq *pReq) { - FREESQL(); -} +void tFreeSDropUserReq(SDropUserReq *pReq) { FREESQL(); } SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) { if (pIpWhiteList == NULL) return NULL; @@ -1822,7 +1827,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) char *tb = taosHashIterate(pRsp->readTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(tb, &keyLen); + void * key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1837,7 +1842,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->writeTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(tb, &keyLen); + void * key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1852,7 +1857,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->alterTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(tb, &keyLen); + void * key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1867,7 +1872,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->readViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(tb, &keyLen); + void * key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1882,7 +1887,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->writeViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(tb, &keyLen); + void * key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1897,7 +1902,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->alterViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(tb, &keyLen); + void * key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1912,7 +1917,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) int32_t *useDb = taosHashIterate(pRsp->useDbs, NULL); while (useDb != NULL) { size_t keyLen = 0; - void *key = taosHashGetKey(useDb, &keyLen); + void * key = taosHashGetKey(useDb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1954,8 +1959,8 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs pRsp->alterViews = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL || - pRsp->writeTbs == NULL || pRsp->alterTbs == NULL || pRsp->readViews == NULL || - pRsp->writeViews == NULL || pRsp->alterViews == NULL ||pRsp->useDbs == NULL) { + pRsp->writeTbs == NULL || pRsp->alterTbs == NULL || pRsp->readViews == NULL || pRsp->writeViews == NULL || + pRsp->alterViews == NULL || pRsp->useDbs == NULL) { goto _err; } @@ -2219,7 +2224,7 @@ int32_t tDeserializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhit return 0; } -int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) { +int32_t tSerializeSGetUserWhiteListRsp(void *buf, int32_t bufLen, SGetUserWhiteListRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2237,7 +2242,7 @@ int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteL return tlen; } -int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) { +int32_t tDeserializeSGetUserWhiteListRsp(void *buf, int32_t bufLen, SGetUserWhiteListRsp *pRsp) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -2257,9 +2262,7 @@ int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhit return 0; } -void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp) { - taosMemoryFree(pRsp->pWhiteLists); -} +void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp *pRsp) { taosMemoryFree(pRsp->pWhiteLists); } int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) { SEncoder encoder = {0}; @@ -2288,13 +2291,9 @@ int32_t tDeserializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQno return 0; } -void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq){ - FREESQL(); -} +void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq) { FREESQL(); } -void tFreeSDDropQnodeReq(SDDropQnodeReq* pReq) { - FREESQL(); -} +void tFreeSDDropQnodeReq(SDDropQnodeReq *pReq) { FREESQL(); } int32_t tSerializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq) { SEncoder encoder = {0}; @@ -2336,9 +2335,7 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq return 0; } -void tFreeSDropDnodeReq(SDropDnodeReq *pReq) { - FREESQL(); -} +void tFreeSDropDnodeReq(SDropDnodeReq *pReq) { FREESQL(); } int32_t tSerializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) { SEncoder encoder = {0}; @@ -2369,9 +2366,7 @@ int32_t tDeserializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq return 0; } -void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq) { - FREESQL(); -} +void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq) { FREESQL(); } int32_t tSerializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq) { SEncoder encoder = {0}; @@ -2404,9 +2399,7 @@ int32_t tDeserializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq return 0; } -void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq) { - FREESQL(); -} +void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq) { FREESQL(); } int32_t tSerializeSDCfgDnodeReq(void *buf, int32_t bufLen, SDCfgDnodeReq *pReq) { SEncoder encoder = {0}; @@ -2464,9 +2457,7 @@ int32_t tDeserializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq * return 0; } -void tFreeSCreateDnodeReq(SCreateDnodeReq *pReq) { - FREESQL(); -} +void tFreeSCreateDnodeReq(SCreateDnodeReq *pReq) { FREESQL(); } int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq) { SEncoder encoder = {0}; @@ -3121,9 +3112,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { return 0; } -void tFreeSAlterDbReq(SAlterDbReq *pReq) { - FREESQL(); -} +void tFreeSAlterDbReq(SAlterDbReq *pReq) { FREESQL(); } int32_t tSerializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) { SEncoder encoder = {0}; @@ -3154,9 +3143,7 @@ int32_t tDeserializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) { return 0; } -void tFreeSDropDbReq(SDropDbReq *pReq) { - FREESQL(); -} +void tFreeSDropDbReq(SDropDbReq *pReq) { FREESQL(); } int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) { SEncoder encoder = {0}; @@ -3435,9 +3422,7 @@ int32_t tDeserializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq return 0; } -void tFreeSCompactDbReq(SCompactDbReq *pReq) { - FREESQL(); -} +void tFreeSCompactDbReq(SCompactDbReq *pReq) { FREESQL(); } int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) { if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; @@ -4611,9 +4596,7 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } -void tFreeSMDropTopicReq(SMDropTopicReq *pReq) { - FREESQL(); -} +void tFreeSMDropTopicReq(SMDropTopicReq *pReq) { FREESQL(); } int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) { SEncoder encoder = {0}; @@ -5501,9 +5484,7 @@ int32_t tDeserializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupR return 0; } -void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) { - FREESQL(); -} +void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) { FREESQL(); } int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) { SEncoder encoder = {0}; @@ -5526,7 +5507,7 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1; - if(!tDecodeIsEnd(&decoder)){ + if (!tDecodeIsEnd(&decoder)) { if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; } @@ -5537,9 +5518,7 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV return 0; } -void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq) { - FREESQL(); -} +void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq) { FREESQL(); } int32_t tSerializeSMergeVgroupReq(void *buf, int32_t bufLen, SMergeVgroupReq *pReq) { SEncoder encoder = {0}; @@ -5601,9 +5580,7 @@ int32_t tDeserializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistrib return 0; } -void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq) { - FREESQL(); -} +void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq) { FREESQL(); } int32_t tSerializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) { SEncoder encoder = {0}; @@ -7152,9 +7129,7 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq * return 0; } -void tFreeMDropStreamReq(SMDropStreamReq *pReq) { - FREESQL(); -} +void tFreeMDropStreamReq(SMDropStreamReq *pReq) { FREESQL(); } int32_t tSerializeSMRecoverStreamReq(void *buf, int32_t bufLen, const SMRecoverStreamReq *pReq) { SEncoder encoder = {0}; @@ -7297,8 +7272,8 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { } else { ASSERT(0); } - //ENCODESQL - if(pReq->sqlLen > 0 && pReq->sql != NULL) { + // ENCODESQL + if (pReq->sqlLen > 0 && pReq->sql != NULL) { if (tEncodeI32(pCoder, pReq->sqlLen) < 0) return -1; if (tEncodeBinary(pCoder, pReq->sql, pReq->sqlLen) < 0) return -1; } @@ -7345,11 +7320,11 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { ASSERT(0); } - //DECODESQL - if(!tDecodeIsEnd(pCoder)){ - if(tDecodeI32(pCoder, &pReq->sqlLen) < 0) return -1; - if(pReq->sqlLen > 0){ - if (tDecodeBinaryAlloc(pCoder, (void**)&pReq->sql, NULL) < 0) return -1; + // DECODESQL + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI32(pCoder, &pReq->sqlLen) < 0) return -1; + if (pReq->sqlLen > 0) { + if (tDecodeBinaryAlloc(pCoder, (void **)&pReq->sql, NULL) < 0) return -1; } } @@ -7375,7 +7350,7 @@ void tDestroySVCreateTbReq(SVCreateTbReq *pReq, int32_t flags) { } } - if(pReq->sql != NULL){ + if (pReq->sql != NULL) { taosMemoryFree(pReq->sql); } pReq->sql = NULL; @@ -8194,7 +8169,7 @@ int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); - void *data = taosArrayGetP(pRsp->blockData, i); + void * data = taosArrayGetP(pRsp->blockData, i); if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1; if (pRsp->withSchema) { SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); @@ -8227,7 +8202,7 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } for (int32_t i = 0; i < pRsp->blockNum; i++) { - void *data; + void * data; uint64_t bLen; if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1; taosArrayPush(pRsp->blockData, &data); @@ -8273,7 +8248,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void *createTableReq = taosArrayGetP(pRsp->createTableReq, i); + void * createTableReq = taosArrayGetP(pRsp->createTableReq, i); int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i); if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1; } @@ -8289,7 +8264,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *)); for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void *pCreate = NULL; + void * pCreate = NULL; uint64_t len; if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; int32_t l = (int32_t)len; @@ -8591,7 +8566,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { taosArrayDestroy(pTbData->aCol); } else { int32_t nRow = TARRAY_SIZE(pTbData->aRowP); - SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP); + SRow ** rows = (SRow **)TARRAY_DATA(pTbData->aRowP); for (int32_t i = 0; i < nRow; ++i) { tRowDestroy(rows[i]); @@ -8855,7 +8830,7 @@ int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq } for (int32_t i = 0; i < pReq->numOfCols; ++i) { - SSchema* pSchema = pReq->pSchema + i; + SSchema *pSchema = pReq->pSchema + i; if (tDecodeSSchema(&decoder, pSchema) < 0) return -1; } } @@ -8866,17 +8841,17 @@ int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq return 0; } -void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq) { +void tFreeSCMCreateViewReq(SCMCreateViewReq *pReq) { if (NULL == pReq) { return; } taosMemoryFreeClear(pReq->querySql); - taosMemoryFreeClear(pReq->sql); + taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->pSchema); } -int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq* pReq) { +int32_t tSerializeSCMDropViewReq(void *buf, int32_t bufLen, const SCMDropViewReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -8894,7 +8869,7 @@ int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq return tlen; } -int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pReq) { +int32_t tDeserializeSCMDropViewReq(void *buf, int32_t bufLen, SCMDropViewReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -8910,15 +8885,15 @@ int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pR tDecoderClear(&decoder); return 0; } -void tFreeSCMDropViewReq(SCMDropViewReq* pReq) { +void tFreeSCMDropViewReq(SCMDropViewReq *pReq) { if (NULL == pReq) { return; } - + taosMemoryFree(pReq->sql); } -int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq) { +int32_t tSerializeSViewMetaReq(void *buf, int32_t bufLen, const SViewMetaReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -8932,7 +8907,7 @@ int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pR return tlen; } -int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq) { +int32_t tDeserializeSViewMetaReq(void *buf, int32_t bufLen, SViewMetaReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -8964,8 +8939,7 @@ static int32_t tEncodeSViewMetaRsp(SEncoder *pEncoder, const SViewMetaRsp *pRsp) return 0; } - -int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp) { +int32_t tSerializeSViewMetaRsp(void *buf, int32_t bufLen, const SViewMetaRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -8998,7 +8972,7 @@ static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) { } for (int32_t i = 0; i < pRsp->numOfCols; ++i) { - SSchema* pSchema = pRsp->pSchema + i; + SSchema *pSchema = pRsp->pSchema + i; if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1; } } @@ -9006,7 +8980,7 @@ static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) { return 0; } -int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) { +int32_t tDeserializeSViewMetaRsp(void *buf, int32_t bufLen, SViewMetaRsp *pRsp) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -9019,7 +8993,7 @@ int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) return 0; } -void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) { +void tFreeSViewMetaRsp(SViewMetaRsp *pRsp) { if (NULL == pRsp) { return; } @@ -9064,7 +9038,7 @@ int32_t tDeserializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) { } for (int32_t i = 0; i < numOfMeta; ++i) { - SViewMetaRsp* metaRsp = taosMemoryCalloc(1, sizeof(SViewMetaRsp)); + SViewMetaRsp *metaRsp = taosMemoryCalloc(1, sizeof(SViewMetaRsp)); if (NULL == metaRsp) return -1; if (tDecodeSViewMetaRsp(&decoder, metaRsp) < 0) return -1; taosArrayPush(pRsp->pViewRsp, &metaRsp); @@ -9086,7 +9060,3 @@ void tFreeSViewHbRsp(SViewHbRsp *pRsp) { taosArrayDestroy(pRsp->pViewRsp); } - - - - diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 73427446e6..19b2dfc300 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -23,6 +23,7 @@ extern "C" { #include "taoserror.h" #include "theap.h" #include "tmisce.h" +#include "tmsg.h" #include "transLog.h" #include "transportInt.h" #include "trpc.h"