serialize topic msg
This commit is contained in:
parent
abd6b98ba4
commit
353feec145
|
@ -1118,39 +1118,15 @@ typedef struct {
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
} SMCreateTopicReq;
|
} SMCreateTopicReq;
|
||||||
|
|
||||||
static FORCE_INLINE int tSerializeSMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq) {
|
int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq);
|
||||||
int tlen = 0;
|
void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
|
||||||
tlen += taosEncodeString(buf, pReq->name);
|
|
||||||
tlen += taosEncodeString(buf, pReq->sql);
|
|
||||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
|
||||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SMCreateTopicReq* pReq) {
|
|
||||||
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->name));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->sql));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->physicalPlan));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->logicalPlan));
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t topicId;
|
int64_t topicId;
|
||||||
} SCMCreateTopicRsp;
|
} SMCreateTopicRsp;
|
||||||
|
|
||||||
static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) {
|
int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp);
|
||||||
int tlen = 0;
|
int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp);
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->topicId);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->topicId);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t topicNum;
|
int32_t topicNum;
|
||||||
|
|
|
@ -137,7 +137,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SCMCreateTopicRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||||
|
|
|
@ -368,14 +368,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
||||||
.logicalPlan = (char*)"no logic plan",
|
.logicalPlan = (char*)"no logic plan",
|
||||||
};
|
};
|
||||||
|
|
||||||
int tlen = tSerializeSMCreateTopicReq(NULL, &req);
|
int tlen = tSerializeMCreateTopicReq(NULL, &req);
|
||||||
void* buf = malloc(tlen);
|
void* buf = malloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* abuf = buf;
|
void* abuf = buf;
|
||||||
tSerializeSMCreateTopicReq(&abuf, &req);
|
tSerializeMCreateTopicReq(&abuf, &req);
|
||||||
/*printf("formatted: %s\n", dagStr);*/
|
/*printf("formatted: %s\n", dagStr);*/
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||||
|
|
|
@ -1816,3 +1816,46 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
||||||
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
|
tlen += taosEncodeString(buf, pReq->sql);
|
||||||
|
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||||
|
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) {
|
||||||
|
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->name));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->sql));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->physicalPlan));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->logicalPlan));
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -265,7 +265,7 @@ static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
||||||
char *msgStr = pReq->rpcMsg.pCont;
|
char *msgStr = pReq->rpcMsg.pCont;
|
||||||
|
|
||||||
SMCreateTopicReq createTopicReq = {0};
|
SMCreateTopicReq createTopicReq = {0};
|
||||||
tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq);
|
tDeserializeSMCreateTopicReq(msgStr, &createTopicReq);
|
||||||
|
|
||||||
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue