From fa73f1d497436170f9cdeae75455aaf8e31332f6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 15:04:40 +0800 Subject: [PATCH] fix compile error --- include/common/tmsg.h | 8 +++++++- source/dnode/vnode/inc/tq.h | 1 + source/dnode/vnode/src/vnd/vnodeWrite.c | 24 +++++++++++++----------- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fe96f4ee52..a3d62cc52d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1567,6 +1567,12 @@ typedef struct SMqCVConsumeReq { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCVConsumeReq; +typedef struct SMqConsumeRspBlock { + int32_t bodyLen; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char body[]; +} SMqConsumeRspBlock; + typedef struct SMqCVConsumeRsp { int64_t reqId; int64_t clientId; @@ -1576,7 +1582,7 @@ typedef struct SMqCVConsumeRsp { int32_t skipLogNum; int32_t bodyLen; char topicName[TSDB_TOPIC_FNAME_LEN]; - char body[]; + SMqConsumeRspBlock blocks[]; } SMqCvConsumeRsp; #ifdef __cplusplus diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index aaf7f33061..588305c8ae 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -189,6 +189,7 @@ typedef struct STqTopicHandle { typedef struct STqConsumerHandle { int64_t consumerId; + int64_t epoch; SArray* topics; // SArray } STqConsumerHandle; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index deb8a714a6..f05520a960 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -114,15 +114,17 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { char* reqStr = ptr; SMqSetCVgReq req; tDecodeSMqSetCVgReq(reqStr, &req); - STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1); - if (pHandle == NULL) { + STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); + + STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); + if (pTopic == NULL) { // TODO: handle error } - strcpy(pHandle->topicName, req.topicName); - strcpy(pHandle->cGroup, req.cGroup); - strcpy(pHandle->sql, req.sql); - strcpy(pHandle->logicalPlan, req.logicalPlan); - strcpy(pHandle->physicalPlan, req.physicalPlan); + strcpy(pTopic->topicName, req.topicName); + strcpy(pTopic->cGroup, req.cGroup); + strcpy(pTopic->sql, req.sql); + strcpy(pTopic->logicalPlan, req.logicalPlan); + strcpy(pTopic->physicalPlan, req.physicalPlan); SArray *pArray; //TODO: deserialize to SQueryDag SQueryDag *pDag; @@ -134,12 +136,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { STaskInfo *pInfo = taosArrayGet(pArray, 0); SArray* pTasks; schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); - pHandle->buffer.firstOffset = -1; - pHandle->buffer.lastOffset = -1; + pTopic->buffer.firstOffset = -1; + pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); - pHandle->buffer.output[i].pMsg = pMsg; - pHandle->buffer.output[i].status = 0; + pTopic->buffer.output[i].pMsg = pMsg; + pTopic->buffer.output[i].status = 0; } // write mq meta }