From b49fa87e73367189f9557fafb35a6269f4e93243 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 17:14:54 +0800 Subject: [PATCH 1/2] enh(tmq): drop consumer group cg_xx on topic_name --- include/common/tmsg.h | 9 +++++++++ source/common/src/tmsg.c | 29 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f7562a4b9b..3b4739a424 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1633,6 +1633,15 @@ typedef struct { int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); +typedef struct { + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + int8_t igNotExists; +} SMDropCgroupReq; + +int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); +int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4e97ebbe47..7a62589098 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2625,6 +2625,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } +int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->topic) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->topic) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) { int32_t sqlLen = 0; int32_t astLen = 0; From 8b067c6c3e1d62db3616c565f4061e699b83888f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 18:00:18 +0800 Subject: [PATCH 2/2] fix(stream): fix bad merge --- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fc2308c327..87a7d5e3d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -118,11 +118,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg break; } - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {