Merge pull request #12703 from taosdata/feature/tq
enh(tmq): add drop cgroup msg
This commit is contained in:
commit
eb48e8d943
|
@ -1648,6 +1648,15 @@ typedef struct {
|
|||
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
int8_t igNotExists;
|
||||
} SMDropCgroupReq;
|
||||
|
||||
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
|
||||
int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t alterType;
|
||||
|
|
|
@ -2627,6 +2627,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->topic) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->topic) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
|
||||
int32_t sqlLen = 0;
|
||||
int32_t astLen = 0;
|
||||
|
|
|
@ -118,11 +118,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
|||
break;
|
||||
}
|
||||
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
|
||||
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
|
|
Loading…
Reference in New Issue