diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb843ced91..2be6aca97e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3132,32 +3132,6 @@ int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo); int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo); void tDeleteSTqCheckInfo(STqCheckInfo* pInfo); -typedef struct { - char topic[TSDB_TOPIC_FNAME_LEN]; -} STqDelCheckInfoReq; - -typedef struct { - int32_t vgId; - int64_t offset; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; -} SMqOffset; - -typedef struct { - int64_t consumerId; - int32_t num; - SMqOffset* offsets; -} SMqCMCommitOffsetReq; - -typedef struct { - int32_t reserved; -} SMqCMCommitOffsetRsp; - -int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset); -int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset); -int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq); -int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq); - // tqOffset enum { TMQ_OFFSET__RESET_NONE = -3, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0553f73bb3..49b1e4f6f4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5174,43 +5174,7 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp tDecoderClear(&decoder); return 0; } -int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { - if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; - if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; - if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1; - if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1; - return encoder->pos; -} -int32_t tDecodeSMqOffset(SDecoder *decoder, SMqOffset *pOffset) { - if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1; - if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1; - if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1; - if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1; - return 0; -} - -int32_t tEncodeSMqCMCommitOffsetReq(SEncoder *encoder, const SMqCMCommitOffsetReq *pReq) { - if (tStartEncode(encoder) < 0) return -1; - if (tEncodeI32(encoder, pReq->num) < 0) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tEncodeSMqOffset(encoder, &pReq->offsets[i]); - } - tEndEncode(encoder); - return encoder->pos; -} - -int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pReq) { - if (tStartDecode(decoder) < 0) return -1; - if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - pReq->offsets = (SMqOffset *)tDecoderMalloc(decoder, sizeof(SMqOffset) * pReq->num); - if (pReq->offsets == NULL) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tDecodeSMqOffset(decoder, &pReq->offsets[i]); - } - tEndDecode(decoder); - return 0; -} int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index a9fb5096fb..94fd6027c0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -485,12 +485,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.astLen = strlen(pCreate->ast) + 1; } } - /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ - /*topicObj.ast = NULL;*/ - /*topicObj.astLen = 0;*/ - /*topicObj.physicalPlan = NULL;*/ - /*topicObj.withTbName = 1;*/ - /*topicObj.withSchema = 1;*/ SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c6a424666c..72310f6b19 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,14 +88,10 @@ typedef struct { int64_t snapshotVer; SWalReader* pWalReader; SWalRef* pRef; - // STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; tq_handle_status status; } STqHandle; -typedef struct { - int64_t snapshotVer; -} SStreamHandle; struct STQ { SVnode* pVnode;